blob: d65410e820526ebe2f8f0c65e9e31497abb7f4af [file] [log] [blame]
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -05001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
18gRPC client meant to connect to a gRPC server endpoint, and query the
19end-point's schema by calling SchemaService.Schema(Empty) and all of its
20semantics are derived from the recovered schema.
21"""
22
23import os
24import sys
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050025from zlib import decompress
26
27import grpc
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050028from grpc._channel import _Rendezvous
29from structlog import get_logger
30from twisted.internet import reactor
31from twisted.internet.defer import inlineCallbacks, returnValue
32from werkzeug.exceptions import ServiceUnavailable
33
34from common.utils.asleep import asleep
35from netconf.protos import third_party
36from netconf.protos.schema_pb2 import SchemaServiceStub
37from google.protobuf.empty_pb2 import Empty
38from common.utils.consulhelpers import get_endpoint_from_consul
Khen Nursimulu7626ce12016-12-21 11:51:46 -050039from netconf.protos.voltha_pb2 import VolthaLocalServiceStub, \
Khen Nursimulue0d53f82016-12-14 11:05:44 -080040 VolthaGlobalServiceStub
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050041from google.protobuf import empty_pb2
42from google.protobuf.json_format import MessageToDict, ParseDict
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050043
44log = get_logger()
45
46
47class GrpcClient(object):
48 """
49 Connect to a gRPC server, fetch its schema, and process the downloaded
50 proto schema files. The goal is to convert the proto schemas into yang
51 schemas which would be exposed to the Netconf client.
52 """
53 RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
54
55 def __init__(self, consul_endpoint, work_dir,
56 grpc_endpoint='localhost:50055',
57 reconnect_callback=None,
58 on_start_callback=None):
59 self.consul_endpoint = consul_endpoint
60 self.grpc_endpoint = grpc_endpoint
61 self.work_dir = work_dir
62 self.reconnect_callback = reconnect_callback
63 self.on_start_callback = on_start_callback
64
65 self.plugin_dir = os.path.abspath(os.path.join(
66 os.path.dirname(__file__), '../protoc_plugins'))
67
68 self.channel = None
69 self.local_stub = None
70 self.schema = None
71 self.retries = 0
72 self.shutting_down = False
73 self.connected = False
74
75 def start(self):
76 log.debug('starting')
77 if not self.connected:
78 reactor.callLater(0, self.connect)
79 log.info('started')
80 return self
81
82 def stop(self):
83 log.debug('stopping')
84 if self.shutting_down:
85 return
86 self.shutting_down = True
87 log.info('stopped')
88
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050089 def set_on_start_callback(self, on_start_callback):
90 self.on_start_callback = on_start_callback
91 return self
92
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050093 def set_reconnect_callback(self, reconnect_callback):
94 self.reconnect_callback = reconnect_callback
95 return self
96
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050097 def resolve_endpoint(self, endpoint):
98 ip_port_endpoint = endpoint
99 if endpoint.startswith('@'):
100 try:
101 ip_port_endpoint = get_endpoint_from_consul(
102 self.consul_endpoint, endpoint[1:])
103 log.info('endpoint-found',
104 endpoint=endpoint, ip_port=ip_port_endpoint)
105 except Exception as e:
106 log.error('service-not-found-in-consul', endpoint=endpoint,
107 exception=repr(e))
108 return None, None
109 if ip_port_endpoint:
110 host, port = ip_port_endpoint.split(':', 2)
111 return host, int(port)
112
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500113 @inlineCallbacks
114 def connect(self):
115 """
116 (Re-)Connect to end-point
117 """
118 if self.shutting_down or self.connected:
119 return
120
121 try:
122 host, port = self.resolve_endpoint(self.grpc_endpoint)
123
124 # If host and port is not set then we will retry
125 if host and port:
126 log.info('grpc-endpoint-connecting', host=host, port=port)
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500127 self.channel = grpc.insecure_channel(
128 '{}:{}'.format(host, port))
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500129
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500130 yang_from = self._retrieve_schema()
131 log.info('proto-to-yang-schema', file=yang_from)
132 self._compile_proto_files(yang_from)
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500133 self._clear_backoff()
134
135 if self.on_start_callback is not None:
136 reactor.callLater(0, self.on_start_callback)
137
138 self.connected = True
139 if self.reconnect_callback is not None:
140 reactor.callLater(0, self.reconnect_callback)
141
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500142 # self.local_stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
143 # self.global_stub = voltha_pb2.VolthaGlobalServiceStub(self.channel)
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500144
145 return
146
147 except _Rendezvous, e:
148 if e.code() == grpc.StatusCode.UNAVAILABLE:
149 log.info('grpc-endpoint-not-available')
150 else:
151 log.exception(e)
152 yield self._backoff('not-available')
153
154 except Exception, e:
155 if not self.shutting_down:
156 log.exception('cannot-connect', endpoint=_endpoint)
157 yield self._backoff('unknown-error')
158
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500159 reactor.callLater(1, self.connect)
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500160
161 def _backoff(self, msg):
162 wait_time = self.RETRY_BACKOFF[min(self.retries,
163 len(self.RETRY_BACKOFF) - 1)]
164 self.retries += 1
165 log.error(msg, retry_in=wait_time)
166 return asleep(wait_time)
167
168 def _clear_backoff(self):
169 if self.retries:
170 log.info('reconnected', after_retries=self.retries)
171 self.retries = 0
172
173 def _retrieve_schema(self):
174 """
175 Retrieve schema from gRPC end-point, and save all *.proto files in
176 the work directory.
177 """
178 assert isinstance(self.channel, grpc.Channel)
179 stub = SchemaServiceStub(self.channel)
180 # try:
181 schemas = stub.GetSchema(Empty())
182 # except _Rendezvous, e:
183 # if e.code == grpc.StatusCode.UNAVAILABLE:
184 #
185 # else:
186 # raise e
187
188 os.system('mkdir -p %s' % self.work_dir)
189 os.system('rm -fr /tmp/%s/*' %
190 self.work_dir.replace('/tmp/', '')) # safer
191
192 for proto_file in schemas.protos:
193 proto_fname = proto_file.file_name
194 # TODO: Do we need to process a set of files using a prefix
195 # instead of just one?
196 proto_content = proto_file.proto
197 log.info('saving-proto', fname=proto_fname, dir=self.work_dir,
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500198 length=len(proto_content))
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500199 with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
200 f.write(proto_content)
201
202 desc_content = decompress(proto_file.descriptor)
203 desc_fname = proto_fname.replace('.proto', '.desc')
204 log.info('saving-descriptor', fname=desc_fname, dir=self.work_dir,
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500205 length=len(desc_content))
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500206 with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
207 f.write(desc_content)
208 return schemas.yang_from
209
210 def _compile_proto_files(self, yang_from):
211 """
212 For each *.proto file in the work directory, compile the proto
213 file into the respective *_pb2.py file as well as generate the
214 corresponding yang schema.
215 :return: None
216 """
217 log.info('start')
218 google_api_dir = os.path.abspath(os.path.join(
219 os.path.dirname(__file__), '../protos/third_party'
220 ))
221
222 log.info('google-api', api_dir=google_api_dir)
223
224 netconf_base_dir = os.path.abspath(os.path.join(
225 os.path.dirname(__file__), '../..'
226 ))
227 log.info('netconf-dir', dir=netconf_base_dir)
228
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500229 for fname in [f for f in os.listdir(self.work_dir)
230 if f.endswith('.proto')]:
231 log.info('filename', file=fname)
232
233 need_yang = fname == yang_from
234 log.debug('compiling',
235 file=fname,
236 yang_schema_required=need_yang)
237 cmd = (
238 'cd %s && '
239 'env PATH=%s PYTHONPATH=%s '
240 'python -m grpc.tools.protoc '
241 '-I. '
242 '-I%s '
243 '--python_out=. '
244 '--grpc_python_out=. '
245 '--plugin=protoc-gen-custom=%s/proto2yang.py '
246 '%s'
247 '%s' % (
248 self.work_dir,
249 ':'.join([os.environ['PATH'], self.plugin_dir]),
250 ':'.join([google_api_dir, netconf_base_dir]),
251 google_api_dir,
252 self.plugin_dir,
253 '--custom_out=. ' if need_yang else '',
254 fname)
255 )
256 log.debug('executing', cmd=cmd, file=fname)
257 os.system(cmd)
258 log.info('compiled', file=fname)
259
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500260 # # test-load each _pb2 file to see all is right
261 # if self.work_dir not in sys.path:
262 # sys.path.insert(0, self.work_dir)
263 #
264 # for fname in [f for f in os.listdir(self.work_dir)
265 # if f.endswith('_pb2.py')]:
266 # modname = fname[:-len('.py')]
267 # log.debug('test-import', modname=modname)
268 # _ = __import__(modname)
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500269
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500270 # TODO: find a different way to test the generated yang files
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -0500271
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500272 # TODO: should be generated code
273 # Focus for now is issuing a GET request for VolthaGlobalService or VolthaLocalService
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800274 @inlineCallbacks
275 def invoke_voltha_api(self, key):
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500276 # TODO: This should be part of a parameter request
277 depth = [('get-depth', '-1')]
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800278 try:
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500279 data = {}
280 req = ParseDict(data, empty_pb2.Empty())
281 service_method = key.split('-')
282 service = service_method[0]
283 method = service_method[1]
284 stub = None
285 # if service == 'VolthaGlobalService':
286 # stub = VolthaGlobalServiceStub
287 # elif service == 'VolthaLocalService':
288 # stub = VolthaLocalServiceStub
289 # else:
290 # raise # Exception
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800291
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500292 res, metadata = yield self.invoke(stub, method, req, depth)
293
294 returnValue(MessageToDict(res, True, True))
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800295 except Exception, e:
296 log.error('failure', exception=repr(e))
297
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500298 @inlineCallbacks
299 def invoke(self, stub, method_name, request, metadata, retry=1):
300 """
301 Invoke a gRPC call to the remote server and return the response.
302 :param stub: Reference to the *_pb2 service stub
303 :param method_name: The method name inside the service stub
304 :param request: The request protobuf message
305 :param metadata: [(str, str), (str, str), ...]
306 :return: The response protobuf message and returned trailing metadata
307 """
Khen Nursimulue0d53f82016-12-14 11:05:44 -0800308
Khen Nursimulu7626ce12016-12-21 11:51:46 -0500309 if not self.connected:
310 raise ServiceUnavailable()
311
312 try:
313 method = getattr(stub(self.channel), method_name)
314 response, rendezvous = method.with_call(request, metadata=metadata)
315 returnValue((response, rendezvous.trailing_metadata()))
316
317 except grpc._channel._Rendezvous, e:
318 code = e.code()
319 if code == grpc.StatusCode.UNAVAILABLE:
320 e = ServiceUnavailable()
321
322 if self.connected:
323 self.connected = False
324 yield self.connect()
325 if retry > 0:
326 response = yield self.invoke(stub, method_name,
327 request, metadata,
328 retry=retry - 1)
329 returnValue(response)
330
331 elif code in (
332 grpc.StatusCode.NOT_FOUND,
333 grpc.StatusCode.INVALID_ARGUMENT,
334 grpc.StatusCode.ALREADY_EXISTS):
335
336 pass # don't log error, these occur naturally
337
338 else:
339 log.exception(e)
340
341 raise e