This commit consists of:
1) Yang annotations to the protobuf definitions. These annotations, when
added to the relevant proto files in Voltha, allow us to convert
the voltha proto schemas into Yang schemas without the need to change the
model definitions.
2) Update to the Yang parser to handle the above annotations
3) Some initial work on the netconf GET RPCs (work in progress)
4) Cleanup
Change-Id: I5e4f4217850f0beb1c41aca1b2530a41e4f8a809
diff --git a/netconf/grpc_client/grpc_client.py b/netconf/grpc_client/grpc_client.py
index 1c92f71..d65410e 100644
--- a/netconf/grpc_client/grpc_client.py
+++ b/netconf/grpc_client/grpc_client.py
@@ -22,11 +22,9 @@
import os
import sys
-from random import randint
from zlib import decompress
import grpc
-from consul import Consul
from grpc._channel import _Rendezvous
from structlog import get_logger
from twisted.internet import reactor
@@ -38,12 +36,10 @@
from netconf.protos.schema_pb2 import SchemaServiceStub
from google.protobuf.empty_pb2 import Empty
from common.utils.consulhelpers import get_endpoint_from_consul
-from netconf.protos.voltha_pb2 import VolthaLocalServiceStub, \
+from netconf.protos.voltha_pb2 import VolthaLocalServiceStub, \
VolthaGlobalServiceStub
-from twisted.internet import threads
from google.protobuf import empty_pb2
from google.protobuf.json_format import MessageToDict, ParseDict
-from simplejson import dumps, load
log = get_logger()
@@ -90,17 +86,14 @@
self.shutting_down = True
log.info('stopped')
-
def set_on_start_callback(self, on_start_callback):
self.on_start_callback = on_start_callback
return self
-
def set_reconnect_callback(self, reconnect_callback):
self.reconnect_callback = reconnect_callback
return self
-
def resolve_endpoint(self, endpoint):
ip_port_endpoint = endpoint
if endpoint.startswith('@'):
@@ -117,7 +110,6 @@
host, port = ip_port_endpoint.split(':', 2)
return host, int(port)
-
@inlineCallbacks
def connect(self):
"""
@@ -132,11 +124,12 @@
# If host and port is not set then we will retry
if host and port:
log.info('grpc-endpoint-connecting', host=host, port=port)
- self.channel = grpc.insecure_channel('{}:{}'.format(host, port))
+ self.channel = grpc.insecure_channel(
+ '{}:{}'.format(host, port))
- # yang_from = self._retrieve_schema()
- # log.info('proto-to-yang-schema', file=yang_from)
- # self._compile_proto_files(yang_from)
+ yang_from = self._retrieve_schema()
+ log.info('proto-to-yang-schema', file=yang_from)
+ self._compile_proto_files(yang_from)
self._clear_backoff()
if self.on_start_callback is not None:
@@ -146,8 +139,8 @@
if self.reconnect_callback is not None:
reactor.callLater(0, self.reconnect_callback)
- self.local_stub = VolthaLocalServiceStub(self.channel)
- self.global_stub = VolthaGlobalServiceStub(self.channel)
+ # self.local_stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
+ # self.global_stub = voltha_pb2.VolthaGlobalServiceStub(self.channel)
return
@@ -163,7 +156,7 @@
log.exception('cannot-connect', endpoint=_endpoint)
yield self._backoff('unknown-error')
- reactor.callLater(0, self.connect)
+ reactor.callLater(1, self.connect)
def _backoff(self, msg):
wait_time = self.RETRY_BACKOFF[min(self.retries,
@@ -202,14 +195,14 @@
# instead of just one?
proto_content = proto_file.proto
log.info('saving-proto', fname=proto_fname, dir=self.work_dir,
- length=len(proto_content))
+ length=len(proto_content))
with open(os.path.join(self.work_dir, proto_fname), 'w') as f:
f.write(proto_content)
desc_content = decompress(proto_file.descriptor)
desc_fname = proto_fname.replace('.proto', '.desc')
log.info('saving-descriptor', fname=desc_fname, dir=self.work_dir,
- length=len(desc_content))
+ length=len(desc_content))
with open(os.path.join(self.work_dir, desc_fname), 'wb') as f:
f.write(desc_content)
return schemas.yang_from
@@ -233,7 +226,6 @@
))
log.info('netconf-dir', dir=netconf_base_dir)
-
for fname in [f for f in os.listdir(self.work_dir)
if f.endswith('.proto')]:
log.info('filename', file=fname)
@@ -265,66 +257,85 @@
os.system(cmd)
log.info('compiled', file=fname)
- # # test-load each _pb2 file to see all is right
- # if self.work_dir not in sys.path:
- # sys.path.insert(0, self.work_dir)
- #
- # for fname in [f for f in os.listdir(self.work_dir)
- # if f.endswith('_pb2.py')]:
- # modname = fname[:-len('.py')]
- # log.debug('test-import', modname=modname)
- # _ = __import__(modname)
+ # # test-load each _pb2 file to see all is right
+ # if self.work_dir not in sys.path:
+ # sys.path.insert(0, self.work_dir)
+ #
+ # for fname in [f for f in os.listdir(self.work_dir)
+ # if f.endswith('_pb2.py')]:
+ # modname = fname[:-len('.py')]
+ # log.debug('test-import', modname=modname)
+ # _ = __import__(modname)
- #TODO: find a different way to test the generated yang files
+ # TODO: find a different way to test the generated yang files
- @inlineCallbacks
- def get_voltha_instance(self):
- try:
- res = yield threads.deferToThread(
- self.local_stub.GetVolthaInstance, empty_pb2.Empty())
-
- out_data = MessageToDict(res, True, True)
- returnValue(out_data)
- except Exception, e:
- log.error('failure', exception=repr(e))
-
-
- #TODO: should be generated code
+ # TODO: should be generated code
+ # Focus for now is issuing a GET request for VolthaGlobalService or VolthaLocalService
@inlineCallbacks
def invoke_voltha_api(self, key):
- # key = ''.join([service, '-', method])
+ # TODO: This should be part of a parameter request
+ depth = [('get-depth', '-1')]
try:
- if key == 'VolthaGlobalService-GetVoltha':
- res = yield threads.deferToThread(
- self.global_stub.GetVoltha, empty_pb2.Empty())
- elif key == 'VolthaLocalService-GetVolthaInstance':
- res = yield threads.deferToThread(
- self.local_stub.GetVolthaInstance, empty_pb2.Empty())
- elif key == 'VolthaLocalService-GetHealth':
- res = yield threads.deferToThread(
- self.local_stub.GetHealth, empty_pb2.Empty())
- elif key == 'VolthaLocalService-ListAdapters':
- res = yield threads.deferToThread(
- self.local_stub.ListAdapters, empty_pb2.Empty())
- elif key == 'VolthaLocalService-ListLogicalDevices':
- res = yield threads.deferToThread(
- self.local_stub.ListLogicalDevices, empty_pb2.Empty())
- elif key == 'VolthaLocalService-ListDevices':
- res = yield threads.deferToThread(
- self.local_stub.ListDevices, empty_pb2.Empty())
- elif key == 'VolthaLocalService-ListDeviceTypes':
- res = yield threads.deferToThread(
- self.local_stub.ListDeviceTypes, empty_pb2.Empty())
- elif key == 'VolthaLocalService-ListDeviceGroups':
- res = yield threads.deferToThread(
- self.local_stub.ListDeviceGroups, empty_pb2.Empty())
- else: # for now just return voltha instance data
- res = yield threads.deferToThread(
- self.local_stub.GetVolthaInstance, empty_pb2.Empty())
+ data = {}
+ req = ParseDict(data, empty_pb2.Empty())
+ service_method = key.split('-')
+ service = service_method[0]
+ method = service_method[1]
+ stub = None
+ # if service == 'VolthaGlobalService':
+ # stub = VolthaGlobalServiceStub
+ # elif service == 'VolthaLocalService':
+ # stub = VolthaLocalServiceStub
+ # else:
+ # raise # Exception
- out_data = MessageToDict(res, True, True)
- returnValue(out_data)
+ res, metadata = yield self.invoke(stub, method, req, depth)
+
+ returnValue(MessageToDict(res, True, True))
except Exception, e:
log.error('failure', exception=repr(e))
+ @inlineCallbacks
+ def invoke(self, stub, method_name, request, metadata, retry=1):
+ """
+ Invoke a gRPC call to the remote server and return the response.
+ :param stub: Reference to the *_pb2 service stub
+ :param method_name: The method name inside the service stub
+ :param request: The request protobuf message
+ :param metadata: [(str, str), (str, str), ...]
+ :return: The response protobuf message and returned trailing metadata
+ """
+ if not self.connected:
+ raise ServiceUnavailable()
+
+ try:
+ method = getattr(stub(self.channel), method_name)
+ response, rendezvous = method.with_call(request, metadata=metadata)
+ returnValue((response, rendezvous.trailing_metadata()))
+
+ except grpc._channel._Rendezvous, e:
+ code = e.code()
+ if code == grpc.StatusCode.UNAVAILABLE:
+ e = ServiceUnavailable()
+
+ if self.connected:
+ self.connected = False
+ yield self.connect()
+ if retry > 0:
+ response = yield self.invoke(stub, method_name,
+ request, metadata,
+ retry=retry - 1)
+ returnValue(response)
+
+ elif code in (
+ grpc.StatusCode.NOT_FOUND,
+ grpc.StatusCode.INVALID_ARGUMENT,
+ grpc.StatusCode.ALREADY_EXISTS):
+
+ pass # don't log error, these occur naturally
+
+ else:
+ log.exception(e)
+
+ raise e