blob: 387a2fe40eaf9949aa02cf8bc4162a6ca62e579f [file] [log] [blame]
import base64
import time
from protos import xos_pb2, xos_pb2_grpc
from google.protobuf.empty_pb2 import Empty
from apistats import track_request_time, REQUEST_COUNT
from django.contrib.auth import authenticate as django_authenticate
from xos.exceptions import *
from apihelper import XOSAPIHelperMixin
from decorators import translate_exceptions
import grpc
class XosService(xos_pb2_grpc.xosServicer, XOSAPIHelperMixin):
def __init__(self, thread_pool):
self.thread_pool = thread_pool
XOSAPIHelperMixin.__init__(self)
def stop(self):
pass
{% for object in proto.messages | sort(attribute='name') %}
{%- if object.name!='XOSBase' %}
@translate_exceptions("{{ object.name }}", "List{{ object.name }}")
@track_request_time("{{ object.name }}", "List{{ object.name }}")
def List{{ object.name }}(self, request, context):
user=self.authenticate(context)
model=self.get_model("{{ object.name }}")
res = self.list(model, user)
REQUEST_COUNT.labels('xos-core', "{{ object.name }}", "List{{ object.name }}", grpc.StatusCode.OK).inc()
return res
@translate_exceptions("{{ object.name }}", "Filter{{ object.name }}")
@track_request_time("{{ object.name }}", "Filter{{ object.name }}")
def Filter{{ object.name }}(self, request, context):
user=self.authenticate(context)
model=self.get_model("{{ object.name }}")
res = self.filter(model, user, request)
REQUEST_COUNT.labels('xos-core', "{{ object.name }}", "List{{ object.name }}", grpc.StatusCode.OK).inc()
return res
@translate_exceptions("{{ object.name }}", "Get{{ object.name }}")
@track_request_time("{{ object.name }}", "Get{{ object.name }}")
def Get{{ object.name }}(self, request, context):
user=self.authenticate(context)
model=self.get_model("{{ object.name }}")
res = self.get(model, user, request.id)
REQUEST_COUNT.labels('xos-core', "{{ object.name }}", "List{{ object.name }}", grpc.StatusCode.OK).inc()
return res
@translate_exceptions("{{ object.name }}", "Create{{ object.name }}")
@track_request_time("{{ object.name }}", "Create{{ object.name }}")
def Create{{ object.name }}(self, request, context):
user=self.authenticate(context)
model=self.get_model("{{ object.name }}")
res = self.create(model, user, request)
REQUEST_COUNT.labels('xos-core', "{{ object.name }}", "List{{ object.name }}", grpc.StatusCode.OK).inc()
return res
@translate_exceptions("{{ object.name }}", "Delete{{ object.name }}")
@track_request_time("{{ object.name }}", "Delete{{ object.name }}")
def Delete{{ object.name }}(self, request, context):
user=self.authenticate(context)
model=self.get_model("{{ object.name }}")
res = self.delete(model, user, request.id)
REQUEST_COUNT.labels('xos-core', "{{ object.name }}", "List{{ object.name }}", grpc.StatusCode.OK).inc()
return res
@translate_exceptions("{{ object.name }}", "Update{{ object.name }}")
@track_request_time("{{ object.name }}", "Update{{ object.name }}")
def Update{{ object.name }}(self, request, context):
user=self.authenticate(context)
model=self.get_model("{{ object.name }}")
res = self.update(model, user, request.id, request, context)
REQUEST_COUNT.labels('xos-core', "{{ object.name }}", "List{{ object.name }}", grpc.StatusCode.OK).inc()
return res
{%- endif %}
{% endfor %}