blob: 1c92f71a64ca53ea02c5c7d5df927656352637fa [file] [log] [blame]
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -05001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
gRPC client meant to connect to a gRPC server endpoint and query the
endpoint's schema by calling SchemaService.GetSchema(Empty). All of the
client's semantics are meant to be derived from the retrieved schema.
21"""
22
23import os
24import sys
25from random import randint
26from zlib import decompress
27
28import grpc
29from consul import Consul
30from grpc._channel import _Rendezvous
31from structlog import get_logger
32from twisted.internet import reactor
33from twisted.internet.defer import inlineCallbacks, returnValue
34from werkzeug.exceptions import ServiceUnavailable
35
36from common.utils.asleep import asleep
37from netconf.protos import third_party
38from netconf.protos.schema_pb2 import SchemaServiceStub
39from google.protobuf.empty_pb2 import Empty
40from common.utils.consulhelpers import get_endpoint_from_consul
Khen Nursimulue0d53f82016-12-14 11:05:44 -080041from netconf.protos.voltha_pb2 import VolthaLocalServiceStub, \
42 VolthaGlobalServiceStub
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -050043from twisted.internet import threads
44from google.protobuf import empty_pb2
45from google.protobuf.json_format import MessageToDict, ParseDict
46from simplejson import dumps, load
47
48log = get_logger()
49
50
class GrpcClient(object):
    """
    Connect to a gRPC server, fetch its schema, and process the downloaded
    proto schema files. The goal is to convert the proto schemas into yang
    schemas which would be exposed to the Netconf client.

    The connection is established asynchronously on the Twisted reactor
    (see ``start`` / ``connect``); failed attempts are retried using the
    delays listed in ``RETRY_BACKOFF``.
    """
    # Successive wait times handed to asleep() between connection attempts;
    # the last entry is reused once the list is exhausted.
    # NOTE(review): presumably seconds -- confirm against
    # common.utils.asleep.
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    # Maps an API key of the form '<service>-<method>' to the attribute
    # holding the stub and the name of the RPC to invoke on that stub.
    # TODO: should be generated code
    _API_DISPATCH = {
        'VolthaGlobalService-GetVoltha':
            ('global_stub', 'GetVoltha'),
        'VolthaLocalService-GetVolthaInstance':
            ('local_stub', 'GetVolthaInstance'),
        'VolthaLocalService-GetHealth':
            ('local_stub', 'GetHealth'),
        'VolthaLocalService-ListAdapters':
            ('local_stub', 'ListAdapters'),
        'VolthaLocalService-ListLogicalDevices':
            ('local_stub', 'ListLogicalDevices'),
        'VolthaLocalService-ListDevices':
            ('local_stub', 'ListDevices'),
        'VolthaLocalService-ListDeviceTypes':
            ('local_stub', 'ListDeviceTypes'),
        'VolthaLocalService-ListDeviceGroups':
            ('local_stub', 'ListDeviceGroups'),
    }

    # Used for any key that is not in _API_DISPATCH: for now just return
    # the voltha instance data.
    _DEFAULT_API = ('local_stub', 'GetVolthaInstance')

    def __init__(self, consul_endpoint, work_dir,
                 grpc_endpoint='localhost:50055',
                 reconnect_callback=None,
                 on_start_callback=None):
        """
        :param consul_endpoint: consul endpoint passed to
            get_endpoint_from_consul() when grpc_endpoint starts with '@'
        :param work_dir: directory where downloaded proto/descriptor files
            are stored and compiled
        :param grpc_endpoint: either 'host:port' or '@<service-name>' to be
            looked up in consul
        :param reconnect_callback: optional callable scheduled on the
            reactor after each successful connection
        :param on_start_callback: optional callable scheduled on the
            reactor after each successful connection
        """
        self.consul_endpoint = consul_endpoint
        self.grpc_endpoint = grpc_endpoint
        self.work_dir = work_dir
        self.reconnect_callback = reconnect_callback
        self.on_start_callback = on_start_callback

        self.plugin_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protoc_plugins'))

        self.channel = None
        self.local_stub = None
        # Initialised here so the attribute exists before connect() runs.
        self.global_stub = None
        self.schema = None
        self.retries = 0
        self.shutting_down = False
        self.connected = False

    def start(self):
        """
        Schedule the first connection attempt on the reactor, unless the
        client is already connected.

        :return: self, to allow call chaining
        """
        log.debug('starting')
        if not self.connected:
            reactor.callLater(0, self.connect)
        log.info('started')
        return self

    def stop(self):
        """
        Mark the client as shutting down so that connect() stops retrying.
        Calling stop() more than once is a no-op.
        """
        log.debug('stopping')
        if self.shutting_down:
            return
        self.shutting_down = True
        log.info('stopped')

    def set_on_start_callback(self, on_start_callback):
        """
        Replace the on-start callback.

        :return: self, to allow call chaining
        """
        self.on_start_callback = on_start_callback
        return self

    def set_reconnect_callback(self, reconnect_callback):
        """
        Replace the reconnect callback.

        :return: self, to allow call chaining
        """
        self.reconnect_callback = reconnect_callback
        return self

    def resolve_endpoint(self, endpoint):
        """
        Translate an endpoint specification into a (host, port) pair.

        :param endpoint: 'host:port', or '@<service-name>' in which case
            the service is looked up through consul
        :return: (host, port) with port as an int, or (None, None) when the
            endpoint could not be resolved
        :raises ValueError: if the resolved string is not of the form
            '<host>:<integer port>'
        """
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                log.info('endpoint-found',
                         endpoint=endpoint, ip_port=ip_port_endpoint)
            except Exception as e:
                log.error('service-not-found-in-consul', endpoint=endpoint,
                          exception=repr(e))
                return None, None

        # Always hand back a 2-tuple: connect() unpacks the result, so an
        # implicit None here would surface as a TypeError over there.
        if not ip_port_endpoint:
            return None, None

        # Split on the last colon only, so that extra colons in the host
        # part cannot break the two-way unpacking.
        host, port = ip_port_endpoint.rsplit(':', 1)
        return host, int(port)

    @inlineCallbacks
    def connect(self):
        """
        (Re-)Connect to end-point.

        On success the channel and both service stubs are created, the
        backoff counter is reset and the registered callbacks are scheduled
        on the reactor. On failure the method waits for the next backoff
        interval and then reschedules itself.
        """
        if self.shutting_down or self.connected:
            return

        try:
            host, port = self.resolve_endpoint(self.grpc_endpoint)

            if host and port:
                log.info('grpc-endpoint-connecting', host=host, port=port)
                self.channel = grpc.insecure_channel(
                    '{}:{}'.format(host, port))

                # Schema retrieval and compilation are currently disabled:
                # yang_from = self._retrieve_schema()
                # log.info('proto-to-yang-schema', file=yang_from)
                # self._compile_proto_files(yang_from)
                self._clear_backoff()

                # Create the stubs before flagging the client as connected
                # and before scheduling any callback, so that a failure
                # here cannot leave the client marked connected without
                # usable stubs.
                self.local_stub = VolthaLocalServiceStub(self.channel)
                self.global_stub = VolthaGlobalServiceStub(self.channel)

                if self.on_start_callback is not None:
                    reactor.callLater(0, self.on_start_callback)

                self.connected = True
                if self.reconnect_callback is not None:
                    reactor.callLater(0, self.reconnect_callback)

                return

            # Host and port could not be determined: wait before retrying
            # instead of rescheduling immediately in a tight loop.
            yield self._backoff('endpoint-not-resolved')

        except _Rendezvous as e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                log.info('grpc-endpoint-not-available')
            else:
                log.exception(e)
            yield self._backoff('not-available')

        except Exception:
            if not self.shutting_down:
                log.exception('cannot-connect', endpoint=self.grpc_endpoint)
            yield self._backoff('unknown-error')

        # connect() itself bails out early when shutting down or connected.
        reactor.callLater(0, self.connect)

    def _backoff(self, msg):
        """
        Log ``msg`` and return the result of asleep() for the next retry
        interval; also increments the retry counter.
        """
        last_index = len(self.RETRY_BACKOFF) - 1
        wait_time = self.RETRY_BACKOFF[min(self.retries, last_index)]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """
        Reset the retry counter, logging how many retries were needed.
        """
        if self.retries:
            log.info('reconnected', after_retries=self.retries)
            self.retries = 0

    def _retrieve_schema(self):
        """
        Retrieve schema from gRPC end-point, and save all *.proto files in
        the work directory, together with their decompressed descriptors
        (*.desc).

        :return: the ``yang_from`` field of the schema reply
        """
        assert isinstance(self.channel, grpc.Channel)
        stub = SchemaServiceStub(self.channel)
        # try:
        schemas = stub.GetSchema(Empty())
        # except _Rendezvous, e:
        #     if e.code == grpc.StatusCode.UNAVAILABLE:
        #
        #     else:
        #         raise e

        # NOTE(review): both commands interpolate work_dir into a shell
        # string unquoted. The cleanup only ever targets a path below /tmp,
        # so a work_dir located elsewhere is not cleaned -- confirm this is
        # the intent.
        os.system('mkdir -p %s' % self.work_dir)
        os.system('rm -fr /tmp/%s/*' %
                  self.work_dir.replace('/tmp/', ''))  # safer

        for proto_file in schemas.protos:
            proto_fname = proto_file.file_name
            # TODO: Do we need to process a set of files using a prefix
            # instead of just one?
            proto_content = proto_file.proto
            log.info('saving-proto', fname=proto_fname, dir=self.work_dir,
                     length=len(proto_content))
            with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
                f.write(proto_content)

            desc_content = decompress(proto_file.descriptor)
            desc_fname = proto_fname.replace('.proto', '.desc')
            log.info('saving-descriptor', fname=desc_fname,
                     dir=self.work_dir, length=len(desc_content))
            with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
                f.write(desc_content)
        return schemas.yang_from

    def _compile_proto_files(self, yang_from):
        """
        For each *.proto file in the work directory, compile the proto
        file into the respective *_pb2.py file as well as generate the
        corresponding yang schema.

        :param yang_from: name of the proto file for which the yang schema
            must also be generated (via the custom protoc plugin)
        :return: None
        """
        log.info('start')
        google_api_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protos/third_party'
        ))

        log.info('google-api', api_dir=google_api_dir)

        netconf_base_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../..'
        ))
        log.info('netconf-dir', dir=netconf_base_dir)

        # These parts of the command line do not depend on the file being
        # compiled, so build them once.
        env_path = ':'.join([os.environ['PATH'], self.plugin_dir])
        python_path = ':'.join([google_api_dir, netconf_base_dir])

        proto_files = [f for f in os.listdir(self.work_dir)
                       if f.endswith('.proto')]
        for fname in proto_files:
            log.info('filename', file=fname)

            need_yang = fname == yang_from
            log.debug('compiling',
                      file=fname,
                      yang_schema_required=need_yang)
            # NOTE(review): the command is run through the shell with
            # unquoted paths; the return code of os.system is not checked.
            cmd = (
                'cd %s && '
                'env PATH=%s PYTHONPATH=%s '
                'python -m grpc.tools.protoc '
                '-I. '
                '-I%s '
                '--python_out=. '
                '--grpc_python_out=. '
                '--plugin=protoc-gen-custom=%s/proto2yang.py '
                '%s'
                '%s' % (
                    self.work_dir,
                    env_path,
                    python_path,
                    google_api_dir,
                    self.plugin_dir,
                    '--custom_out=. ' if need_yang else '',
                    fname)
            )
            log.debug('executing', cmd=cmd, file=fname)
            os.system(cmd)
            log.info('compiled', file=fname)

        # # test-load each _pb2 file to see all is right
        # if self.work_dir not in sys.path:
        #     sys.path.insert(0, self.work_dir)
        #
        # for fname in [f for f in os.listdir(self.work_dir)
        #               if f.endswith('_pb2.py')]:
        #     modname = fname[:-len('.py')]
        #     log.debug('test-import', modname=modname)
        #     _ = __import__(modname)

        # TODO: find a different way to test the generated yang files

    @inlineCallbacks
    def get_voltha_instance(self):
        """
        Call VolthaLocalService.GetVolthaInstance in a worker thread and
        return the reply converted to a dict.

        Any failure is logged and the deferred then fires with None.
        """
        try:
            res = yield threads.deferToThread(
                self.local_stub.GetVolthaInstance, empty_pb2.Empty())

            # NOTE(review): the two positional flags are presumably
            # including_default_value_fields and
            # preserving_proto_field_name -- confirm against the installed
            # protobuf version.
            out_data = MessageToDict(res, True, True)
            returnValue(out_data)
        except Exception as e:
            log.error('failure', exception=repr(e))

    @inlineCallbacks
    def invoke_voltha_api(self, key):
        """
        Invoke the Voltha RPC identified by ``key`` in a worker thread and
        return the reply converted to a dict.

        :param key: '<service>-<method>', e.g.
            'VolthaLocalService-ListDevices'; unknown keys fall back to
            VolthaLocalService.GetVolthaInstance

        Any failure is logged and the deferred then fires with None.
        """
        # key = ''.join([service, '-', method])
        try:
            stub_attr, rpc_name = self._API_DISPATCH.get(
                key, self._DEFAULT_API)
            rpc = getattr(getattr(self, stub_attr), rpc_name)
            res = yield threads.deferToThread(rpc, empty_pb2.Empty())

            # NOTE(review): the two positional flags are presumably
            # including_default_value_fields and
            # preserving_proto_field_name -- confirm against the installed
            # protobuf version.
            out_data = MessageToDict(res, True, True)
            returnValue(out_data)
        except Exception as e:
            log.error('failure', exception=repr(e))
329
330