CORD-880 add accessor for core api
Change-Id: Idd83ac235504b7266a6a72f9a50831f00f5ad22b
diff --git a/containers/xos/Dockerfile.synchronizer-base b/containers/xos/Dockerfile.synchronizer-base
new file mode 100644
index 0000000..38c1fa4
--- /dev/null
+++ b/containers/xos/Dockerfile.synchronizer-base
@@ -0,0 +1,10 @@
+FROM xosproject/xos-client
+
+ADD xos/synchronizers/new_base /opt/xos/synchronizers/new_base
+ADD xos/xos/config.py /opt/xos/xos/config.py
+ADD xos/xos/logger.py /opt/xos/xos/logger.py
+ADD xos/xos/xml_util.py /opt/xos/xos/xml_util.py
+ADD xos/xos/__init__.py /opt/xos/xos/__init__.py
+ADD xos/synchronizers/__init__.py /opt/xos/synchronizers/__init__.py
+
+ENTRYPOINT ["bash"]
diff --git a/containers/xos/Makefile b/containers/xos/Makefile
index b46c9ca..e096612 100644
--- a/containers/xos/Makefile
+++ b/containers/xos/Makefile
@@ -66,6 +66,10 @@
-f Dockerfile.client -t xosproject/xos-client ${BUILD_ARGS} ../..
rm -rf tmp.chameleon
+synchronizer-base:
+ sudo docker build --no-cache=${NO_DOCKER_CACHE} --rm \
+ -f Dockerfile.synchronizer-base -t xosproject/xos-synchronizer-base ${BUILD_ARGS} ../..
+
run:
sudo docker run -d --name ${CONTAINER_NAME} -p 80:8000 \
${IMAGE_NAME}
diff --git a/xos/core/models/plcorebase.py b/xos/core/models/plcorebase.py
index 66046ee..924f162 100644
--- a/xos/core/models/plcorebase.py
+++ b/xos/core/models/plcorebase.py
@@ -310,6 +310,14 @@
if "silent" in kwargs:
silent=silent or kwargs.pop("silent")
+ caller_kind = "unknown"
+
+ if ('synchronizer' in threading.current_thread().name):
+ caller_kind = "synchronizer"
+
+ if "caller_kind" in kwargs:
+ caller_kind = kwargs.pop("caller_kind")
+
always_update_timestamp = False
if "always_update_timestamp" in kwargs:
always_update_timestamp = always_update_timestamp or kwargs.pop("always_update_timestamp")
@@ -324,7 +332,7 @@
if not (field in ["backend_register", "backend_status", "deleted", "enacted", "updated"]):
ignore_composite_key_check=False
- if ('synchronizer' not in threading.current_thread().name) or always_update_timestamp:
+ if (caller_kind!="synchronizer") or always_update_timestamp:
self.updated = timezone.now()
# Transmit update via Redis
diff --git a/xos/grpc/apihelper.py b/xos/grpc/apihelper.py
index 0ddaaa0..89d1545 100644
--- a/xos/grpc/apihelper.py
+++ b/xos/grpc/apihelper.py
@@ -1,5 +1,7 @@
import base64
+import datetime
import inspect
+import pytz
import time
from protos import xos_pb2
from google.protobuf.empty_pb2 import Empty
@@ -42,7 +44,9 @@
if not x:
return 0
else:
- return time.mktime(x.timetuple())
+ utc=pytz.utc
+ return (x-datetime.datetime(1970,1,1,tzinfo=utc)).total_seconds()
+ #return time.mktime(x.timetuple())
def convertForeignKey(self, x):
if not x:
@@ -124,7 +128,8 @@
elif (ftype == "ForeignKey"):
args[name] = val # field name already has "_id" at the end
elif (ftype == "DateTimeField"):
- pass # do something special here
+ utc = pytz.utc
+ args[name] = datetime.datetime.fromtimestamp(val,tz=utc)
elif (ftype == "FloatField"):
args[name] = val
elif (ftype == "GenericIPAddressField"):
@@ -169,6 +174,10 @@
for (k, v) in context.invocation_metadata():
if k=="update_fields":
save_kwargs["update_fields"] = v.split(",")
+ elif k=="caller_kind":
+ save_kwargs["caller_kind"] = v
+ elif k=="always_update_timestamp":
+ save_kwargs["always_update_timestamp"] = True
obj.save(**save_kwargs)
return self.objToProto(obj)
diff --git a/xos/synchronizers/new_base/apiaccessor.py b/xos/synchronizers/new_base/apiaccessor.py
new file mode 100644
index 0000000..f855a46
--- /dev/null
+++ b/xos/synchronizers/new_base/apiaccessor.py
@@ -0,0 +1,52 @@
+from modelaccessor import ModelAccessor
+import pytz
+import datetime
+import time
+
+class CoreApiModelAccessor(ModelAccessor):
+ def __init__(self, orm):
+ self.orm = orm
+ super(CoreApiModelAccessor, self).__init__()
+
+ def get_all_model_classes(self):
+ all_model_classes = {}
+ for k in self.orm.all_model_names:
+ all_model_classes[k] = getattr(self.orm,k)
+ return all_model_classes
+
+ def fetch_pending(self, main_objs, deletion=False):
+ if (type(main_objs) is not list):
+ main_objs=[main_objs]
+
+ objs = []
+ for main_obj in main_objs:
+ if (not deletion):
+ lobjs = main_obj.objects.filter_special(main_obj.objects.SYNCHRONIZER_DIRTY_OBJECTS)
+ else:
+ lobjs = main_obj.objects.filter_special(main_obj.objects.SYNCHRONIZER_DELETED_OBJECTS)
+ objs.extend(lobjs)
+
+ return objs
+
+ def obj_exists(self, o):
+ # gRPC will default id to '0' for uninitialized objects
+ return (o.id is not None) and (o.id != 0)
+
+ def obj_in_list(self, o, olist):
+ ids = [x.id for x in olist]
+ return o.id in ids
+
+ def now(self):
+ """ Return the current time for timestamping purposes """
+ utc = pytz.utc
+ now = datetime.datetime.utcnow().replace(tzinfo=utc)
+ return time.mktime(now.timetuple())
+
+ def is_type(self, obj, name):
+ return obj._wrapped_class.__class__.__name__ == name
+
+ def is_instance(self, obj, name):
+ return name in obj.class_names.split(",")
+
+
+
diff --git a/xos/synchronizers/new_base/modelaccessor.py b/xos/synchronizers/new_base/modelaccessor.py
index d56ead2..d8f2692 100644
--- a/xos/synchronizers/new_base/modelaccessor.py
+++ b/xos/synchronizers/new_base/modelaccessor.py
@@ -8,6 +8,9 @@
models into the calling module's scope.
"""
+import functools
+from xos.config import Config
+
class ModelAccessor(object):
def __init__(self):
self.all_model_classes = self.get_all_model_classes()
@@ -52,7 +55,7 @@
""" Return the current time for timestamping purposes """
raise Exception("Not Implemented")
- def update_diag(loop_end=None, loop_start=None, syncrecord_start=None, sync_start=None, backend_status=None):
+ def update_diag(self, loop_end=None, loop_start=None, syncrecord_start=None, sync_start=None, backend_status=None):
""" Update the diagnostic object """
pass
@@ -64,27 +67,62 @@
""" returns True if obj is of model type "name" or is a descendant """
raise Exception("Not Implemented")
+ def journal_object(o, operation, msg=None, timestamp=None):
+ pass
-# TODO: insert logic here to pick accessor based on config setting
-if True:
- from djangoaccessor import DjangoModelAccessor
- model_accessor = DjangoModelAccessor()
-else:
- from apiaccessor import ApiModelAccessor
- model_accessor = CoreApiModelAccessor()
+def import_models_to_globals():
+ # add all models to globals
+ for (k, v) in model_accessor.all_model_classes.items():
+ globals()[k] = v
-# add all models to globals
-for (k, v) in model_accessor.all_model_classes.items():
- globals()[k] = v
+ # plcorebase doesn't exist from the synchronizer's perspective, so fake out
+ # ModelLink.
+ if "ModelLink" not in globals():
+ class ModelLink:
+ def __init__(self,dest,via,into=None):
+ self.dest=dest
+ self.via=via
+ self.into=into
+ globals()["ModelLink"] = ModelLink
-# plcorebase doesn't exist from the synchronizer's perspective, so fake out
-# ModelLink.
-if "ModelLink" not in globals():
- class ModelLink:
- def __init__(self,dest,via,into=None):
- self.dest=dest
- self.via=via
- self.into=into
- globals()["ModelLink"] = ModelLink
+def grpcapi_reconnect(client, reactor):
+ global model_accessor
+ # this will prevent updated timestamps from being automatically updated
+ client.xos_orm.caller_kind = "synchronizer"
+
+ from apiaccessor import CoreApiModelAccessor
+ model_accessor = CoreApiModelAccessor(orm = client.xos_orm)
+ import_models_to_globals()
+
+ # Synchronizer framework isn't ready to embrace reactor yet...
+ reactor.stop()
+
+def config_accessor():
+ global model_accessor
+
+ accessor_kind = getattr(Config(), "observer_accessor_kind", "django")
+
+ if (accessor_kind == "django"):
+ from djangoaccessor import DjangoModelAccessor
+ model_accessor = DjangoModelAccessor()
+ import_models_to_globals()
+ else:
+ grpcapi_endpoint = getattr(Config(), "observer_accessor_endpoint", "xos-core.cord.lab:50051")
+ grpcapi_username = getattr(Config(), "observer_accessor_username", "xosadmin@opencord.org")
+ grpcapi_password = getattr(Config(), "observer_accessor_password")
+
+ from xosapi.xos_grpc_client import SecureClient
+ from twisted.internet import reactor
+
+ grpcapi_client = SecureClient(endpoint = grpcapi_endpoint, username = grpcapi_username, password=grpcapi_password)
+ grpcapi_client.set_reconnect_callback(functools.partial(grpcapi_reconnect, grpcapi_client, reactor))
+ grpcapi_client.start()
+
+ # Start reactor. This will cause the client to connect and then execute
+ # grpcapi_callback().
+
+ reactor.run()
+
+config_accessor()
diff --git a/xos/synchronizers/new_base/syncstep.py b/xos/synchronizers/new_base/syncstep.py
index f43ef6d..e8b18e8 100644
--- a/xos/synchronizers/new_base/syncstep.py
+++ b/xos/synchronizers/new_base/syncstep.py
@@ -5,7 +5,7 @@
from synchronizers.new_base.modelaccessor import *
#from synchronizers.new_base.steps import *
#from synchronizers.new_base.ansible_helper import *
-from generate.dependency_walker import *
+#from generate.dependency_walker import *
import json
import time
@@ -207,11 +207,11 @@
if (deletion):
if getattr(o, "backend_need_reap", False):
# the object has already been deleted and marked for reaping
- journal_object(o,"syncstep.call.already_marked_reap")
+ model_accessor.journal_object(o,"syncstep.call.already_marked_reap")
else:
- journal_object(o,"syncstep.call.delete_record")
+ model_accessor.journal_object(o,"syncstep.call.delete_record")
self.delete_record(o)
- journal_object(o,"syncstep.call.delete_set_reap")
+ model_accessor.journal_object(o,"syncstep.call.delete_set_reap")
o.backend_need_reap = True
o.save(update_fields=['backend_need_reap'])
#o.delete(purge=True)
@@ -228,7 +228,7 @@
o.backend_need_delete = True
o.save(update_fields=['backend_need_delete'])
- journal_object(o,"syncstep.call.sync_record")
+ model_accessor.journal_object(o,"syncstep.call.sync_record")
self.sync_record(o)
model_accessor.update_diag(syncrecord_start = time.time(), backend_status="1 - Synced Record")
@@ -236,8 +236,9 @@
scratchpad = {'next_run':0, 'exponent':0, 'last_success':time.time()}
o.backend_register = json.dumps(scratchpad)
o.backend_status = "1 - OK"
- journal_object(o,"syncstep.call.save_update")
+ model_accessor.journal_object(o,"syncstep.call.save_update")
o.save(update_fields=['enacted','backend_status','backend_register'])
+ logger.info("save sync object, new enacted = %s" % str(new_enacted))
except (InnocuousException,Exception,DeferredException) as e:
logger.log_exc("sync step failed!",extra=o.tologdict())
try:
diff --git a/xos/tools/apigen/modelgen b/xos/tools/apigen/modelgen
index 44eaec4..946ac6a 100755
--- a/xos/tools/apigen/modelgen
+++ b/xos/tools/apigen/modelgen
@@ -1,5 +1,6 @@
#!/usr/bin/python
+import inspect
import os
import pdb
import copy
@@ -20,8 +21,51 @@
django.setup()
+from core.models import PlCoreBase
+
options = None
+def is_model_class(model):
+ """ Return True if 'model' is something that we're interested in """
+ if not inspect.isclass(model):
+ return False
+ if model.__name__ in ["PlModelMixIn"]:
+ return False
+ bases = inspect.getmro(model)
+ bases = [x.__name__ for x in bases]
+ if ("PlCoreBase" in bases) or ("PlModelMixIn" in bases):
+ return True
+
+ return False
+
+def module_has_models(module):
+ """ return True if 'module' contains any models we're interested in """
+ for k in dir(module):
+ v=getattr(module,k)
+ if is_model_class(v):
+ return True
+
+ return False
+
+def app_get_models_module(app):
+ """ check whether 'app' includes XOS models """
+
+ app = app + ".models"
+ try:
+ models_module = __import__(app)
+ except ImportError:
+ return False
+
+ for part in app.split(".")[1:]:
+ if module_has_models(models_module):
+ return models_module
+ models_module = getattr(models_module,part)
+
+ if module_has_models(models_module):
+ return models_module
+
+ return None
+
def singular(foo, keys):
for k in keys:
@@ -41,15 +85,7 @@
model_classes = []
for app in apps:
orig_app=app
- app = app + ".models"
- models_module = __import__(app)
- for part in app.split(".")[1:]:
- if hasattr(models_module, "PlCoreBase"):
- break
- models_module = getattr(models_module,part)
-
- global PlCoreBase
- PlCoreBase = getattr(models_module,"PlCoreBase")
+ models_module = app_get_models_module(app)
for classname in dir(models_module):
c = getattr(models_module, classname, None)
@@ -60,7 +96,7 @@
if (c._meta.app_label == "core") and (orig_app!="core"):
continue
- if type(c)==type(PlCoreBase) and c.__name__ not in options.blacklist:
+ if is_model_class(c) and c.__name__ not in options.blacklist:
model_classes.append(c)
app_map[c.__name__]=orig_app
c.class_name = c.__name__
@@ -239,25 +275,6 @@
cobj.related_name = f.related_name
obj.reverse_refs.append(cobj)
-
-def app_has_models(app):
- """ check whether 'app' includes XOS models """
-
- app = app + ".models"
- try:
- models_module = __import__(app)
- except ImportError:
- return False
- for part in app.split(".")[1:]:
- if hasattr(models_module, "PlCoreBase"):
- return True
- models_module = getattr(models_module,part)
-
- if hasattr(models_module, "PlCoreBase"):
- return True
-
- return False
-
def main():
global options
parser = OptionParser(usage="modelgen [options] <template_fn>", )
@@ -290,7 +307,7 @@
options.apps = ["core"]
if options.apps == ["*"]:
- options.apps = [x for x in settings.INSTALLED_APPS if app_has_models(x)]
+ options.apps = [x for x in settings.INSTALLED_APPS if app_get_models_module(x)]
if len(args)!=1:
print 'Usage: modelgen [options] <template_fn>'
diff --git a/xos/xos_client/setup.py b/xos/xos_client/setup.py
index 4d601c2..0892a33 100644
--- a/xos/xos_client/setup.py
+++ b/xos/xos_client/setup.py
@@ -22,7 +22,8 @@
'xosapi.chameleon.protos.third_party.google.api',
'xosapi.chameleon.utils',
'xosapi.chameleon.protoc_plugins',
- 'xosapi'],
+ 'xosapi',
+ 'xosapi.convenience'],
py_modules= ['xosapi.chameleon.__init__'],
include_package_data=True,
package_data = {'xosapi.chameleon.protos.third_party.google.api': ['*.proto'],
diff --git a/xos/xos_client/xosapi/convenience/__init__.py b/xos/xos_client/xosapi/convenience/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/xos/xos_client/xosapi/convenience/__init__.py
diff --git a/xos/xos_client/xosapi/convenience/instance.py b/xos/xos_client/xosapi/convenience/instance.py
new file mode 100644
index 0000000..10da548
--- /dev/null
+++ b/xos/xos_client/xosapi/convenience/instance.py
@@ -0,0 +1,55 @@
+from xosapi.orm import ORMWrapper, register_convenience_wrapper
+
+class ORMWrapperInstance(ORMWrapper):
+
+ def all_ips(self):
+ ips={}
+ for ns in self.ports.all():
+ if ns.ip:
+ ips[ns.network.name] = ns.ip
+ return ips
+
+ def all_ips_string(self):
+ result = []
+ ips = self.all_ips()
+ for key in sorted(ips.keys()):
+ #result.append("%s = %s" % (key, ips[key]))
+ result.append(ips[key])
+ return ", ".join(result)
+
+ def get_public_ip(self):
+ for ns in self.ports.all():
+ if (ns.ip) and (ns.network.template.visibility=="public") and (ns.network.template.translation=="none"):
+ return ns.ip
+ return None
+
+ # return an address on nat-net
+ def get_network_ip(self, pattern):
+ for ns in self.ports.all():
+ if pattern in ns.network.name.lower():
+ return ns.ip
+ return None
+
+ # return an address that the synchronizer can use to SSH to the instance
+ def get_ssh_ip(self):
+ # first look specifically for a management_local network
+ for ns in self.ports.all():
+ if ns.network.template and ns.network.template.vtn_kind=="MANAGEMENT_LOCAL":
+ return ns.ip
+
+ # for compatibility, now look for any management network
+ management=self.get_network_ip("management")
+ if management:
+ return management
+
+ # if all else fails, look for nat-net (for OpenCloud?)
+ return self.get_network_ip("nat")
+
+ @property
+ def controller(self):
+ if self.node and self.node.site_deployment:
+ return self.node.site_deployment.controller
+ else:
+ return None
+
+register_convenience_wrapper("Instance", ORMWrapperInstance)
diff --git a/xos/xos_client/xosapi/orm.py b/xos/xos_client/xosapi/orm.py
index 9572087..37f706d 100644
--- a/xos/xos_client/xosapi/orm.py
+++ b/xos/xos_client/xosapi/orm.py
@@ -27,6 +27,8 @@
from google.protobuf import symbol_database as _symbol_database
_sym_db = _symbol_database.Default()
+convenience_wrappers = {}
+
class ORMWrapper(object):
""" Wraps a protobuf object to provide ORM features """
@@ -68,7 +70,7 @@
def fk_resolve(self, name):
if name in self.cache:
- return ORMWrapper(self.cache[name], self.stub)
+ return make_ORMWrapper(self.cache[name], self.stub)
fk_entry = self._fkmap[name]
id=self.stub.make_ID(id=getattr(self, fk_entry["src_fieldName"]))
@@ -76,7 +78,7 @@
self.cache[name] = dest_model
- return ORMWrapper(dest_model, self.stub)
+ return make_ORMWrapper(dest_model, self.stub)
def reverse_fk_resolve(self, name):
if name not in self.reverse_cache:
@@ -159,6 +161,14 @@
id = self.stub.make_ID(id=self._wrapped_class.id)
self.stub.invoke("Delete%s" % self._wrapped_class.__class__.__name__, id)
+ def tologdict(self):
+ try:
+ d = {'model_name':self.__class__.__name__, 'pk': self.pk}
+ except:
+ d = {}
+
+ return d
+
class ORMLocalObjectManager(object):
""" Manages a local list of objects """
@@ -182,7 +192,7 @@
def all(self):
models = self.resolve_queryset()
- return [ORMWrapper(x,self._stub) for x in models]
+ return [make_ORMWrapper(x,self._stub) for x in models]
class ORMObjectManager(object):
""" Manages a remote list of objects """
@@ -197,17 +207,23 @@
self._packageName = packageName
def wrap_single(self, obj):
- return ORMWrapper(obj, self._stub)
+ return make_ORMWrapper(obj, self._stub)
def wrap_list(self, obj):
result=[]
for item in obj.items:
- result.append(ORMWrapper(item, self._stub))
+ result.append(make_ORMWrapper(item, self._stub))
return result
def all(self):
return self.wrap_list(self._stub.invoke("List%s" % self._modelName, Empty()))
+ def first(self):
+ objs=self.wrap_list(self._stub.invoke("List%s" % self._modelName, Empty()))
+ if not objs:
+ return None
+ return objs[0]
+
def filter(self, **kwargs):
q = self._stub.make_Query()
q.kind = q.DEFAULT
@@ -243,23 +259,34 @@
q.kind = kind
return self.wrap_list(self._stub.invoke("Filter%s" % self._modelName, q))
- def get(self, id):
- return self.wrap_single(self._stub.invoke("Get%s" % self._modelName, self._stub.make_ID(id=id)))
+ def get(self, **kwargs):
+ if kwargs.keys() == ["id"]:
+ # the fast and easy case, look it up by id
+ return self.wrap_single(self._stub.invoke("Get%s" % self._modelName, self._stub.make_ID(id=kwargs["id"])))
+ else:
+ # the slightly more difficult case, filter and return the first item
+ objs = self.filter(**kwargs)
+ return objs[0]
def new(self, **kwargs):
full_model_name = "%s.%s" % (self._packageName, self._modelName)
cls = _sym_db._classes[full_model_name]
- return ORMWrapper(cls(), self._stub, is_new=True)
+ return make_ORMWrapper(cls(), self._stub, is_new=True)
class ORMModelClass(object):
def __init__(self, stub, model_name, package_name):
+ self.model_name = model_name
self.objects = ORMObjectManager(stub, model_name, package_name)
+ def __name__(self):
+ return self.model_name
+
class ORMStub(object):
- def __init__(self, stub, package_name, invoker=None):
+ def __init__(self, stub, package_name, invoker=None, caller_kind="grpcapi"):
self.grpc_stub = stub
self.all_model_names = []
self.invoker = invoker
+ self.caller_kind = caller_kind
for name in dir(stub):
if name.startswith("Get"):
@@ -271,7 +298,20 @@
def listObjects(self):
return self.all_model_names
+ def add_default_metadata(self, metadata):
+ default_metadata = [ ("caller_kind", self.caller_kind) ]
+
+ # build up a list of metadata keys we already have
+ md_keys=[x[0] for x in metadata]
+
+ # add any defaults that we don't already have
+ for md in default_metadata:
+ if md[0] not in md_keys:
+ metadata.append( (md[0], md[1]) )
+
def invoke(self, name, request, metadata=[]):
+ self.add_default_metadata(metadata)
+
if self.invoker:
# Hook in place to call Chameleon's invoke method, as soon as we
# have rewritten the synchronizer to use reactor.
@@ -301,5 +341,18 @@
def make_Query(self):
return _sym_db._classes["xos.Query"]()
+def register_convenience_wrapper(class_name, wrapper):
+ global convenience_wrappers
+ convenience_wrappers[class_name] = wrapper
+
+def make_ORMWrapper(wrapped_class, *args, **kwargs):
+ if wrapped_class.__class__.__name__ in convenience_wrappers:
+ cls = convenience_wrappers[wrapped_class.__class__.__name__]
+ else:
+ cls = ORMWrapper
+
+ return cls(wrapped_class, *args, **kwargs)
+
+import convenience.instance