blob: 2ce8d7c52ad3e32afb3ac7182f90ee04ef9c477f [file] [log] [blame]
Zsolt Haraszti5cd64702016-09-27 13:48:35 -07001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
"""
gRPC client meant to connect to a gRPC server end-point and
query the end-point's schema by calling
SchemaService.GetSchema(NullMessage). All of the client's
semantics are derived from the retrieved schema.
"""
import os
import shutil
import subprocess
import sys
from random import randint
from zlib import decompress

import grpc
from consul import Consul
from structlog import get_logger

from chameleon.protos.schema_pb2 import NullMessage, Schema, SchemaServiceStub
33
# Module-level structlog logger used by the code in this file.
log = get_logger()
35
36
class GrpcClient(object):
    """Client that connects to a gRPC end-point and materializes its schema.

    On connect, the client calls ``SchemaService.GetSchema`` on the
    end-point, stores the returned proto files and descriptors under
    ``work_dir``, compiles the proto files with ``grpc.tools.protoc`` and
    test-imports the generated ``*_pb2`` modules.
    """

    def __init__(self, consul_endpoint, endpoint='localhost:50055'):
        """Store the connection settings; no I/O is performed here.

        :param consul_endpoint: ``host:port`` of the consul agent, used only
            when ``endpoint`` refers to a consul service name.
        :param endpoint: either a literal ``host:port`` gRPC address, or
            ``@<service-name>`` to look the address up in consul.
        """
        self.consul_endpoint = consul_endpoint
        self.endpoint = endpoint
        self.work_dir = '/tmp/chameleon'

        self.channel = None
        self.schema = None

        self.shutting_down = False

    def run(self):
        """Connect to the end-point and return ``self`` to allow chaining."""
        self.connect()
        return self

    def shutdown(self):
        """Mark the client as shutting down; later connect() calls are no-ops."""
        if self.shutting_down:
            return
        self.shutting_down = True

    def connect(self):
        """(Re-)connect to the end-point, then fetch and compile its schema.

        Any failure is logged (with traceback) and swallowed, so this method
        never raises; it does nothing once shutdown() has been called.
        """
        if self.shutting_down:
            return

        # Bind the name before entering the try block so the handler below
        # can always log it, even when the consul lookup is what failed.
        _endpoint = self.endpoint
        try:
            if self.endpoint.startswith('@'):
                _endpoint = self.get_endpoint_from_consul(self.endpoint[1:])

            log.info('connecting', endpoint=_endpoint)
            self.channel = grpc.insecure_channel(_endpoint)

            self._retrieve_schema()
            self._compile_proto_files()

        except Exception:
            log.exception('cannot-connect', endpoint=_endpoint)

    def get_endpoint_from_consul(self, service_name):
        """Look up a gRPC endpoint for ``service_name`` in consul.

        :param service_name: name of the service in the consul catalog.
        :return: ``'<ServiceAddress>:<ServicePort>'`` of one randomly chosen
            catalog entry.
        :raises Exception: if consul lists no entry for the service.
        """
        parts = self.consul_endpoint.split(':')
        host = parts[0].strip()
        port = int(parts[1].strip())

        consul = Consul(host=host, port=port)
        _, services = consul.catalog.service(service_name)

        if not services:
            raise Exception('Cannot find service %s in consul' % service_name)

        # pick a random entry
        # TODO should we prefer local IP addresses? Probably.
        service = services[randint(0, len(services) - 1)]
        return '{}:{}'.format(service['ServiceAddress'],
                              service['ServicePort'])

    def _reset_work_dir(self):
        """Create ``work_dir`` if it is missing and delete everything in it.

        Done with os/shutil calls rather than shell commands so that the
        path is never interpreted by a shell and failures are not silently
        ignored.

        :raises OSError: if the directory cannot be created or cleared.
        """
        if not os.path.isdir(self.work_dir):
            os.makedirs(self.work_dir)

        for entry in os.listdir(self.work_dir):
            path = os.path.join(self.work_dir, entry)
            # Never follow a symlink out of the work directory: unlink it.
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path)
            else:
                os.remove(path)

    def _retrieve_schema(self):
        """Retrieve the schema from the gRPC end-point into ``work_dir``.

        Proto sources are written as text; descriptors arrive
        zlib-compressed and are written decompressed, in binary mode.
        """
        assert isinstance(self.channel, grpc.Channel)
        stub = SchemaServiceStub(self.channel)
        schema = stub.GetSchema(NullMessage())

        # Start from an empty directory so files from an earlier connection
        # cannot leak into this one.
        self._reset_work_dir()

        for fname, content in schema.protos.items():
            log.debug('saving-proto',
                      fname=fname, dir=self.work_dir, length=len(content))
            with open(os.path.join(self.work_dir, fname), 'w') as f:
                f.write(content)

        for fname, compressed in schema.descriptors.items():
            content = decompress(compressed)
            log.debug('saving-descriptor',
                      fname=fname, dir=self.work_dir, length=len(content))
            with open(os.path.join(self.work_dir, fname), 'wb') as f:
                f.write(content)

    def _compile_proto_files(self):
        """Compile every ``.proto`` file in ``work_dir`` and test-import it.

        protoc is run once per file from inside ``work_dir``. A failing
        compile is logged and the remaining files are still processed.
        Afterwards ``work_dir`` is put at the front of ``sys.path`` and each
        generated ``*_pb2.py`` module is imported to verify that it loads.
        """
        google_api_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__),
            '../protos/third_party'
        ))

        proto_files = [f for f in os.listdir(self.work_dir)
                       if f.endswith('.proto')]
        for fname in proto_files:
            log.info('compiling', file=fname)

            # Argument list + cwd instead of a shell string: file names come
            # from the remote schema and must not be parsed by a shell.
            cmd = [
                'python', '-m', 'grpc.tools.protoc',
                '-I.',
                '-I%s' % google_api_dir,
                '--python_out=.',
                '--grpc_python_out=.',
                fname,
            ]
            try:
                returncode = subprocess.call(cmd, cwd=self.work_dir)
            except OSError:
                # e.g. the interpreter could not be launched at all
                log.exception('compile-failed', file=fname)
                continue
            if returncode != 0:
                log.error('compile-failed', file=fname, returncode=returncode)

        # test-load each _pb2 file to see all is right
        if self.work_dir not in sys.path:
            sys.path.insert(0, self.work_dir)

        pb2_files = [f for f in os.listdir(self.work_dir)
                     if f.endswith('_pb2.py')]
        for fname in pb2_files:
            modname = fname[:-len('.py')]
            log.debug('test-import', modname=modname)
            __import__(modname)