blob: c2d66db073669953d61140bd60b8495e868a812b [file] [log] [blame]
Zsolt Haraszti5cd64702016-09-27 13:48:35 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
"""
gRPC client meant to connect to a gRPC server endpoint and query the
end-point's schema by calling SchemaService.GetSchema(NullMessage).
All of the client's semantics are derived from the recovered schema.
"""
import os
import shutil
import subprocess
import sys
from random import randint
from zlib import decompress

import grpc
from consul import Consul
from structlog import get_logger

from chameleon.protos.schema_pb2 import NullMessage, SchemaServiceStub
Zsolt Haraszti5cd64702016-09-27 13:48:35 -070033
# Module-level structlog logger used by every log call in this file.
log = get_logger()
35
36
class GrpcClient(object):
    """Client that learns a gRPC server's API from the server itself.

    On connect it opens an insecure channel, fetches the schema through
    SchemaService.GetSchema, stores the returned .proto and descriptor
    files in ``work_dir``, compiles the .proto files there and finally
    imports every generated ``*_pb2`` module as a sanity check.
    """

    # Old work-dir contents are only ever deleted below this root; it
    # keeps a mis-configured work_dir from wiping an arbitrary directory.
    _PURGE_ROOT = '/tmp'

    def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055'):
        """Store the configuration; no connection is made here.

        :param consul_endpoint: 'host:port' of the consul agent, used
            only when ``endpoint`` has to be resolved by service name
        :param work_dir: directory that receives the retrieved schema
            files and the code generated from them
        :param endpoint: either 'host:port' of the gRPC server, or
            '@<service-name>' to look the server up in consul
        """
        self.consul_endpoint = consul_endpoint
        self.endpoint = endpoint
        self.work_dir = work_dir
        self.plugin_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protoc_plugins'))

        self.channel = None
        self.schema = None

        self.shutting_down = False

    def run(self):
        """Connect to the end-point and return self to allow chaining."""
        self.connect()
        return self

    def shutdown(self):
        """Mark the client as shutting down; later connect() calls are
        no-ops. Calling it more than once is harmless.
        """
        if self.shutting_down:
            return
        self.shutting_down = True

    def connect(self):
        """(re-)connect to end-point

        Any failure is logged and swallowed, so the caller never sees an
        exception from here.
        """
        if self.shutting_down:
            return

        # Bound before the try block so that the except handler can log
        # it even when the consul lookup is the step that fails.
        _endpoint = self.endpoint

        try:
            if self.endpoint.startswith('@'):
                _endpoint = self.get_endpoint_from_consul(self.endpoint[1:])

            log.info('connecting', endpoint=_endpoint)
            self.channel = grpc.insecure_channel(_endpoint)

            self._retrieve_schema()
            self._compile_proto_files()

        except Exception:
            log.exception('cannot-connect', endpoint=_endpoint)

    def get_endpoint_from_consul(self, service_name):
        """Look up a gRPC endpoint in consul under the given service
        name and return it as a 'host:port' string.

        One of the registered instances is picked at random.

        :raises Exception: if consul knows no instance of the service
        """
        parts = self.consul_endpoint.split(':')
        host = parts[0].strip()
        port = int(parts[1].strip())

        consul = Consul(host=host, port=port)
        _, services = consul.catalog.service(service_name)

        if not services:
            raise Exception('Cannot find service %s in consul' % service_name)

        # pick a random entry
        # TODO should we prefer local IP addresses? Probably.
        service = services[randint(0, len(services) - 1)]
        return '{}:{}'.format(service['ServiceAddress'],
                              service['ServicePort'])

    def _reset_work_dir(self):
        """Make sure the work directory exists and drop its old contents.

        Nothing is deleted unless the directory lies below _PURGE_ROOT.
        """
        work_dir = os.path.abspath(self.work_dir)
        if not os.path.isdir(work_dir):
            os.makedirs(work_dir)

        if not work_dir.startswith(self._PURGE_ROOT + os.sep):
            log.warning('work-dir-not-purged', dir=work_dir)
            return

        for entry in os.listdir(work_dir):
            if entry.startswith('.'):
                # the former `rm -fr <dir>/*` never matched dot-files
                continue
            path = os.path.join(work_dir, entry)
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path)
            else:
                os.remove(path)

    def _retrieve_schema(self):
        """Retrieve schema from gRPC end-point and save it in work_dir.

        Proto sources are written as received; descriptors arrive
        zlib-compressed and are decompressed before being written.
        """
        assert isinstance(self.channel, grpc.Channel)
        stub = SchemaServiceStub(self.channel)
        schema = stub.GetSchema(NullMessage())

        self._reset_work_dir()

        for fname in schema.protos:
            content = schema.protos[fname]
            log.debug('saving-proto',
                      fname=fname, dir=self.work_dir, length=len(content))
            with open(os.path.join(self.work_dir, fname), 'w') as f:
                f.write(content)

        for fname in schema.descriptors:
            content = decompress(schema.descriptors[fname])
            log.debug('saving-descriptor',
                      fname=fname, dir=self.work_dir, length=len(content))
            with open(os.path.join(self.work_dir, fname), 'wb') as f:
                f.write(content)

    def _compile_proto_files(self):
        """Compile every .proto file in work_dir, then import each
        generated ``*_pb2`` module to check that it loads.

        A compiler failure is logged and the remaining files are still
        processed.
        """
        google_api_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__),
            '../protos/third_party'
        ))

        # Child environment: PATH is extended with the plugin dir and
        # PYTHONPATH is replaced by the third-party protos dir.
        env = dict(os.environ)
        env['PATH'] = os.pathsep.join(
            p for p in (os.environ.get('PATH', ''), self.plugin_dir) if p)
        env['PYTHONPATH'] = google_api_dir

        proto_files = [f for f in os.listdir(self.work_dir)
                       if f.endswith('.proto')]
        for fname in proto_files:
            log.info('compiling', file=fname)

            # An argument list (no shell) keeps file names received from
            # the server from being interpreted as shell syntax.
            cmd = [
                'python', '-m', 'grpc.tools.protoc',
                '-I.',
                '-I%s' % google_api_dir,
                '--python_out=.',
                '--grpc_python_out=.',
                '--plugin=protoc-gen-gw=%s/gw_gen.py' % self.plugin_dir,
                '--gw_out=.',
                fname,
            ]
            log.debug('executing', cmd=' '.join(cmd))
            try:
                returncode = subprocess.call(cmd, cwd=self.work_dir, env=env)
            except OSError:
                log.exception('compile-not-started', file=fname)
                continue
            if returncode != 0:
                log.error('compile-failed', file=fname, returncode=returncode)

        # test-load each _pb2 file to see all is right
        if self.work_dir not in sys.path:
            sys.path.insert(0, self.work_dir)

        pb2_files = [f for f in os.listdir(self.work_dir)
                     if f.endswith('_pb2.py')]
        for fname in pb2_files:
            modname = fname[:-len('.py')]
            log.debug('test-import', modname=modname)
            __import__(modname)

    def invoke(self, stub, method_name, request):
        """Call ``method_name`` on a new ``stub`` bound to the current
        channel, passing ``request``, and return the response.
        """
        response = getattr(stub(self.channel), method_name)(request)
        return response
170 return response