blob: ae190671813a2ba80d36f979b03288a01f8ffcc4 [file] [log] [blame]
Khen Nursimuluaaac7ee2016-12-11 22:03:52 -05001#
2# Copyright 2016 the original author or authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""
gRPC client meant to connect to a gRPC server endpoint and query the
end-point's schema by calling SchemaService.GetSchema(Empty). All of the
client's semantics are derived from the retrieved schema.
21"""
22
import errno
import glob
import os
import shutil
import subprocess
import sys
from random import randint
from zlib import decompress

import grpc
from consul import Consul
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from werkzeug.exceptions import ServiceUnavailable

from common.utils.asleep import asleep
from netconf.protos import third_party
from netconf.protos.schema_pb2 import SchemaServiceStub
from google.protobuf.empty_pb2 import Empty
from common.utils.consulhelpers import get_endpoint_from_consul
from netconf.protos.voltha_pb2 import VolthaLocalServiceStub
from twisted.internet import threads
from google.protobuf import empty_pb2
from google.protobuf.json_format import MessageToDict, ParseDict
from simplejson import dumps, load
46
47log = get_logger()
48
49
class GrpcClient(object):
    """
    Connect to a gRPC server, fetch its schema, and process the downloaded
    proto schema files. The goal is to convert the proto schemas into yang
    schemas which would be exposed to the Netconf client.

    The client keeps retrying the connection, pausing between attempts for
    the delays listed in RETRY_BACKOFF (seconds); once the list is exhausted
    the last delay is reused.
    """
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]

    def __init__(self, consul_endpoint, work_dir,
                 grpc_endpoint='localhost:50055',
                 reconnect_callback=None,
                 on_start_callback=None):
        """
        :param consul_endpoint: consul end-point handed to
            get_endpoint_from_consul when grpc_endpoint is a '@service' name
        :param work_dir: directory in which the retrieved proto files and
            the files generated from them are stored
        :param grpc_endpoint: either 'host:port' or '@<consul-service-name>'
        :param reconnect_callback: optional callable scheduled on the
            reactor each time a connection is established
        :param on_start_callback: optional callable scheduled on the
            reactor each time a connection is established
        """
        self.consul_endpoint = consul_endpoint
        self.grpc_endpoint = grpc_endpoint
        self.work_dir = work_dir
        self.reconnect_callback = reconnect_callback
        self.on_start_callback = on_start_callback

        self.plugin_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protoc_plugins'))

        self.channel = None
        self.local_stub = None
        self.schema = None
        self.retries = 0
        self.shutting_down = False
        self.connected = False

    def start(self):
        """
        Schedule the first connection attempt on the reactor (unless the
        client is already connected).

        :return: self, to allow call chaining
        """
        log.debug('starting')
        if not self.connected:
            reactor.callLater(0, self.connect)
        log.info('started')
        return self

    def stop(self):
        """
        Flag the client as shutting down so that no further connection
        attempt is made. Calling it more than once is harmless.
        """
        log.debug('stopping')
        if self.shutting_down:
            return
        self.shutting_down = True
        log.info('stopped')

    def set_on_start_callback(self, on_start_callback):
        """
        Replace the on-start callback.

        :return: self, to allow call chaining
        """
        self.on_start_callback = on_start_callback
        return self

    def set_reconnect_callback(self, reconnect_callback):
        """
        Replace the reconnect callback.

        :return: self, to allow call chaining
        """
        self.reconnect_callback = reconnect_callback
        return self

    def resolve_endpoint(self, endpoint):
        """
        Translate an end-point specification into a host and a port.

        :param endpoint: 'host:port', or '@<service>' in which case the
            service is looked up in consul
        :return: (host, port) with port as an int, or (None, None) when the
            end-point cannot be resolved or is not of the form 'host:port'
        """
        ip_port_endpoint = endpoint
        if endpoint.startswith('@'):
            try:
                ip_port_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, endpoint[1:])
                log.info('endpoint-found',
                         endpoint=endpoint, ip_port=ip_port_endpoint)
            except Exception as e:
                log.error('service-not-found-in-consul', endpoint=endpoint,
                          exception=repr(e))
                return None, None

        # Always hand back a 2-tuple: callers unpack the result directly.
        if not ip_port_endpoint:
            return None, None

        parts = ip_port_endpoint.split(':')
        if len(parts) != 2:
            log.error('invalid-endpoint', endpoint=endpoint,
                      ip_port=ip_port_endpoint)
            return None, None

        host, port = parts
        try:
            return host, int(port)
        except ValueError:
            log.error('invalid-endpoint', endpoint=endpoint,
                      ip_port=ip_port_endpoint)
            return None, None

    @inlineCallbacks
    def connect(self):
        """
        (Re-)Connect to end-point

        On success the schema is retrieved and compiled, the local service
        stub is created and the registered callbacks are scheduled on the
        reactor. On any failure the method waits for the current backoff
        delay and then schedules itself again.
        """
        if self.shutting_down or self.connected:
            return

        try:
            host, port = self.resolve_endpoint(self.grpc_endpoint)

            if host and port:
                log.info('grpc-endpoint-connecting', host=host, port=port)
                self.channel = grpc.insecure_channel(
                    '{}:{}'.format(host, port))

                yang_from = self._retrieve_schema()
                log.info('proto-to-yang-schema', file=yang_from)
                self._compile_proto_files(yang_from)
                self._clear_backoff()

                # Create the stub before flagging the client as connected,
                # so that a failure here cannot leave us "connected"
                # without a usable stub.
                self.local_stub = VolthaLocalServiceStub(self.channel)

                if self.on_start_callback is not None:
                    reactor.callLater(0, self.on_start_callback)

                self.connected = True
                if self.reconnect_callback is not None:
                    reactor.callLater(0, self.reconnect_callback)

                return

            # Host and port are not set: retry, but only after a pause.
            failure = 'endpoint-not-resolved'

        except _Rendezvous as e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                log.info('grpc-endpoint-not-available')
            else:
                log.exception(e)
            failure = 'not-available'

        except Exception:
            if not self.shutting_down:
                log.exception('cannot-connect', endpoint=self.grpc_endpoint)
            failure = 'unknown-error'

        yield self._backoff(failure)
        reactor.callLater(0, self.connect)

    def _backoff(self, msg):
        """
        Log msg and return the result of asleep() for the next delay of
        RETRY_BACKOFF; the last delay is reused once the list is exhausted.
        """
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """
        Reset the retry counter, logging how many retries were needed.
        """
        if self.retries:
            log.info('reconnected', after_retries=self.retries)
            self.retries = 0

    def _prepare_work_dir(self):
        """
        Create the work directory when missing and remove stale content.
        """
        try:
            os.makedirs(self.work_dir)
        except OSError as e:
            # An already existing directory is fine (mkdir -p semantics).
            if e.errno != errno.EEXIST or not os.path.isdir(self.work_dir):
                raise

        # Deliberately restrict the cleanup to paths below /tmp, whatever
        # the work directory is, so a bad setting cannot wipe other places.
        stale_pattern = '/tmp/%s/*' % self.work_dir.replace('/tmp/', '')
        for path in glob.glob(stale_pattern):
            try:
                if os.path.isdir(path) and not os.path.islink(path):
                    shutil.rmtree(path)
                else:
                    os.remove(path)
            except OSError as e:
                # Best effort: a leftover file must not abort the connect.
                log.error('cannot-remove-stale-file', path=path,
                          exception=repr(e))

    def _retrieve_schema(self):
        """
        Retrieve schema from gRPC end-point, and save all *.proto files in
        the work directory.

        gRPC errors raised by the GetSchema call are not handled here; they
        propagate to the caller.

        :return: the yang_from field of the retrieved schema
        """
        assert isinstance(self.channel, grpc.Channel)
        stub = SchemaServiceStub(self.channel)
        schemas = stub.GetSchema(Empty())

        self._prepare_work_dir()

        for proto_file in schemas.protos:
            # NOTE(review): file names come from the server and are joined
            # to the work directory as-is - confirm they are plain names.
            proto_fname = proto_file.file_name
            # TODO: Do we need to process a set of files using a prefix
            # instead of just one?
            proto_content = proto_file.proto
            log.info('saving-proto', fname=proto_fname, dir=self.work_dir,
                     length=len(proto_content))
            with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
                f.write(proto_content)

            # The descriptor is sent zlib-compressed.
            desc_content = decompress(proto_file.descriptor)
            desc_fname = proto_fname.replace('.proto', '.desc')
            log.info('saving-descriptor', fname=desc_fname,
                     dir=self.work_dir, length=len(desc_content))
            with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
                f.write(desc_content)
        return schemas.yang_from

    def _compile_proto_files(self, yang_from):
        """
        For each *.proto file in the work directory, compile the proto
        file into the respective *_pb2.py file as well as generate the
        corresponding yang schema.

        :param yang_from: name of the proto file for which the yang schema
            has to be generated as well
        :return: None
        """
        log.info('start')
        google_api_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../protos/third_party'
        ))
        log.info('google-api', api_dir=google_api_dir)

        netconf_base_dir = os.path.abspath(os.path.join(
            os.path.dirname(__file__), '../..'
        ))
        log.info('netconf-dir', dir=netconf_base_dir)

        # The environment is the same for every file: the current one with
        # the plugin directory appended to PATH and a dedicated PYTHONPATH.
        env = dict(os.environ)
        env['PATH'] = ':'.join([os.environ['PATH'], self.plugin_dir])
        env['PYTHONPATH'] = ':'.join([google_api_dir, netconf_base_dir])

        proto_files = [f for f in os.listdir(self.work_dir)
                       if f.endswith('.proto')]
        for fname in proto_files:
            log.info('filename', file=fname)

            need_yang = fname == yang_from
            log.debug('compiling',
                      file=fname,
                      yang_schema_required=need_yang)

            # An argument list (no shell) keeps paths containing spaces or
            # shell metacharacters from altering the command.
            cmd = [
                'python', '-m', 'grpc.tools.protoc',
                '-I.',
                '-I%s' % google_api_dir,
                '--python_out=.',
                '--grpc_python_out=.',
                '--plugin=protoc-gen-custom=%s/proto2yang.py' %
                self.plugin_dir,
            ]
            if need_yang:
                cmd.append('--custom_out=.')
            cmd.append(fname)

            log.debug('executing', cmd=' '.join(cmd), file=fname)
            exit_code = subprocess.call(cmd, cwd=self.work_dir, env=env)
            if exit_code != 0:
                # Keep going: the remaining files may still compile.
                log.error('compile-failed', file=fname, exit_code=exit_code)
                continue
            log.info('compiled', file=fname)

        # TODO: find a different way to test the generated yang files

    @inlineCallbacks
    def get_voltha_instance(self):
        """
        Call GetVolthaInstance on the local service stub, in a worker
        thread, and convert the response into a dict.

        :return: (via the deferred) the response as a dict, or None when
            the call or the conversion failed
        """
        try:
            res = yield threads.deferToThread(
                self.local_stub.GetVolthaInstance, empty_pb2.Empty())

            # NOTE(review): the two positional flags presumably map to
            # including_default_value_fields and
            # preserving_proto_field_name - confirm against the protobuf
            # version in use.
            out_data = MessageToDict(res, True, True)
        except Exception as e:
            log.error('failure', exception=repr(e))
            returnValue(None)

        returnValue(out_data)
288