blob: 72c8e47ab717ea1c843a3c7a4e5105feefde3788 [file] [log] [blame]
#
# Copyright 2016 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
"""
gRPC client meant to connect to a gRPC server endpoint and query the
end-point's schema by calling SchemaService.GetSchema(NullMessage). All of
the client's semantics are derived from the retrieved schema.
"""
Zsolt Harasztie7b60762016-10-05 17:49:27 -070022
import os
import shutil
import subprocess
import sys
from random import randint
from zlib import decompress

import grpc
from consul import Consul
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from werkzeug.exceptions import ServiceUnavailable

from chameleon.asleep import asleep
from chameleon.protos.schema_pb2 import NullMessage, SchemaServiceStub

# Module-level structured logger used by everything defined in this module.
log = get_logger()
40
41
class GrpcClient(object):
    """
    Connect to a gRPC server, fetch its schema, and process the downloaded
    schema files to drive the customization of the north-bound interface(s)
    of Chameleon.

    The client (re-)connects from the Twisted reactor: `run()` schedules the
    first `connect()`, and every failed attempt re-schedules itself after a
    delay taken from `RETRY_BACKOFF`.
    """

    # Seconds to wait before successive connection attempts. Once the number
    # of retries exceeds the length of the list, the last entry is reused.
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055'):
        """
        :param consul_endpoint: 'host:port' string of the consul agent, used
            only when `endpoint` has to be looked up in consul
        :param work_dir: Directory where the retrieved schema files are
            saved and compiled
        :param endpoint: Either a gRPC target string, or '@<service-name>'
            to have the target resolved through consul on each connect
        """
        self.consul_endpoint = consul_endpoint
        self.endpoint = endpoint
        self.work_dir = work_dir
        self.plugin_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protoc_plugins'))

        self.channel = None
        self.schema = None
        self.retries = 0
        self.on_reconnect = None
        self.shutting_down = False

    def run(self, on_reconnect=None):
        """
        Schedule the first connection attempt on the reactor.
        :param on_reconnect: Optional callable scheduled on the reactor
            after every successful (re-)connect
        :return: self, to allow call chaining
        """
        self.on_reconnect = on_reconnect
        reactor.callLater(0, self.connect)
        return self

    def shutdown(self):
        """
        Flag the client as shutting down, so that `connect()` becomes a
        no-op. Calling it more than once is harmless.
        """
        if self.shutting_down:
            return
        self.shutting_down = True

    @inlineCallbacks
    def connect(self):
        """
        (Re-)Connect to end-point: resolve the target, open the channel,
        retrieve and compile the schema. On failure, wait for the current
        back-off period and schedule another attempt.
        """

        if self.shutting_down:
            return

        # Bound before the try block so that the error handlers below can
        # always log it, even when the consul lookup itself is what failed.
        _endpoint = self.endpoint

        try:
            if self.endpoint.startswith('@'):
                _endpoint = self._get_endpoint_from_consul(self.endpoint[1:])

            log.info('connecting', endpoint=_endpoint)
            self.channel = grpc.insecure_channel(_endpoint)

            self._retrieve_schema()
            self._compile_proto_files()
            self._clear_backoff()

            if self.on_reconnect is not None:
                reactor.callLater(0, self.on_reconnect)

            return

        except _Rendezvous as e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                log.info('grpc-endpoint-not-available')
            else:
                log.exception(e)
            yield self._backoff('not-available')

        except Exception:
            if not self.shutting_down:
                log.exception('cannot-connect', endpoint=_endpoint)
            yield self._backoff('unknown-error')

        # only reached after a failed attempt: try again
        reactor.callLater(0, self.connect)

    def _backoff(self, msg):
        """
        Log `msg` and return the result of `asleep()` for the back-off
        period that corresponds to the current retry count, then bump the
        retry count.
        """
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """
        Reset the retry counter after a successful connect.
        """
        if self.retries:
            log.info('reconnected', after_retries=self.retries)
            self.retries = 0

    def _get_endpoint_from_consul(self, service_name):
        """
        Look up an appropriate grpc endpoint (host, port) from
        consul, under the service name specified by service_name.
        :param service_name: Name of the service as registered in consul
        :return: 'address:port' string of a randomly picked service entry
        :raises Exception: if consul knows no entry for the service
        """
        parts = self.consul_endpoint.split(':')
        host = parts[0].strip()
        port = int(parts[1].strip())

        consul = Consul(host=host, port=port)
        _, services = consul.catalog.service(service_name)

        if not services:
            raise Exception('Cannot find service %s in consul' % service_name)

        # pick a random entry
        # TODO should we prefer local IP addresses? Probably.

        service = services[randint(0, len(services) - 1)]
        endpoint = '{}:{}'.format(service['ServiceAddress'],
                                  service['ServicePort'])
        return endpoint

    def _purge_work_dir(self):
        """
        Remove the non-hidden entries left behind by a previous run.

        The purged directory is always '/tmp/<work_dir with any "/tmp/"
        removed>', the same path the former `rm -fr /tmp/<dir>/*` shell
        command used, so a work_dir outside /tmp is never wiped. Removal is
        best-effort: failures are logged (files) or ignored (directories).
        """
        purge_dir = '/tmp/%s' % self.work_dir.replace('/tmp/', '')
        if not os.path.isdir(purge_dir):
            return

        for entry in os.listdir(purge_dir):
            if entry.startswith('.'):
                continue  # the shell glob '*' never matched hidden entries
            path = os.path.join(purge_dir, entry)
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path, ignore_errors=True)
            else:
                try:
                    os.remove(path)
                except OSError as e:
                    log.warning('cannot-remove-stale-file',
                                path=path, error=str(e))

    def _retrieve_schema(self):
        """
        Retrieve schema from gRPC end-point, and save all *.proto files and
        the decompressed descriptor files in the work directory.
        """
        assert isinstance(self.channel, grpc.Channel)
        stub = SchemaServiceStub(self.channel)
        schema = stub.GetSchema(NullMessage())

        if not os.path.isdir(self.work_dir):
            os.makedirs(self.work_dir)
        self._purge_work_dir()

        for fname in schema.protos:
            content = schema.protos[fname]
            log.debug('saving-proto',
                      fname=fname, dir=self.work_dir, length=len(content))
            with open(os.path.join(self.work_dir, fname), 'w') as f:
                f.write(content)

        for fname in schema.descriptors:
            content = decompress(schema.descriptors[fname])
            log.debug('saving-descriptor',
                      fname=fname, dir=self.work_dir, length=len(content))
            with open(os.path.join(self.work_dir, fname), 'wb') as f:
                f.write(content)

    def _compile_proto_files(self):
        """
        For each *.proto file in the work directory, compile the proto
        file into the respective *_pb2.py file as well as generate the
        web server gateway python file *_gw.py. Afterwards test-import
        every *_pb2.py file found in the work directory.
        :return: None
        """
        google_api_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__),
            '../protos/third_party'
        ))

        # Same environment the former `env PATH=... PYTHONPATH=...` prefix
        # produced: inherit everything, extend PATH with the plugin dir and
        # point PYTHONPATH at the third-party protos.
        env = dict(os.environ)
        env['PATH'] = ':'.join([os.environ['PATH'], self.plugin_dir])
        env['PYTHONPATH'] = google_api_dir

        # Snapshot the listing first; compilation adds files to the dir.
        proto_files = [f for f in os.listdir(self.work_dir)
                       if f.endswith('.proto')]

        for fname in proto_files:

            log.debug('compiling', file=fname)
            # An argument list (no shell) keeps file names received from the
            # remote end-point from being interpreted as shell syntax.
            cmd = [
                'python', '-m', 'grpc.tools.protoc',
                '-I.',
                '-I%s' % google_api_dir,
                '--python_out=.',
                '--grpc_python_out=.',
                '--plugin=protoc-gen-gw=%s/gw_gen.py' % self.plugin_dir,
                '--gw_out=.',
                fname,
            ]
            log.debug('executing', cmd=' '.join(cmd), file=fname)
            exit_code = subprocess.call(cmd, cwd=self.work_dir, env=env)
            if exit_code == 0:
                log.info('compiled', file=fname)
            else:
                # Keep going (as before), but do not claim success.
                log.error('compile-failed', file=fname, exit_code=exit_code)

        # test-load each _pb2 file to see all is right
        if self.work_dir not in sys.path:
            sys.path.insert(0, self.work_dir)

        for fname in [f for f in os.listdir(self.work_dir)
                      if f.endswith('_pb2.py')]:
            modname = fname[:-len('.py')]
            log.debug('test-import', modname=modname)
            __import__(modname)

    def invoke(self, stub, method_name, request):
        """
        Invoke a gRPC call to the remote server and return the response.
        A failed call drops the channel and schedules a reconnect.
        :param stub: Reference to the *_pb2 service stub
        :param method_name: The method name inside the service stub
        :param request: The request protobuf message
        :return: The response protobuf message
        :raises ServiceUnavailable: if there is no channel, or the call
            failed with status UNAVAILABLE
        :raises _Rendezvous: the original gRPC error for any other status
        """

        if self.channel is None:
            raise ServiceUnavailable()

        try:
            return getattr(stub(self.channel), method_name)(request)

        except _Rendezvous as e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                error = ServiceUnavailable()
            else:
                log.exception(e)
                error = e

            # the channel is considered broken: drop it and reconnect
            self.channel = None
            reactor.callLater(0, self.connect)

            raise error