| import base64 |
| import time |
| from protos import xos_pb2, xos_pb2_grpc |
| from google.protobuf.empty_pb2 import Empty |
| |
| from django.contrib.auth import authenticate as django_authenticate |
| from xos.exceptions import * |
| from apihelper import XOSAPIHelperMixin, translate_exceptions |
| |
| class XosService(xos_pb2_grpc.xosServicer, XOSAPIHelperMixin): |
| def __init__(self, thread_pool): |
| self.thread_pool = thread_pool |
| XOSAPIHelperMixin.__init__(self) |
| |
| def stop(self): |
| pass |
| |
| {% for object in proto.messages | sort(attribute='name') %} |
| {%- if object.name!='XOSBase' %} |
| @translate_exceptions |
| def List{{ object.name }}(self, request, context): |
| user=self.authenticate(context) |
| model=self.get_model("{{ object.name }}") |
| return self.list(model, user) |
| |
| @translate_exceptions |
| def Filter{{ object.name }}(self, request, context): |
| user=self.authenticate(context) |
| model=self.get_model("{{ object.name }}") |
| return self.filter(model, user, request) |
| |
| @translate_exceptions |
| def Get{{ object.name }}(self, request, context): |
| user=self.authenticate(context) |
| model=self.get_model("{{ object.name }}") |
| return self.get(model, user, request.id) |
| |
| @translate_exceptions |
| def Create{{ object.name }}(self, request, context): |
| user=self.authenticate(context) |
| model=self.get_model("{{ object.name }}") |
| return self.create(model, user, request) |
| |
| @translate_exceptions |
| def Delete{{ object.name }}(self, request, context): |
| user=self.authenticate(context) |
| model=self.get_model("{{ object.name }}") |
| return self.delete(model, user, request.id) |
| |
| @translate_exceptions |
| def Update{{ object.name }}(self, request, context): |
| user=self.authenticate(context) |
| model=self.get_model("{{ object.name }}") |
| return self.update(model, user, request.id, request, context) |
| {%- endif %} |
| {% endfor %} |
| |
| |