Normalize component start()/stop()
Also fixed the /schema swagger/rest entry. It did not work
because the third-party protobuf_to_dict library cannot handle
protobuf map fields. Replaced the two map fields (protos and
descriptors) with a single list of per-file entries.
Change-Id: Ib25a528701b67d58d32451687724c8247da6efa5
diff --git a/grpc_client/grpc_client.py b/grpc_client/grpc_client.py
index 9129b2e..75c78eb 100644
--- a/grpc_client/grpc_client.py
+++ b/grpc_client/grpc_client.py
@@ -47,31 +47,39 @@
"""
RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
- def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055'):
+ def __init__(self, consul_endpoint, work_dir, endpoint='localhost:50055',
+ reconnect_callback=None):
self.consul_endpoint = consul_endpoint
self.endpoint = endpoint
self.work_dir = work_dir
+ self.reconnect_callback = reconnect_callback
+
self.plugin_dir = os.path.abspath(os.path.join(
os.path.dirname(__file__), '../protoc_plugins'))
self.channel = None
self.schema = None
self.retries = 0
- self.on_reconnect = None
self.shutting_down = False
self.connected = False
- def run(self, on_reconnect=None):
- self.on_reconnect = on_reconnect
+ def start(self):
+ log.debug('starting')
if not self.connected:
reactor.callLater(0, self.connect)
+ log.info('started')
return self
- def shutdown(self):
+ def stop(self):
+ log.debug('stopping')
if self.shutting_down:
return
self.shutting_down = True
- pass
+ log.info('stopped')
+
+ def set_reconnect_callback(self, reconnect_callback):
+ self.reconnect_callback = reconnect_callback
+ return self
@inlineCallbacks
def connect(self):
@@ -96,8 +104,8 @@
self._clear_backoff()
self.connected = True
- if self.on_reconnect is not None:
- reactor.callLater(0, self.on_reconnect)
+ if self.reconnect_callback is not None:
+ reactor.callLater(0, self.reconnect_callback)
return
@@ -157,7 +165,7 @@
assert isinstance(self.channel, grpc.Channel)
stub = SchemaServiceStub(self.channel)
# try:
- schema = stub.GetSchema(NullMessage())
+ schemas = stub.GetSchema(NullMessage())
# except _Rendezvous, e:
# if e.code == grpc.StatusCode.UNAVAILABLE:
#
@@ -168,19 +176,20 @@
os.system('rm -fr /tmp/%s/*' %
self.work_dir.replace('/tmp/', '')) # safer
- for fname in schema.protos:
- content = schema.protos[fname]
- log.debug('saving-proto',
- fname=fname, dir=self.work_dir, length=len(content))
- with open(os.path.join(self.work_dir, fname), 'w') as f:
- f.write(content)
+ for proto_file in schemas.protos:
+ proto_fname = proto_file.file_name
+ proto_content = proto_file.proto
+ log.debug('saving-proto', fname=proto_fname, dir=self.work_dir,
+ length=len(proto_content))
+ with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
+ f.write(proto_content)
- for fname in schema.descriptors:
- content = decompress(schema.descriptors[fname])
- log.debug('saving-descriptor',
- fname=fname, dir=self.work_dir, length=len(content))
- with open(os.path.join(self.work_dir, fname), 'wb') as f:
- f.write(content)
+ desc_content = decompress(proto_file.descriptor)
+ desc_fname = proto_fname.replace('.proto', '.desc')
+ log.debug('saving-descriptor', fname=desc_fname, dir=self.work_dir,
+ length=len(desc_content))
+ with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
+ f.write(desc_content)
def _compile_proto_files(self):
"""