CORD-762 factor out helper code from xos api
Change-Id: I6da416d2250d123389c190311d8e40dad677d63d
diff --git a/xos/grpc/apihelper.py b/xos/grpc/apihelper.py
new file mode 100644
index 0000000..bf128c2
--- /dev/null
+++ b/xos/grpc/apihelper.py
@@ -0,0 +1,154 @@
+import base64
+import time
+from protos import xos_pb2
+from google.protobuf.empty_pb2 import Empty
+
+from django.contrib.auth import authenticate as django_authenticate
+from core.models import *
+from xos.exceptions import *
+
+class XOSAPIHelperMixin(object):
+ def getProtoClass(self, djangoClass):
+ pClass = getattr(xos_pb2, djangoClass.__name__)
+ return pClass
+
+ def getPluralProtoClass(self, djangoClass):
+ pClass = getattr(xos_pb2, djangoClass.__name__ + "s")
+ return pClass
+
+ def convertFloat(self, x):
+ if not x:
+ return 0
+ else:
+ return float(x)
+
+ def convertDateTime(self, x):
+ if not x:
+ return 0
+ else:
+ return time.mktime(x.timetuple())
+
+ def convertForeignKey(self, x):
+ if not x:
+ return 0
+ else:
+ return int(x.id)
+
+ def objToProto(self, obj):
+ p_obj = self.getProtoClass(obj.__class__)()
+ for field in obj._meta.fields:
+ if getattr(obj, field.name) == None:
+ continue
+
+ ftype = field.get_internal_type()
+ if (ftype == "CharField") or (ftype == "TextField") or (ftype == "SlugField"):
+ setattr(p_obj, field.name, str(getattr(obj, field.name)))
+ elif (ftype == "BooleanField"):
+ setattr(p_obj, field.name, getattr(obj, field.name))
+ elif (ftype == "AutoField"):
+ setattr(p_obj, field.name, int(getattr(obj, field.name)))
+ elif (ftype == "IntegerField") or (ftype == "PositiveIntegerField"):
+ setattr(p_obj, field.name, int(getattr(obj, field.name)))
+ elif (ftype == "ForeignKey"):
+ setattr(p_obj, field.name+"_id", self.convertForeignKey(getattr(obj, field.name)))
+ elif (ftype == "DateTimeField"):
+ setattr(p_obj, field.name, self.convertDateTime(getattr(obj, field.name)))
+ elif (ftype == "FloatField"):
+ setattr(p_obj, field.name, float(getattr(obj, field.name)))
+ elif (ftype == "GenericIPAddressField"):
+ setattr(p_obj, field.name, str(getattr(obj, field.name)))
+ return p_obj
+
+ def protoToArgs(self, djangoClass, message):
+ args={}
+ fmap={}
+ fset={}
+ for field in djangoClass._meta.fields:
+ fmap[field.name] = field
+ if field.get_internal_type() == "ForeignKey":
+ # foreign key can be represented as an id
+ fmap[field.name + "_id"] = field
+
+ for (fieldDesc, val) in message.ListFields():
+ name = fieldDesc.name
+ if name in fmap:
+ if (name=="id"):
+ # don't let anyone set the id
+ continue
+ ftype = fmap[name].get_internal_type()
+ if (ftype == "CharField") or (ftype == "TextField") or (ftype == "SlugField"):
+ args[name] = val
+ elif (ftype == "BooleanField"):
+ args[name] = val
+ elif (ftype == "AutoField"):
+ args[name] = val
+ elif (ftype == "IntegerField") or (ftype == "PositiveIntegerField"):
+ args[name] = val
+ elif (ftype == "ForeignKey"):
+ args[name] = val # field name already has "_id" at the end
+ elif (ftype == "DateTimeField"):
+ pass # do something special here
+ elif (ftype == "FloatField"):
+ args[name] = val
+ elif (ftype == "GenericIPAddressField"):
+ args[name] = val
+ fset[name] = True
+
+ return args
+
+ def querysetToProto(self, djangoClass, queryset):
+ objs = queryset
+ p_objs = self.getPluralProtoClass(djangoClass)()
+
+ for obj in objs:
+ new_obj = p_objs.items.add()
+ new_obj.CopyFrom(self.objToProto(obj))
+
+ return p_objs
+
+ def get(self, djangoClass, id):
+ obj = djangoClass.objects.get(id=id)
+ return self.objToProto(obj)
+
+ def create(self, djangoClass, user, request):
+ args = self.protoToArgs(djangoClass, request)
+ new_obj = djangoClass(**args)
+ new_obj.caller = user
+ if (not user) or (not new_obj.can_update(user)):
+ raise XOSPermissionDenied()
+ new_obj.save()
+ return self.objToProto(new_obj)
+
+ def update(self, djangoClass, user, id, message):
+ obj = djangoClass.objects.get(id=id)
+ obj.caller = user
+ if (not user) or (not obj.can_update(user)):
+ raise XOSPermissionDenied()
+ args = self.protoToArgs(djangoClass, message)
+ for (k,v) in args.iteritems():
+ setattr(obj, k, v)
+ obj.save()
+ return self.objToProto(obj)
+
+ def delete(self, djangoClass, user, id):
+ obj = djangoClass.objects.get(id=id)
+ if (not user) or (not obj.can_update(user)):
+ raise XOSPermissionDenied()
+ obj.delete()
+ return Empty()
+
+ def authenticate(self, context):
+ for (k, v) in context.invocation_metadata():
+ if (k.lower()=="authorization"):
+ (method, auth) = v.split(" ",1)
+ if (method.lower() == "basic"):
+ auth = base64.b64decode(auth)
+ (username, password) = auth.split(":")
+ user = django_authenticate(username=username, password=password)
+ if not user:
+ raise Exception("failed to authenticate %s:%s" % (username, password))
+ print "authenticated %s:%s as %s" % (username, password, user)
+ return user
+
+ return None
+
diff --git a/xos/tools/apigen/grpc_api.template.py b/xos/tools/apigen/grpc_api.template.py
index ddd2b77..9e23db5 100644
--- a/xos/tools/apigen/grpc_api.template.py
+++ b/xos/tools/apigen/grpc_api.template.py
@@ -6,159 +6,15 @@
from django.contrib.auth import authenticate as django_authenticate
from core.models import *
from xos.exceptions import *
+from apihelper import XOSAPIHelperMixin
-class XosService(xos_pb2.xosServicer):
+class XosService(xos_pb2.xosServicer, XOSAPIHelperMixin):
def __init__(self, thread_pool):
self.thread_pool = thread_pool
def stop(self):
pass
- def getProtoClass(self, djangoClass):
- pClass = getattr(xos_pb2, djangoClass.__name__)
- return pClass
-
- def getPluralProtoClass(self, djangoClass):
- pClass = getattr(xos_pb2, djangoClass.__name__ + "s")
- return pClass
-
- def convertFloat(self, x):
- if not x:
- return 0
- else:
- return float(x)
-
- def convertDateTime(self, x):
- if not x:
- return 0
- else:
- return time.mktime(x.timetuple())
-
- def convertForeignKey(self, x):
- if not x:
- return 0
- else:
- return int(x.id)
-
- def objToProto(self, obj):
- p_obj = self.getProtoClass(obj.__class__)()
- for field in obj._meta.fields:
- if getattr(obj, field.name) == None:
- continue
-
- ftype = field.get_internal_type()
- if (ftype == "CharField") or (ftype == "TextField") or (ftype == "SlugField"):
- setattr(p_obj, field.name, str(getattr(obj, field.name)))
- elif (ftype == "BooleanField"):
- setattr(p_obj, field.name, getattr(obj, field.name))
- elif (ftype == "AutoField"):
- setattr(p_obj, field.name, int(getattr(obj, field.name)))
- elif (ftype == "IntegerField") or (ftype == "PositiveIntegerField"):
- setattr(p_obj, field.name, int(getattr(obj, field.name)))
- elif (ftype == "ForeignKey"):
- setattr(p_obj, field.name+"_id", self.convertForeignKey(getattr(obj, field.name)))
- elif (ftype == "DateTimeField"):
- setattr(p_obj, field.name, self.convertDateTime(getattr(obj, field.name)))
- elif (ftype == "FloatField"):
- setattr(p_obj, field.name, float(getattr(obj, field.name)))
- elif (ftype == "GenericIPAddressField"):
- setattr(p_obj, field.name, str(getattr(obj, field.name)))
- return p_obj
-
- def protoToArgs(self, djangoClass, message):
- args={}
- fmap={}
- fset={}
- for field in djangoClass._meta.fields:
- fmap[field.name] = field
- if field.get_internal_type() == "ForeignKey":
- # foreign key can be represented as an id
- fmap[field.name + "_id"] = field
-
- for (fieldDesc, val) in message.ListFields():
- name = fieldDesc.name
- if name in fmap:
- if (name=="id"):
- # don't let anyone set the id
- continue
- ftype = fmap[name].get_internal_type()
- if (ftype == "CharField") or (ftype == "TextField") or (ftype == "SlugField"):
- args[name] = val
- elif (ftype == "BooleanField"):
- args[name] = val
- elif (ftype == "AutoField"):
- args[name] = val
- elif (ftype == "IntegerField") or (ftype == "PositiveIntegerField"):
- args[name] = val
- elif (ftype == "ForeignKey"):
- args[name] = val # field name already has "_id" at the end
- elif (ftype == "DateTimeField"):
- pass # do something special here
- elif (ftype == "FloatField"):
- args[name] = val
- elif (ftype == "GenericIPAddressField"):
- args[name] = val
- fset[name] = True
-
- return args
-
- def querysetToProto(self, djangoClass, queryset):
- objs = queryset
- p_objs = self.getPluralProtoClass(djangoClass)()
-
- for obj in objs:
- new_obj = p_objs.items.add()
- new_obj.CopyFrom(self.objToProto(obj))
-
- return p_objs
-
- def get(self, djangoClass, id):
- obj = djangoClass.objects.get(id=id)
- return self.objToProto(obj)
-
- def create(self, djangoClass, user, request):
- args = self.protoToArgs(djangoClass, request)
- new_obj = djangoClass(**args)
- new_obj.caller = user
- if (not user) or (not new_obj.can_update(user)):
- raise XOSPermissionDenied()
- new_obj.save()
- return self.objToProto(new_obj)
-
- def update(self, djangoClass, user, id, message):
- obj = djangoClass.objects.get(id=id)
- obj.caller = user
- if (not user) or (not obj.can_update(user)):
- raise XOSPermissionDenied()
- args = self.protoToArgs(djangoClass, message)
- for (k,v) in args.iteritems():
- setattr(obj, k, v)
- obj.save()
- return self.objToProto(obj)
-
- def delete(self, djangoClass, user, id):
- obj = djangoClass.objects.get(id=id)
- if (not user) or (not obj.can_update(user)):
- raise XOSPermissionDenied()
- obj.delete()
- return Empty()
-
- def authenticate(self, context):
- for (k, v) in context.invocation_metadata():
- if (k.lower()=="authorization"):
- (method, auth) = v.split(" ",1)
- if (method.lower() == "basic"):
- auth = base64.b64decode(auth)
- (username, password) = auth.split(":")
- user = django_authenticate(username=username, password=password)
- if not user:
- raise Exception("failed to authenticate %s:%s" % (username, password))
- print "authenticated %s:%s as %s" % (username, password, user)
- return user
-
- return None
-
-
{% for object in generator.all() %}
def List{{ object.camel() }}(self, request, context):
user=self.authenticate(context)