[CORD-1360] Using new Config in synchronizers
Change-Id: Iaa7c3394971286f4823a906fc184b0b83ad7ebbd
diff --git a/containers/xos/Dockerfile.synchronizer-base b/containers/xos/Dockerfile.synchronizer-base
index 1310cd2..66cd3da 100644
--- a/containers/xos/Dockerfile.synchronizer-base
+++ b/containers/xos/Dockerfile.synchronizer-base
@@ -7,6 +7,11 @@
COPY xos/xos/xml_util.py /opt/xos/xos/xml_util.py
COPY xos/xos/__init__.py /opt/xos/xos/__init__.py
COPY xos/synchronizers/__init__.py /opt/xos/synchronizers/__init__.py
+COPY lib/xos-config /opt/xos/lib/xos-config
+
+# Install the config module
+RUN cd /opt/xos/lib/xos-config/; \
+ python setup.py install
# Label image
ARG org_label_schema_schema_version=1.0
diff --git a/containers/xos/pip_requirements.txt b/containers/xos/pip_requirements.txt
index 6ab4271..c7bcf6e 100644
--- a/containers/xos/pip_requirements.txt
+++ b/containers/xos/pip_requirements.txt
@@ -1,4 +1,5 @@
pytz==2017.2
+sphinx==1.3.1
Babel==2.3.4
Django==1.8.18
Jinja2==2.9.6
diff --git a/lib/xos-config/MANIFEST.in b/lib/xos-config/MANIFEST.in
index 19da3ea..2887117 100644
--- a/lib/xos-config/MANIFEST.in
+++ b/lib/xos-config/MANIFEST.in
@@ -1 +1,2 @@
-include xosconfig/config-schema.yaml
\ No newline at end of file
+include xosconfig/xos-config-schema.yaml
+include xosconfig/synchronizer-config-schema.yaml
\ No newline at end of file
diff --git a/lib/xos-config/tests/config_test.py b/lib/xos-config/tests/config_test.py
index a3e57db..b98b817 100644
--- a/lib/xos-config/tests/config_test.py
+++ b/lib/xos-config/tests/config_test.py
@@ -9,6 +9,8 @@
invalid_format = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/confs/invalid_format.yaml")
sample_conf = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/confs/sample_conf.yaml")
+small_schema = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/schemas/small_schema.yaml")
+
services_list = {
"xos-ws": [],
"xos-db": [],
@@ -81,13 +83,34 @@
def test_env_override(self):
"""
- [XOS-Config] the XOS-CONFIG environment variable should override the config_file
+ [XOS-Config] the XOS_CONFIG_FILE environment variable should override the config_file
"""
- os.environ["XOS-CONFIG"] = "env.yaml"
+ os.environ["XOS_CONFIG_FILE"] = "env.yaml"
with self.assertRaises(Exception) as e:
Config.init("missing_conf")
self.assertEqual(e.exception.message, "[XOS-Config] Config file not found at: env.yaml")
- del os.environ["XOS-CONFIG"]
+ del os.environ["XOS_CONFIG_FILE"]
+
+ def test_schema_override(self):
+ """
+ [XOS-Config] the XOS_CONFIG_SCHEMA environment variable should override the config_schema
+ """
+ os.environ["XOS_CONFIG_SCHEMA"] = "env-schema.yaml"
+ with self.assertRaises(Exception) as e:
+ Config.init(basic_conf)
+ self.assertRegexpMatches(e.exception.message, '\[XOS\-Config\] Config schema not found at: (.+)env-schema\.yaml')
+ # self.assertEqual(e.exception.message, "[XOS-Config] Config schema not found at: env-schema.yaml")
+ del os.environ["XOS_CONFIG_SCHEMA"]
+
+ def test_schema_override_usage(self):
+ """
+ [XOS-Config] the XOS_CONFIG_SCHEMA should be used to validate a config
+ """
+ os.environ["XOS_CONFIG_SCHEMA"] = small_schema
+ with self.assertRaises(Exception) as e:
+ Config.init(basic_conf)
+ self.assertEqual(e.exception.message, "[XOS-Config] The config format is wrong: Schema validation failed:\n - Key 'database' was not defined. Path: ''.")
+ del os.environ["XOS_CONFIG_SCHEMA"]
def test_get_cli_param(self):
"""
@@ -99,13 +122,15 @@
def test_get_default_val_for_missing_param(self):
"""
- [XOS-Config] Should raise reading a missing param
+ [XOS-Config] Should get the default value if nothing is specified
"""
Config.init(basic_conf)
log = Config.get("logging")
self.assertEqual(log, {
"level": "info",
- "channels": ["file", "console"]
+ "channels": ["file", "console"],
+ "logstash_hostport": "cordloghost:5617",
+ "file": "/var/log/xos.log",
})
def _test_get_missing_param(self):
@@ -113,9 +138,8 @@
[XOS-Config] Should raise reading a missing param
"""
Config.init(sample_conf)
- with self.assertRaises(Exception) as e:
- Config.get("foo")
- self.assertEqual(e.exception.message, "[XOS-Config] Config does not have a value (or a default) parameter foo")
+ res = Config.get("foo")
+ self.assertEqual(res, None)
def test_get_first_level(self):
"""
@@ -192,4 +216,8 @@
with patch("xosconfig.config.requests.get") as mock_get:
mock_get.return_value.json.return_value = db_service
endpoint = Config.get_service_endpoint("xos-db")
- self.assertEqual(endpoint, "http://172.18.0.4:5432")
\ No newline at end of file
+ self.assertEqual(endpoint, "http://172.18.0.4:5432")
+
+
+if __name__ == '__main__':
+ unittest.main()
\ No newline at end of file
diff --git a/lib/xos-config/tests/confs/sample_conf.yaml b/lib/xos-config/tests/confs/sample_conf.yaml
index 4d4c792..b9867ed 100644
--- a/lib/xos-config/tests/confs/sample_conf.yaml
+++ b/lib/xos-config/tests/confs/sample_conf.yaml
@@ -7,4 +7,5 @@
level: info
channels:
- file
- - console
\ No newline at end of file
+ - console
+xos_dir: /opt/xos
\ No newline at end of file
diff --git a/lib/xos-config/tests/schemas/small_schema.yaml b/lib/xos-config/tests/schemas/small_schema.yaml
new file mode 100644
index 0000000..4afbd92
--- /dev/null
+++ b/lib/xos-config/tests/schemas/small_schema.yaml
@@ -0,0 +1,4 @@
+map:
+ name:
+ type: str
+ required: True
\ No newline at end of file
diff --git a/lib/xos-config/xosconfig/config.py b/lib/xos-config/xosconfig/config.py
index 608beac..3e0482d 100644
--- a/lib/xos-config/xosconfig/config.py
+++ b/lib/xos-config/xosconfig/config.py
@@ -6,6 +6,7 @@
from pykwalify.core import Core as PyKwalify
DEFAULT_CONFIG_FILE = "/opt/xos/xos_config.yaml"
+DEFAULT_CONFIG_SCHEMA = 'xos-config-schema.yaml'
INITIALIZED = False
CONFIG = {}
@@ -15,7 +16,12 @@
"""
@staticmethod
- def init(config_file=DEFAULT_CONFIG_FILE):
+ def init(config_file=DEFAULT_CONFIG_FILE, config_schema=DEFAULT_CONFIG_SCHEMA):
+
+ # make schema relative to this directory
+ # TODO give the possibility to specify an absolute path
+ config_schema = Config.get_abs_path(config_schema)
+
global INITIALIZED
global CONFIG
# the config module can be initialized only one
@@ -23,16 +29,22 @@
raise Exception('[XOS-Config] Module already initialized')
INITIALIZED = True
- # if XOS-CONFIG is defined override the config_file
- if os.environ.get('XOS-CONFIG'):
- config_file = os.environ['XOS-CONFIG']
+ # if XOS_CONFIG_FILE is defined override the config_file
+ # FIXME shouldn't this stay in whatever module call this one? and then just pass the file to the init method
+ if os.environ.get('XOS_CONFIG_FILE'):
+ config_file = os.environ['XOS_CONFIG_FILE']
+
+ # if XOS_CONFIG_SCHEMA is defined override the config_schema
+ # FIXME shouldn't this stay in whatever module call this one? and then just pass the file to the init method
+ if os.environ.get('XOS_CONFIG_SCHEMA'):
+ config_schema = Config.get_abs_path(os.environ['XOS_CONFIG_SCHEMA'])
# if a -C parameter is set in the cli override the config_file
# FIXME shouldn't this stay in whatever module call this one? and then just pass the file to the init method
if Config.get_cli_param(sys.argv):
- config_file = Config.get_cli_param(sys.argv)
+ config_schema = Config.get_cli_param(sys.argv)
- CONFIG = Config.read_config(config_file)
+ CONFIG = Config.read_config(config_file, config_schema)
@staticmethod
def clear():
@@ -40,8 +52,14 @@
INITIALIZED = False
@staticmethod
- def validate_config_format(config_file):
- schema = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/config-schema.yaml')
+ def get_abs_path(path):
+ if os.path.isabs(path):
+ return path
+ return os.path.dirname(os.path.realpath(__file__)) + '/' + path
+
+ @staticmethod
+ def validate_config_format(config_file, config_schema):
+ schema = os.path.abspath(config_schema)
c = PyKwalify(source_file=config_file, schema_files=[schema])
c.validate(raise_exception=True)
@@ -54,7 +72,7 @@
last = arg
@staticmethod
- def read_config(config_file):
+ def read_config(config_file, config_schema):
"""
Read the configuration file and return a dictionary
:param config_file: string
@@ -63,8 +81,11 @@
if not os.path.exists(config_file):
raise Exception('[XOS-Config] Config file not found at: %s' % config_file)
+ if not os.path.exists(config_schema):
+ raise Exception('[XOS-Config] Config schema not found at: %s' % config_schema)
+
try:
- Config.validate_config_format(config_file)
+ Config.validate_config_format(config_file, config_schema)
except Exception, e:
raise Exception('[XOS-Config] The config format is wrong: %s' % e.msg)
@@ -88,7 +109,9 @@
if not val:
val = Config.get_param(query, default.DEFAULT_VALUES)
if not val:
- raise Exception('[XOS-Config] Config does not have a value (or a default) parameter %s' % query)
+ # TODO if no val return none
+ # raise Exception('[XOS-Config] Config does not have a value (or a default) parameter %s' % query)
+ return None
return val
@staticmethod
@@ -166,7 +189,5 @@
service = Config.get_service_info(service_name)
return 'http://%s:%s' % (service['url'], service['port'])
-# NOTE is this needed if this package is not meant to be execute from the CLI?
if __name__ == '__main__':
- config = Config()
- config.init()
\ No newline at end of file
+ Config.init()
\ No newline at end of file
diff --git a/lib/xos-config/xosconfig/default.py b/lib/xos-config/xosconfig/default.py
index 02abaa6..f5cd761 100644
--- a/lib/xos-config/xosconfig/default.py
+++ b/lib/xos-config/xosconfig/default.py
@@ -1,9 +1,27 @@
DEFAULT_VALUES = {
'xos_dir': '/opt/xos',
'logging': {
- 'file': '/var/log/xos.log',
+ 'file': '/var/log/xos.log', # TODO remove me, the new logger will be able to decide on which file to log
'level': 'info',
'channels': ['file', 'console'],
'logstash_hostport': 'cordloghost:5617'
- }
+ },
+ 'accessor': {
+ 'endpoint': 'xos-core.cord.lab:50051',
+ },
+ 'keep_temp_files': False,
+ 'enable_watchers': False,
+ 'dependency_graph': '/opt/xos/model-deps',
+ 'error_map_path': '/opt/xos/error_map.txt',
+ 'feefie': {
+ 'client_user': 'pl'
+ },
+ 'proxy_ssh': {
+ 'enabled': True,
+ 'key': '/opt/cord_profile/node_key',
+ 'user': 'root'
+ },
+ 'node_key': '/opt/cord_profile/node_key',
+ 'config_dir': '/etc/xos/sync',
+ 'backoff_disabled': True
}
\ No newline at end of file
diff --git a/lib/xos-config/xosconfig/synchronizer-config-schema.yaml b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
new file mode 100644
index 0000000..16c5433
--- /dev/null
+++ b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
@@ -0,0 +1,80 @@
+map:
+ name:
+ type: str
+ required: True
+ xos_dir:
+ type: str
+ logging:
+ type: map
+ map:
+ level:
+ type: str
+ channels:
+ type: seq
+ sequence:
+ - type: str
+ enum: ['file', 'console', 'elkstack']
+ dependency_graph:
+ type: str
+ steps_dir:
+ type: str
+ sys_dir:
+ type: str
+ accessor:
+ type: map
+ required: True
+ map:
+ username:
+ type: str
+ required: True
+ password:
+ type: str
+ required: True
+ required_models:
+ type: seq
+ sequence:
+ - type: str
+ keep_temp_files:
+ type: bool
+ proxy_ssh:
+ type: map
+ map:
+ enabled:
+ type: bool
+ required: True
+ key:
+ type: str
+ user:
+ type: str
+ enable_watchers:
+ type: bool
+ model_policies_dir:
+ type: str
+ error_map_path:
+ type: str
+ feefie:
+ type: map
+ map:
+ client_id:
+ type: str
+ user_id:
+ type: str
+ node_key:
+ type: str
+ config_dir:
+ type: str
+ backoff_disabled:
+ type: bool
+ images_directory:
+ type: str
+ nova:
+ type: map
+ map:
+ enabled:
+ type: bool
+ ca_ssl_cert:
+ type: str
+ default_flavor:
+ type: str
+ default_security_group:
+ type: str
\ No newline at end of file
diff --git a/lib/xos-config/xosconfig/config-schema.yaml b/lib/xos-config/xosconfig/xos-config-schema.yaml
similarity index 84%
rename from lib/xos-config/xosconfig/config-schema.yaml
rename to lib/xos-config/xosconfig/xos-config-schema.yaml
index b79905e..a2b2c75 100644
--- a/lib/xos-config/xosconfig/config-schema.yaml
+++ b/lib/xos-config/xosconfig/xos-config-schema.yaml
@@ -23,8 +23,11 @@
type: str
level:
type: str
+ # TODO add validation [info, debug, warning, error, critical]
channels:
type: seq
sequence:
- type: str
- enum: ['file', 'console', 'elkstack']
\ No newline at end of file
+ enum: ['file', 'console', 'elkstack']
+ xos_dir:
+ type: str
\ No newline at end of file
diff --git a/xos/generate/dependency_walker.py b/xos/generate/dependency_walker.py
index 65721a9..d2f97af 100644
--- a/xos/generate/dependency_walker.py
+++ b/xos/generate/dependency_walker.py
@@ -1,8 +1,10 @@
#!/usr/bin/env python
+# NOTE is this used or replaced by `new_base/dependency_walker_new.py`?
+
import os
import imp
-from xos.config import Config, XOS_DIR
+from xosconfig import Config
import inspect
import time
import traceback
@@ -11,16 +13,15 @@
import json
import pdb
from core.models import *
-
from xos.logger import Logger, logging
logger = Logger(level=logging.INFO)
missing_links={}
try:
- dep_data = open(Config().dependency_graph).read()
+ dep_data = open(Config.get('dependency_graph')).read()
except:
- dep_data = open(XOS_DIR + '/model-deps').read()
+ raise Exception('[XOS-Dependency-Walker] File %s not found' % Config.get('dependency_graph'))
dependencies = json.loads(dep_data)
diff --git a/xos/synchronizers/ec2/__init__.py b/xos/synchronizers/ec2/__init__.py
deleted file mode 100644
index f4c7743..0000000
--- a/xos/synchronizers/ec2/__init__.py
+++ /dev/null
@@ -1,32 +0,0 @@
-from xos.config import Config
-
-try:
- observer_disabled = Config().observer_disabled
-except:
- observer_disabled = False
-
-print_once = True
-
-if (not observer_disabled):
- from .event_manager import EventSender
-
- def notify_observer(model=None, delete=False, pk=None, model_dict={}):
- try:
- if (model and delete):
- if hasattr(model,"__name__"):
- modelName = model.__name__
- else:
- modelName = model.__class__.__name__
- EventSender().fire(delete_flag = delete, model = modelName, pk = pk, model_dict=model_dict)
- else:
- EventSender().fire()
- except Exception,e:
- print "Exception in Observer. This should not disrupt the front end. %s"%str(e)
-
-else:
- def notify_observer(model=None, delete=False, pk=None, model_dict={}):
- global print_once
- if (print_once):
- print "The observer is disabled"
- print_once = False
- return
diff --git a/xos/synchronizers/ec2/aws_lib.py b/xos/synchronizers/ec2/aws_lib.py
deleted file mode 100644
index e116295..0000000
--- a/xos/synchronizers/ec2/aws_lib.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import os
-import json
-
-class AwsException(Exception):
- pass
-
-def aws_run(cmd):
- cmd = 'aws %s'%cmd
- pipe = os.popen(cmd)
- output_str = pipe.read()
-
- if (not pipe.close()):
- output = json.loads(output_str)
- return output
- else:
- raise AwsException("Error running command: %s"%cmd)
-
-
diff --git a/xos/synchronizers/ec2/awslib.py b/xos/synchronizers/ec2/awslib.py
deleted file mode 100644
index ca708ff..0000000
--- a/xos/synchronizers/ec2/awslib.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import os
-import json
-
-class AwsException(Exception):
- pass
-
-def aws_run(cmd,env=''):
- cmd = '%s aws %s'%(env,cmd)
- pipe = os.popen(cmd)
- output_str = pipe.read()
-
- if (not pipe.close()):
- output = json.loads(output_str)
- return output
- else:
- raise AwsException("Error: %s"%output_str)
-
-
diff --git a/xos/synchronizers/ec2/backend.py b/xos/synchronizers/ec2/backend.py
deleted file mode 100644
index 7288e61..0000000
--- a/xos/synchronizers/ec2/backend.py
+++ /dev/null
@@ -1,22 +0,0 @@
-import threading
-import time
-from ec2_observer.event_loop import XOSObserver
-from ec2_observer.event_manager import EventListener
-from xos.logger import Logger, logging
-
-logger = Logger(level=logging.INFO)
-
-class Backend:
-
- def run(self):
- # start the openstack observer
- observer = XOSObserver()
- observer_thread = threading.Thread(target=observer.run)
- observer_thread.start()
-
- # start event listene
- event_manager = EventListener(wake_up=observer.wake_up)
- event_manager_thread = threading.Thread(target=event_manager.run)
- event_manager_thread.start()
- logger.log_exc("Exception in child thread")
-
diff --git a/xos/synchronizers/ec2/creds.py b/xos/synchronizers/ec2/creds.py
deleted file mode 100644
index 0a29c0d..0000000
--- a/xos/synchronizers/ec2/creds.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from core.models import *
-
-def get_creds(user=None, slice=None, site=None, deployment=None):
- if (not user or not site):
- raise Exception('User and Site have to be in context to use EC2')
-
- cred = UserCredential.objects.filter(user=user)
- if (not cred):
- cred = SiteCredential.objects.filter(site=site)
-
- if (cred):
- env = 'AWS_ACCESS_KEY_ID=%s AWS_SECRET_ACCESS_KEY=%s'%(cred.key_id, cred.enc_value)
- else:
- env = ''
-
- return env
diff --git a/xos/synchronizers/ec2/deleter.py b/xos/synchronizers/ec2/deleter.py
deleted file mode 100644
index 93fa572..0000000
--- a/xos/synchronizers/ec2/deleter.py
+++ /dev/null
@@ -1,16 +0,0 @@
-import os
-import base64
-from xos.config import Config
-
-class Deleter:
- model=None # Must be overridden
-
- def __init__(self, *args, **kwargs):
- pass
-
- def call(self, pk, model_dict):
- # Fetch object from XOS db and delete it
- pass
-
- def __call__(self, *args, **kwargs):
- return self.call(*args, **kwargs)
diff --git a/xos/synchronizers/ec2/deleters/__init__.py b/xos/synchronizers/ec2/deleters/__init__.py
deleted file mode 100644
index 9cfd951..0000000
--- a/xos/synchronizers/ec2/deleters/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import os
-
-deleters = {}
-_path = os.path.join('.',os.path.dirname(__file__))
-
-_files = os.listdir(_path)
-_files = filter(lambda x:x.endswith('deleter.py'),_files)
-_files = map(lambda x:x.rstrip('.py'),_files)
-
-"""
-for f in _files:
- m = __import__(f)
- deleter = getattr(m,f.title().replace('_',''))
- try:
- deleters[deleter.model].append(deleter)
- except KeyError:
- deleters[deleter.model]=[deleter]
-"""
diff --git a/xos/synchronizers/ec2/deleters/instance_deleter.py b/xos/synchronizers/ec2/deleters/instance_deleter.py
deleted file mode 100644
index ba454e7..0000000
--- a/xos/synchronizers/ec2/deleters/instance_deleter.py
+++ /dev/null
@@ -1,14 +0,0 @@
-from core.models import Instance, SliceDeployments
-from synchronizers.base.deleter import Deleter
-
-class InstanceDeleter(Deleter):
- model='Instance'
-
- def call(self, pk):
- instance = Instance.objects.get(pk=pk)
- if instance.instance_id:
- driver = self.driver.client_driver(caller=instance.creator,
- tenant=instance.slice.name,
- deployment=instance.deploymentNetwork.name)
- driver.destroy_instance(instance.instance_id)
- instance.delete()
diff --git a/xos/synchronizers/ec2/deleters/network_deleter.py b/xos/synchronizers/ec2/deleters/network_deleter.py
deleted file mode 100644
index ba9cd09..0000000
--- a/xos/synchronizers/ec2/deleters/network_deleter.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from core.models import Network, NetworkDeployments
-from synchronizers.base.deleter import Deleter
-from synchronizers.base.deleters.network_deployment_deleter import NetworkDeploymentDeleter
-from xos.logger import Logger, logging
-
-logger = Logger(level=logging.INFO)
-
-class NetworkDeleter(Deleter):
- model='Network'
-
- def call(self, pk):
- network = Network.objects.get(pk=pk)
- network_deployment_deleter = NetworkDeploymentDeleter()
- for network_deployment in NetworkDeployments.objects.filter(network=network):
- try:
- network_deployment_deleter(network_deployment.id)
- except:
- logger.log_exc("Failed to delete network deployment %s" % network_deployment,extra=network.tologdict())
- network.delete()
diff --git a/xos/synchronizers/ec2/deleters/network_deployment_deleter.py b/xos/synchronizers/ec2/deleters/network_deployment_deleter.py
deleted file mode 100644
index f8aaa29..0000000
--- a/xos/synchronizers/ec2/deleters/network_deployment_deleter.py
+++ /dev/null
@@ -1,21 +0,0 @@
-from core.models import Network, NetworkDeployments
-from synchronizers.base.deleter import Deleter
-from openstack.driver import OpenStackDriver
-
-class NetworkDeploymentDeleter(Deleter):
- model='NetworkDeployment'
-
- def call(self, pk):
- network_deployment = NetworkDeployments.objects.get(pk=pk)
- driver = OpenStackDriver().client_driver(caller=network_deployment.network.owner.creator,
- tenant=network_deployment.network.owner.name,
- deployment=network_deployment.deployment.name)
- if (network_deployment.router_id) and (network_deployment.subnet_id):
- driver.delete_router_interface(network_deployment.router_id, network_deployment.subnet_id)
- if network_deployment.subnet_id:
- driver.delete_subnet(network_deployment.subnet_id)
- if network_deployment.router_id:
- driver.delete_router(network_deployment.router_id)
- if network_deployment.net_id:
- driver.delete_network(network_deployment.net_id)
- network_deployment.delete()
diff --git a/xos/synchronizers/ec2/deleters/network_instance_deleter.py b/xos/synchronizers/ec2/deleters/network_instance_deleter.py
deleted file mode 100644
index 21fe87f..0000000
--- a/xos/synchronizers/ec2/deleters/network_instance_deleter.py
+++ /dev/null
@@ -1,13 +0,0 @@
-from core.models import NetworkInstance
-from synchronizers.base.deleter import Deleter
-
-class NetworkInstanceDeleter(Deleter):
- model='NetworkInstance'
-
- def call(self, pk):
- network_instance = NetworkInstances.objects.get(pk=pk)
- # handle openstack delete
-
- network_instance.delete()
-
-
diff --git a/xos/synchronizers/ec2/deleters/site_deleter.py b/xos/synchronizers/ec2/deleters/site_deleter.py
deleted file mode 100644
index 832baf9..0000000
--- a/xos/synchronizers/ec2/deleters/site_deleter.py
+++ /dev/null
@@ -1,14 +0,0 @@
-from core.models import Site, SiteDeployment
-from synchronizers.base.deleter import Deleter
-from synchronizers.base.deleters.site_deployment_deleter import SiteDeploymentDeleter
-
-class SiteDeleter(Deleter):
- model='Site'
-
- def call(self, pk):
- site = Site.objects.get(pk=pk)
- site_deployments = SiteDeployment.objects.filter(site=site)
- site_deployment_deleter = SiteDeploymentDeleter()
- for site_deployment in site_deployments:
- site_deployment_deleter(site_deployment.id)
- site.delete()
diff --git a/xos/synchronizers/ec2/deleters/site_deployment_deleter.py b/xos/synchronizers/ec2/deleters/site_deployment_deleter.py
deleted file mode 100644
index 794b438..0000000
--- a/xos/synchronizers/ec2/deleters/site_deployment_deleter.py
+++ /dev/null
@@ -1,12 +0,0 @@
-from core.models import Site, SiteDeployment
-from synchronizers.base.deleter import Deleter
-
-class SiteDeploymentDeleter(Deleter):
- model='SiteDeployment'
-
- def call(self, pk):
- site_deployment = SiteDeployment.objects.get(pk=pk)
- if site_deployment.tenant_id:
- driver = self.driver.admin_driver(deployment=site_deployment.deployment.name)
- driver.delete_tenant(site_deployment.tenant_id)
- site_deployment.delete()
diff --git a/xos/synchronizers/ec2/deleters/slice_deleter.py b/xos/synchronizers/ec2/deleters/slice_deleter.py
deleted file mode 100644
index 6b800ac..0000000
--- a/xos/synchronizers/ec2/deleters/slice_deleter.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from core.models import Slice, SliceDeployments, User
-from synchronizers.base.deleter import Deleter
-from synchronizers.base.deleters.slice_deployment_deleter import SliceDeploymentsDeleter
-from xos.logger import Logger, logging
-
-logger = Logger(level=logging.INFO)
-
-class SliceDeleter(Deleter):
- model='Slice'
-
- def call(self, pk):
- slice = Slice.objects.get(pk=pk)
- slice_deployment_deleter = SliceDeploymentsDeleter()
- for slice_deployment in SliceDeployments.objects.filter(slice=slice):
- try:
- slice_deployment_deleter(slice_deployment.id)
- except:
- logger.log_exc("Failed to delete slice_deployment %s" % slice_deployment,extra=slice.tologdict())
- slice.delete()
diff --git a/xos/synchronizers/ec2/deleters/slice_deployment_deleter.py b/xos/synchronizers/ec2/deleters/slice_deployment_deleter.py
deleted file mode 100644
index 39c557a..0000000
--- a/xos/synchronizers/ec2/deleters/slice_deployment_deleter.py
+++ /dev/null
@@ -1,34 +0,0 @@
-from core.models import Slice, SliceDeployments, User
-from synchronizers.base.deleter import Deleter
-from openstack.driver import OpenStackDriver
-
-class SliceDeploymentsDeleter(Deleter):
- model='SliceDeployments'
-
- def call(self, pk):
- slice_deployment = SliceDeployments.objects.get(pk=pk)
- user = User.objects.get(id=slice_deployment.slice.creator.id)
- driver = OpenStackDriver().admin_driver(deployment=slice_deployment.deployment.name)
- client_driver = driver.client_driver(caller=user,
- tenant=slice_deployment.slice.name,
- deployment=slice_deployment.deployment.name)
-
- if slice_deployment.router_id and slice_deployment.subnet_id:
- client_driver.delete_router_interface(slice_deployment.router_id, slice_deployment.subnet_id)
- if slice_deployment.subnet_id:
- client_driver.delete_subnet(slice_deployment.subnet_id)
- if slice_deployment.router_id:
- client_driver.delete_router(slice_deployment.router_id)
- if slice_deployment.network_id:
- client_driver.delete_network(slice_deployment.network_id)
- if slice_deployment.tenant_id:
- driver.delete_tenant(slice_deployment.tenant_id)
- # delete external route
- #subnet = None
- #subnets = client_driver.shell.quantum.list_subnets()['subnets']
- #for snet in subnets:
- # if snet['id'] == slice_deployment.subnet_id:
- # subnet = snet
- #if subnet:
- # driver.delete_external_route(subnet)
- slice_deployment.delete()
diff --git a/xos/synchronizers/ec2/deleters/user_deleter.py b/xos/synchronizers/ec2/deleters/user_deleter.py
deleted file mode 100644
index 12c8224..0000000
--- a/xos/synchronizers/ec2/deleters/user_deleter.py
+++ /dev/null
@@ -1,13 +0,0 @@
-from core.models import User, UserDeployments
-from synchronizers.base.deleter import Deleter
-from synchronizers.base.deleters.user_deployment_deleter import UserDeploymentsDeleter
-
-class UserDeleter(Deleter):
- model='User'
-
- def call(self, pk):
- user = User.objects.get(pk=pk)
- user_deployment_deleter = UserDeploymentsDeleter()
- for user_deployment in UserDeployments.objects.filter(user=user):
- user_deployment_deleter(user_deployment.id)
- user.delete()
diff --git a/xos/synchronizers/ec2/deleters/user_deployment_deleter.py b/xos/synchronizers/ec2/deleters/user_deployment_deleter.py
deleted file mode 100644
index 3b6113b..0000000
--- a/xos/synchronizers/ec2/deleters/user_deployment_deleter.py
+++ /dev/null
@@ -1,12 +0,0 @@
-from core.models import User, UserDeployments
-from synchronizers.base.deleter import Deleter
-
-class UserDeploymentsDeleter(Deleter):
- model='UserDeployments'
-
- def call(self, pk):
- user_deployment = UserDeployments.objects.get(pk=pk)
- if user_deployment.user.kuser_id:
- driver = self.driver.admin_driver(deployment=user_deployment.deployment.name)
- driver.delete_user(user_deployment.user.kuser_id)
- user_deployment.delete()
diff --git a/xos/synchronizers/ec2/dmdot b/xos/synchronizers/ec2/dmdot
deleted file mode 100644
index 9b2b587..0000000
--- a/xos/synchronizers/ec2/dmdot
+++ /dev/null
@@ -1,49 +0,0 @@
-#!/usr/bin/python
-
-import os
-import pdb
-import sys
-import json
-
-sys.path.append('.')
-
-os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
-
-from django.db.models.fields.related import ForeignKey
-from core.models import *
-
-try:
- output = sys.args[1]
-except:
- output = '-json'
-
-g = globals()
-model_classes = []
-class_names = []
-for c in g.values():
- if type(c)==type(XOSBase):
- model_classes.append(c)
- class_names.append(c.__name__)
-
-
-if (output=='-dot'):
- print "digraph plstack {";
- for c in model_classes:
- fields = c._meta.fields
- for f in fields:
- if type(f)==ForeignKey and f.name.title() in class_names:
- print '\t"%s"->"%s";'%(c.__name__,f.name.title())
- print "}\n";
-elif (output=='-json'):
- d = {}
- for c in model_classes:
- fields = c._meta.fields
- for f in fields:
- if type(f)==ForeignKey and f.name.title() in class_names:
- try:
- d[c.__name__].append(f.name.title())
- except KeyError:
- d[c.__name__]=[f.name.title()]
- print json.dumps(d,indent=4)
-
-
diff --git a/xos/synchronizers/ec2/ec2_backend.py b/xos/synchronizers/ec2/ec2_backend.py
deleted file mode 100644
index 7edf63c..0000000
--- a/xos/synchronizers/ec2/ec2_backend.py
+++ /dev/null
@@ -1,24 +0,0 @@
-import threading
-import time
-from ec2_observer.event_loop import XOSObserver
-from ec2_observer.event_manager import EventListener
-from xos.logger import Logger, logging
-
-logger = Logger(level=logging.INFO)
-
-class Backend:
-
- def run(self):
- try:
- # start the openstack observer
- observer = XOSObserver()
- observer_thread = threading.Thread(target=observer.run)
- observer_thread.start()
-
- # start event listene
- event_manager = EventListener(wake_up=observer.wake_up)
- event_manager_thread = threading.Thread(target=event_manager.run)
- event_manager_thread.start()
- except:
- logger.log_exc("Exception in child thread")
-
diff --git a/xos/synchronizers/ec2/error_mapper.py b/xos/synchronizers/ec2/error_mapper.py
deleted file mode 100644
index 9eb878d..0000000
--- a/xos/synchronizers/ec2/error_mapper.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from xos.config import Config
-from xos.logger import Logger, logging, logger
-
-class ErrorMapper:
- def __init__(self, error_map_file):
- self.error_map = {}
- try:
- error_map_lines = open(error_map_file).read().splitlines()
- for l in error_map_lines:
- if (not l.startswith('#')):
- splits = l.split('->')
- k,v = map(lambda i:i.rstrip(),splits)
- self.error_map[k]=v
- except:
- logging.info('Could not read error map')
-
-
- def map(self, error):
- return self.error_map[error]
-
-
-
-
-
-
diff --git a/xos/synchronizers/ec2/event_loop.py b/xos/synchronizers/ec2/event_loop.py
deleted file mode 100644
index cfeb212..0000000
--- a/xos/synchronizers/ec2/event_loop.py
+++ /dev/null
@@ -1,393 +0,0 @@
-import os
-import imp
-import inspect
-import time
-import traceback
-import commands
-import threading
-import json
-import pdb
-
-from datetime import datetime
-from collections import defaultdict
-from core.models import *
-from django.db.models import F, Q
-#from openstack.manager import OpenStackManager
-from openstack.driver import OpenStackDriver
-from xos.logger import Logger, logging, logger
-#from timeout import timeout
-from xos.config import Config, XOS_DIR
-from synchronizers.base.steps import *
-from syncstep import SyncStep
-from toposort import toposort
-from synchronizers.base.error_mapper import *
-
-debug_mode = False
-
-logger = Logger(level=logging.INFO)
-
-class StepNotReady(Exception):
- pass
-
-class NoOpDriver:
- def __init__(self):
- self.enabled = True
- self.dependency_graph = None
-
-STEP_STATUS_WORKING=1
-STEP_STATUS_OK=2
-STEP_STATUS_KO=3
-
-def invert_graph(g):
- ig = {}
- for k,v in g.items():
- for v0 in v:
- try:
- ig[v0].append(k)
- except:
- ig=[k]
- return ig
-
-class XOSObserver:
- #sync_steps = [SyncNetworks,SyncNetworkInstances,SyncSites,SyncSitePrivilege,SyncSlices,SyncSliceMemberships,SyncInstances,SyncInstanceIps,SyncExternalRoutes,SyncUsers,SyncRoles,SyncNodes,SyncImages,GarbageCollector]
- sync_steps = []
-
-
- def __init__(self):
- # The Condition object that gets signalled by Feefie events
- self.step_lookup = {}
- self.load_sync_step_modules()
- self.load_sync_steps()
- self.event_cond = threading.Condition()
-
- self.driver_kind = getattr(Config(), "observer_driver", "openstack")
- if self.driver_kind=="openstack":
- self.driver = OpenStackDriver()
- else:
- self.driver = NoOpDriver()
-
- def wait_for_event(self, timeout):
- self.event_cond.acquire()
- self.event_cond.wait(timeout)
- self.event_cond.release()
-
- def wake_up(self):
- logger.info('Wake up routine called. Event cond %r'%self.event_cond)
- self.event_cond.acquire()
- self.event_cond.notify()
- self.event_cond.release()
-
- def load_sync_step_modules(self, step_dir=None):
- if step_dir is None:
- if hasattr(Config(), "observer_steps_dir"):
- step_dir = Config().observer_steps_dir
- else:
- step_dir = XOS_DIR + "/observer/steps"
-
- for fn in os.listdir(step_dir):
- pathname = os.path.join(step_dir,fn)
- if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
- module = imp.load_source(fn[:-3],pathname)
- for classname in dir(module):
- c = getattr(module, classname, None)
-
- # make sure 'c' is a descendent of SyncStep and has a
- # provides field (this eliminates the abstract base classes
- # since they don't have a provides)
-
- if inspect.isclass(c) and issubclass(c, SyncStep) and hasattr(c,"provides") and (c not in self.sync_steps):
- self.sync_steps.append(c)
- logger.info('loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps]))
- # print 'loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps])
-
- def load_sync_steps(self):
- dep_path = Config().observer_dependency_graph
- logger.info('Loading model dependency graph from %s' % dep_path)
- try:
- # This contains dependencies between records, not sync steps
- self.model_dependency_graph = json.loads(open(dep_path).read())
- except Exception,e:
- raise e
-
- try:
- backend_path = Config().observer_pl_dependency_graph
- logger.info('Loading backend dependency graph from %s' % backend_path)
- # This contains dependencies between backend records
- self.backend_dependency_graph = json.loads(open(backend_path).read())
- except Exception,e:
- logger.info('Backend dependency graph not loaded')
- # We can work without a backend graph
- self.backend_dependency_graph = {}
-
- provides_dict = {}
- for s in self.sync_steps:
- self.step_lookup[s.__name__] = s
- for m in s.provides:
- try:
- provides_dict[m.__name__].append(s.__name__)
- except KeyError:
- provides_dict[m.__name__]=[s.__name__]
-
- step_graph = {}
- for k,v in self.model_dependency_graph.iteritems():
- try:
- for source in provides_dict[k]:
- for m in v:
- try:
- for dest in provides_dict[m]:
- # no deps, pass
- try:
- if (dest not in step_graph[source]):
- step_graph[source].append(dest)
- except:
- step_graph[source]=[dest]
- except KeyError:
- pass
-
- except KeyError:
- pass
- # no dependencies, pass
-
- #import pdb
- #pdb.set_trace()
- if (self.backend_dependency_graph):
- backend_dict = {}
- for s in self.sync_steps:
- for m in s.serves:
- backend_dict[m]=s.__name__
-
- for k,v in backend_dependency_graph.iteritems():
- try:
- source = backend_dict[k]
- for m in v:
- try:
- dest = backend_dict[m]
- except KeyError:
- # no deps, pass
- pass
- step_graph[source]=dest
-
- except KeyError:
- pass
- # no dependencies, pass
-
- self.dependency_graph = step_graph
- self.deletion_dependency_graph = invert_graph(step_graph)
-
- self.ordered_steps = toposort(self.dependency_graph, map(lambda s:s.__name__,self.sync_steps))
- print "Order of steps=",self.ordered_steps
- self.load_run_times()
-
-
- def check_duration(self, step, duration):
- try:
- if (duration > step.deadline):
- logger.info('Sync step %s missed deadline, took %.2f seconds'%(step.name,duration))
- except AttributeError:
- # S doesn't have a deadline
- pass
-
- def update_run_time(self, step, deletion):
- if (not deletion):
- self.last_run_times[step.__name__]=time.time()
- else:
- self.last_deletion_run_times[step.__name__]=time.time()
-
-
- def check_schedule(self, step, deletion):
- last_run_times = self.last_run_times if not deletion else self.last_deletion_run_times
-
- time_since_last_run = time.time() - last_run_times.get(step.__name__, 0)
- try:
- if (time_since_last_run < step.requested_interval):
- raise StepNotReady
- except AttributeError:
- logger.info('Step %s does not have requested_interval set'%step.__name__)
- raise StepNotReady
-
- def load_run_times(self):
- try:
- jrun_times = open('/tmp/observer_run_times').read()
- self.last_run_times = json.loads(jrun_times)
- except:
- self.last_run_times={}
- for e in self.ordered_steps:
- self.last_run_times[e]=0
- try:
- jrun_times = open('/tmp/observer_deletion_run_times').read()
- self.last_deletion_run_times = json.loads(jrun_times)
- except:
- self.last_deletion_run_times={}
- for e in self.ordered_steps:
- self.last_deletion_run_times[e]=0
-
-
- def save_run_times(self):
- run_times = json.dumps(self.last_run_times)
- open('/tmp/observer_run_times','w').write(run_times)
-
- deletion_run_times = json.dumps(self.last_deletion_run_times)
- open('/tmp/observer_deletion_run_times','w').write(deletion_run_times)
-
- def check_class_dependency(self, step, failed_steps):
- step.dependenices = []
- for obj in step.provides:
- step.dependenices.extend(self.model_dependency_graph.get(obj.__name__, []))
- for failed_step in failed_steps:
- if (failed_step in step.dependencies):
- raise StepNotReady
-
- def sync(self, S, deletion):
- step = self.step_lookup[S]
- start_time=time.time()
-
- dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
-
- # Wait for step dependencies to be met
- try:
- deps = self.dependency_graph[S]
- has_deps = True
- except KeyError:
- has_deps = False
-
- if (has_deps):
- for d in deps:
- cond = self.step_conditions[d]
- cond.acquire()
- if (self.step_status[d] is STEP_STATUS_WORKING):
- cond.wait()
- cond.release()
- go = self.step_status[d] == STEP_STATUS_OK
- else:
- go = True
-
- if (not go):
- self.failed_steps.append(sync_step)
- my_status = STEP_STATUS_KO
- else:
- sync_step = step(driver=self.driver,error_map=self.error_mapper)
- sync_step.__name__ = step.__name__
- sync_step.dependencies = []
- try:
- mlist = sync_step.provides
-
- for m in mlist:
- sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
- except KeyError:
- pass
- sync_step.debug_mode = debug_mode
-
- should_run = False
- try:
- # Various checks that decide whether
- # this step runs or not
- self.check_class_dependency(sync_step, self.failed_steps) # dont run Slices if Sites failed
- self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour
- should_run = True
- except StepNotReady:
- logging.info('Step not ready: %s'%sync_step.__name__)
- self.failed_steps.append(sync_step)
- my_status = STEP_STATUS_KO
- except Exception,e:
- logging.error('%r',e)
- logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
- self.failed_steps.append(sync_step)
- my_status = STEP_STATUS_KO
-
- if (should_run):
- try:
- duration=time.time() - start_time
-
- logger.info('Executing step %s' % sync_step.__name__)
-
- failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion)
-
- self.check_duration(sync_step, duration)
-
- if failed_objects:
- self.failed_step_objects.update(failed_objects)
-
- my_status = STEP_STATUS_OK
- self.update_run_time(sync_step,deletion)
- except Exception,e:
- logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
- logger.log_exc(e)
- self.failed_steps.append(S)
- my_status = STEP_STATUS_KO
- else:
- my_status = STEP_STATUS_OK
-
- try:
- my_cond = self.step_conditions[S]
- my_cond.acquire()
- self.step_status[S]=my_status
- my_cond.notify_all()
- my_cond.release()
- except KeyError,e:
- logging.info('Step %r is a leaf')
- pass
-
- def run(self):
- if not self.driver.enabled:
- return
-
- if (self.driver_kind=="openstack") and (not self.driver.has_openstack):
- return
-
- while True:
- try:
- error_map_file = getattr(Config(), "error_map_path", XOS_DIR + "/error_map.txt")
- self.error_mapper = ErrorMapper(error_map_file)
-
- # Set of whole steps that failed
- self.failed_steps = []
-
- # Set of individual objects within steps that failed
- self.failed_step_objects = set()
-
- # Set up conditions and step status
- # This is needed for steps to run in parallel
- # while obeying dependencies.
-
- providers = set()
- for v in self.dependency_graph.values():
- if (v):
- providers.update(v)
-
- self.step_conditions = {}
- self.step_status = {}
- for p in list(providers):
- self.step_conditions[p] = threading.Condition()
- self.step_status[p] = STEP_STATUS_WORKING
-
-
- logger.info('Waiting for event')
- tBeforeWait = time.time()
- self.wait_for_event(timeout=30)
- logger.info('Observer woke up')
-
- # Two passes. One for sync, the other for deletion.
- for deletion in [False,True]:
- threads = []
- logger.info('Deletion=%r...'%deletion)
- schedule = self.ordered_steps if not deletion else reversed(self.ordered_steps)
-
- for S in schedule:
- thread = threading.Thread(target=self.sync, args=(S, deletion))
-
- logger.info('Deletion=%r...'%deletion)
- threads.append(thread)
-
- # Start threads
- for t in threads:
- t.start()
-
- # Wait for all threads to finish before continuing with the run loop
- for t in threads:
- t.join()
-
- self.save_run_times()
- except Exception, e:
- logging.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
- logger.log_exc("Exception in observer run loop")
- traceback.print_exc()
diff --git a/xos/synchronizers/ec2/event_manager.py b/xos/synchronizers/ec2/event_manager.py
deleted file mode 100644
index fc07f64..0000000
--- a/xos/synchronizers/ec2/event_manager.py
+++ /dev/null
@@ -1,112 +0,0 @@
-import threading
-import requests, json
-
-from xos.config import Config, XOS_DIR
-
-import uuid
-import os
-import imp
-import inspect
-import base64
-from fofum import Fofum
-import json
-import traceback
-
-random_client_id=None
-def get_random_client_id():
- global random_client_id
-
- if (random_client_id is None) and os.path.exists(XOS_DIR + "/random_client_id"):
- # try to use the last one we used, if we saved it
- try:
- random_client_id = open(XOS_DIR+"/random_client_id","r").readline().strip()
- print "get_random_client_id: loaded %s" % random_client_id
- except:
- print "get_random_client_id: failed to read " + XOS_DIR + "/random_client_id"
-
- if random_client_id is None:
- random_client_id = base64.urlsafe_b64encode(os.urandom(12))
- print "get_random_client_id: generated new id %s" % random_client_id
-
- # try to save it for later (XXX: could race with another client here)
- try:
- open(XOS_DIR + "/random_client_id","w").write("%s\n" % random_client_id)
- except:
- print "get_random_client_id: failed to write " + XOS_DIR + "/random_client_id"
-
- return random_client_id
-
-# decorator that marks dispatachable event methods
-def event(func):
- setattr(func, 'event', func.__name__)
- return func
-
-class EventHandler:
- # This code is currently not in use.
- def __init__(self):
- pass
-
- @staticmethod
- def get_events():
- events = []
- for name in dir(EventHandler):
- attribute = getattr(EventHandler, name)
- if hasattr(attribute, 'event'):
- events.append(getattr(attribute, 'event'))
- return events
-
- def dispatch(self, event, *args, **kwds):
- if hasattr(self, event):
- return getattr(self, event)(*args, **kwds)
-
-
-class EventSender:
- def __init__(self,user=None,clientid=None):
- try:
- user = Config().feefie_client_user
- except:
- user = 'pl'
-
- try:
- clid = Config().feefie_client_id
- except:
- clid = get_random_client_id()
- print "EventSender: no feefie_client_id configured. Using random id %s" % clid
-
- self.fofum = Fofum(user=user)
- self.fofum.make(clid)
-
- def fire(self,**kwargs):
- kwargs["uuid"] = str(uuid.uuid1())
- self.fofum.fire(json.dumps(kwargs))
-
-class EventListener:
- def __init__(self,wake_up=None):
- self.handler = EventHandler()
- self.wake_up = wake_up
-
- def handle_event(self, payload):
- payload_dict = json.loads(payload)
-
- if (self.wake_up):
- self.wake_up()
-
- def run(self):
- # This is our unique client id, to be used when firing and receiving events
- # It needs to be generated once and placed in the config file
-
- try:
- user = Config().feefie_client_user
- except:
- user = 'pl'
-
- try:
- clid = Config().feefie_client_id
- except:
- clid = get_random_client_id()
- print "EventListener: no feefie_client_id configured. Using random id %s" % clid
-
- f = Fofum(user=user)
-
- listener_thread = threading.Thread(target=f.listen_for_event,args=(clid,self.handle_event))
- listener_thread.start()
diff --git a/xos/synchronizers/ec2/steps/__init__.py b/xos/synchronizers/ec2/steps/__init__.py
deleted file mode 100644
index de7a1fd..0000000
--- a/xos/synchronizers/ec2/steps/__init__.py
+++ /dev/null
@@ -1,15 +0,0 @@
-#from .sync_external_routes import SyncExternalRoutes
-#from .sync_network_instances import SyncNetworkInstances
-#from .sync_networks import SyncNetworks
-#from .sync_network_deployments import SyncNetworkDeployments
-#from .sync_site_privileges import SyncSitePrivilege
-#from .sync_slice_memberships import SyncSliceMemberships
-#from .sync_slices import SyncSlices
-#from .sync_instance_ips import SyncInstanceIps
-#from .sync_instances import SyncInstances
-#from .sync_users import SyncUsers
-#from .sync_roles import SyncRoles
-#from .sync_nodes import SyncNodes
-#from .sync_images import SyncImages
-#from .garbage_collector import GarbageCollector
-
diff --git a/xos/synchronizers/ec2/steps/sync_deployments.py b/xos/synchronizers/ec2/steps/sync_deployments.py
deleted file mode 100644
index 097a6e9..0000000
--- a/xos/synchronizers/ec2/steps/sync_deployments.py
+++ /dev/null
@@ -1,25 +0,0 @@
-import os
-import base64
-from django.db.models import F, Q
-from xos.config import Config
-from ec2_observer.syncstep import SyncStep
-from core.models.site import *
-
-class SyncDeployments(SyncStep):
- requested_interval=86400
- provides=[Deployment]
-
- def fetch_pending(self,deletion):
- if (deletion):
- return []
-
- deployments = Deployment.objects.filter(Q(name="Amazon EC2"))
- if (not deployments):
- deployments = [Deployment(name="Amazon EC2")]
- else:
- deployments = []
-
- return deployments
-
- def sync_record(self, deployment):
- deployment.save()
diff --git a/xos/synchronizers/ec2/steps/sync_images.py b/xos/synchronizers/ec2/steps/sync_images.py
deleted file mode 100644
index c3dc5a1..0000000
--- a/xos/synchronizers/ec2/steps/sync_images.py
+++ /dev/null
@@ -1,64 +0,0 @@
-import os
-import base64
-from django.db.models import F, Q
-from xos.config import Config, XOS_DIR
-from ec2_observer.syncstep import SyncStep
-from core.models.image import Image
-from ec2_observer.awslib import *
-
-
-class SyncImages(SyncStep):
- provides=[Image]
- requested_interval=3600
-
- def fetch_pending(self,deletion):
- if (deletion):
- return []
-
- images = Image.objects.all()
- image_names = [image.name for image in images]
-
- new_images = []
-
- try:
- aws_images = json.loads(open(XOS_DIR + '/aws-images').read())
- except:
- aws_images = aws_run('ec2 describe-images --owner 099720109477')
- open(XOS_DIR + '/aws-images','w').write(json.dumps(aws_images))
-
-
-
- aws_images=aws_images['Images']
- aws_images=filter(lambda x:x['ImageType']=='machine',aws_images)[:50]
-
- names = set([])
- for aws_image in aws_images:
- desc_ok = True
-
- try:
- desc = aws_image['Description']
- except:
- try:
- desc = aws_image['ImageLocation']
- except:
- desc_ok = False
-
- if (desc_ok):
- try:
- desc_ok = desc and desc not in names and desc not in image_names and '14.04' in desc
- except KeyError:
- desc_ok = False
-
- if desc_ok and aws_image['ImageType']=='machine':
- image = Image(disk_format=aws_image['ImageLocation'],
- name=desc,
- container_format=aws_image['Hypervisor'],
- path=aws_image['ImageId'])
- new_images.append(image)
- names.add(image.name)
-
- #print new_images
- return new_images
-
- def sync_record(self, image):
- image.save()
diff --git a/xos/synchronizers/ec2/steps/sync_instances.py b/xos/synchronizers/ec2/steps/sync_instances.py
deleted file mode 100644
index efab74d..0000000
--- a/xos/synchronizers/ec2/steps/sync_instances.py
+++ /dev/null
@@ -1,117 +0,0 @@
-import os
-import json
-import base64
-from django.db.models import F, Q
-from xos.config import Config
-from ec2_observer.syncstep import SyncStep
-from core.models.instance import Instance
-from core.models.slice import SlicePrivilege, SliceDeployments
-from core.models.network import Network, NetworkSlice, NetworkDeployments
-from xos.logger import Logger, logging
-from ec2_observer.awslib import *
-from core.models.site import *
-from core.models.slice import *
-from ec2_observer.creds import *
-import pdb
-
-logger = Logger(level=logging.INFO)
-
-class SyncInstances(SyncStep):
- provides=[Instance]
- requested_interval=0
-
- def fetch_pending(self, deletion):
- if deletion:
- object_source = Instance.deleted_objects
- else:
- object_source = Instance.objects
-
- all_instances = object_source.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
- my_instances = []
-
- for instance in all_instances:
- sd = SliceDeployments.objects.filter(Q(slice=instance.slice))
- if (sd):
- if (sd.deployment.name=='Amazon EC2'):
- my_instances.append(instance)
- if (instance.node.deployment.name=='Amazon EC2'):
- my_instances.append(instance)
- return my_instances
-
- def delete_record(self, instance):
- user = instance.creator
- e = get_creds(user=user, site=user.site)
- result = aws_run('ec2 terminate-instances --instance-ids=%s'%instance.instance_id, env=e)
-
- def sync_record(self, instance):
- logger.info("sync'ing instance:%s deployment:%s " % (instance, instance.node.deployment),extra=instance.tologdict())
-
- if not instance.instance_id:
- # public keys
- slice_memberships = SlicePrivilege.objects.filter(slice=instance.slice)
- pubkeys = [sm.user.public_key for sm in slice_memberships if sm.user.public_key]
-
- if instance.creator.public_key:
- pubkeys.append(instance.creator.public_key)
-
- if instance.slice.creator.public_key:
- pubkeys.append(instance.slice.creator.public_key)
-
- # netowrks
- # include all networks available to the slice and/or associated network templates
- #nics = []
- #networks = [ns.network for ns in NetworkSlice.objects.filter(slice=instance.slice)]
- #network_deployments = NetworkDeployments.objects.filter(network__in=networks,
- #deployment=instance.node.deployment)
- # Gather private networks first. This includes networks with a template that has
- # visibility = private and translation = none
- #for network_deployment in network_deployments:
- # if network_deployment.network.template.visibility == 'private' and \
- # network_deployment.network.template.translation == 'none':
- # nics.append({'net-id': network_deployment.net_id})
-
- # now include network template
- #network_templates = [network.template.sharedNetworkName for network in networks \
- # if network.template.sharedNetworkName]
- #for net in driver.shell.quantum.list_networks()['networks']:
- # if net['name'] in network_templates:
- # nics.append({'net-id': net['id']})
- # look up image id
-
- instance_type = instance.node.name.rsplit('.',1)[0]
-
- # Bail out of we don't have a key
- key_name = instance.creator.email.lower().replace('@', 'AT').replace('.', '')
- u = instance.creator
- s = u.site
- e = get_creds(user=u, site=s)
- key_sig = aws_run('ec2 describe-key-pairs', env=e)
- ec2_keys = key_sig['KeyPairs']
- key_found = False
- for key in ec2_keys:
- if (key['KeyName']==key_name):
- key_found = True
- break
-
- if (not key_found):
- # set backend_status
- raise Exception('Will not sync instance without key')
-
- image_id = instance.image.path
- instance_sig = aws_run('ec2 run-instances --image-id %s --instance-type %s --count 1 --key-name %s --placement AvailabilityZone=%s'%(image_id,instance_type,key_name,instance.node.site.name), env=e)
- instance.instance_id = instance_sig['Instances'][0]['InstanceId']
- instance.save()
- state = instance_sig['Instances'][0]['State']['Code']
- if (state==16):
- instance.ip = instance_sig['Instances'][0]['PublicIpAddress']
- instance.save()
- else:
- # This status message should go into backend_status
- raise Exception('Waiting for instance to start')
- else:
- ret = aws_run('ec2 describe-instances --instance-ids %s'%instance.instance_id, env=e)
- state = ret['Reservations'][0]['Instances'][0]['State']['Code']
- if (state==16):
- instance.ip = ret['Reservations'][0]['Instances'][0]['PublicIpAddress']
- instance.save()
-
diff --git a/xos/synchronizers/ec2/steps/sync_site_deployments.py b/xos/synchronizers/ec2/steps/sync_site_deployments.py
deleted file mode 100644
index ef66783..0000000
--- a/xos/synchronizers/ec2/steps/sync_site_deployments.py
+++ /dev/null
@@ -1,50 +0,0 @@
-import os
-import base64
-from django.db.models import F, Q
-from xos.config import Config
-from ec2_observer.syncstep import SyncStep
-from core.models.site import *
-from ec2_observer.awslib import *
-import pdb
-
-class SyncSiteDeployment(SyncStep):
- requested_interval=86400
- provides=[SiteDeployment]
-
- def fetch_pending(self, deletion):
- if (deletion):
- return []
-
- zone_ret = aws_run('ec2 describe-availability-zones')
- zones = zone_ret['AvailabilityZones']
- available_sites = [zone['ZoneName'] for zone in zones]
-
- current_sites = []
- for s in available_sites:
- site = Site.objects.filter(Q(name=s))
- if (site):
- current_sites.append(site[0])
-
- # OK not to catch exception
- # The syncstep should catch it
- # At any rate, we should not run if there are no deployments
- deployment = Deployment.objects.filter(Q(name="Amazon EC2"))[0]
- current_site_deployments = SiteDeployment.objects.filter(Q(deployment=deployment))
- site_dict = {}
-
- for sd in current_site_deployments:
- site_dict[sd.site]=sd
-
- updated_site_deployments = []
- for site in current_sites:
- try:
- site_record = site_dict[site]
- except KeyError:
- sd = SiteDeployment(site=site,deployment=deployment,tenant_id=base64.urlsafe_b64encode(os.urandom(12)))
- updated_site_deployments.append(sd)
-
- return updated_site_deployments
-
-
- def sync_record(self, site_deployment):
- site_deployment.save()
diff --git a/xos/synchronizers/ec2/steps/sync_sites.py b/xos/synchronizers/ec2/steps/sync_sites.py
deleted file mode 100644
index d139cc5..0000000
--- a/xos/synchronizers/ec2/steps/sync_sites.py
+++ /dev/null
@@ -1,43 +0,0 @@
-import os
-import base64
-from django.db.models import F, Q
-from xos.config import Config
-from ec2_observer.syncstep import SyncStep
-from core.models.site import *
-from ec2_observer.awslib import *
-import pdb
-
-class SyncSites(SyncStep):
- provides=[Site]
- requested_interval=3600
-
- def fetch_pending(self, deletion):
- if (deletion):
- return []
-
- deployment = Deployment.objects.filter(Q(name="Amazon EC2"))[0]
- current_site_deployments = SiteDeployment.objects.filter(Q(deployment=deployment))
-
- zone_ret = aws_run('ec2 describe-availability-zones')
- zones = zone_ret['AvailabilityZones']
-
- available_sites = [zone['ZoneName'] for zone in zones]
- site_names = [sd.site.name for sd in current_site_deployments]
-
- new_site_names = list(set(available_sites) - set(site_names))
-
- new_sites = []
- for s in new_site_names:
- site = Site(name=s,
- login_base=s,
- site_url="www.amazon.com",
- enabled=True,
- is_public=True,
- abbreviated_name=s)
- new_sites.append(site)
-
- return new_sites
-
- def sync_record(self, site):
- site.save()
-
diff --git a/xos/synchronizers/ec2/steps/sync_users.py b/xos/synchronizers/ec2/steps/sync_users.py
deleted file mode 100644
index 5fe70c7..0000000
--- a/xos/synchronizers/ec2/steps/sync_users.py
+++ /dev/null
@@ -1,52 +0,0 @@
-import os
-import base64
-import random
-import time
-from datetime import datetime
-from django.db.models import F, Q
-from xos.config import Config
-from ec2_observer.syncstep import SyncStep
-from core.models.user import User
-from core.models.site import *
-from ec2_observer.awslib import *
-from ec2_observer.creds import *
-import pdb
-
-class SyncUsers(SyncStep):
- provides=[User]
- requested_interval=0
-
- def fetch_pending(self, deletion):
- if (deletion):
- return []
-
- users = User.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
-
- keys = []
- creds = []
- for u in users:
- e = get_creds(user=u, site=u.site)
- key_sig = aws_run('ec2 describe-key-pairs', env=e)
- ec2_keys = key_sig['KeyPairs']
- creds.append(e)
- keys.append(ec2_keys)
- else:
- ec2_keys = []
-
- for user,ec2_keys,e in zip(users,keys,creds):
- if (user.public_key):
- key_name = user.email.lower().replace('@', 'AT').replace('.', '')
- key_found = False
-
- for key in ec2_keys:
- if (key['KeyName']==key_name):
- key_found = True
- break
-
- if (not key_found):
- aws_run('ec2 import-key-pair --key-name %s --public-key-material "%s"'%(key_name, user.public_key),env=e)
-
- return users
-
- def sync_record(self, node):
- node.save()
diff --git a/xos/synchronizers/ec2/syncstep.py b/xos/synchronizers/ec2/syncstep.py
deleted file mode 100644
index 3a31cb6..0000000
--- a/xos/synchronizers/ec2/syncstep.py
+++ /dev/null
@@ -1,101 +0,0 @@
-import os
-import base64
-from datetime import datetime
-
-from django.db.models import F, Q
-
-from xos.config import Config
-from xos.logger import Logger, logging
-from synchronizers.base.steps import *
-
-logger = Logger(level=logging.INFO)
-
-class FailedDependency(Exception):
- pass
-
-class SyncStep:
- """ A XOS Sync step.
-
- Attributes:
- psmodel Model name the step synchronizes
- dependencies list of names of models that must be synchronized first if the current model depends on them
- """
- slow=False
- def get_prop(self, prop):
- try:
- sync_config_dir = Config().sync_config_dir
- except:
- sync_config_dir = '/etc/xos/sync' # XXX TODO: update path
- prop_config_path = '/'.join(sync_config_dir,self.name,prop)
- return open(prop_config_path).read().rstrip()
-
- def __init__(self, **args):
- """Initialize a sync step
- Keyword arguments:
- name -- Name of the step
- provides -- XOS models sync'd by this step
- """
- dependencies = []
- self.driver = args.get('driver')
- self.error_map = args.get('error_map')
-
- try:
- self.soft_deadline = int(self.get_prop('soft_deadline_seconds'))
- except:
- self.soft_deadline = 5 # 5 seconds
-
- return
-
- def fetch_pending(self, deletion=False):
- # This is the most common implementation of fetch_pending
- # Steps should override it if they have their own logic
- # for figuring out what objects are outstanding.
- main_obj = self.provides[0]
- if (not deletion):
- objs = main_obj.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
- else:
- objs = main_obj.deleted_objects.all()
-
- return objs
- #return Instance.objects.filter(ip=None)
-
- def check_dependencies(self, obj, failed):
- for dep in self.dependencies:
- peer_name = dep[0].lower() + dep[1:] # django names are camelCased with the first letter lower
- peer_object = getattr(obj, peer_name)
-
- # peer_object can be None, and if so there
- # is no object-level dependency
- if (peer_object and peer_object.pk==failed.pk):
- raise FailedDependency
-
- def call(self, failed=[], deletion=False):
- pending = self.fetch_pending(deletion)
- for o in pending:
- try:
- for f in failed:
- self.check_dependencies(o,f) # Raises exception if failed
- if (deletion):
- self.delete_record(o)
- o.delete(purge=True)
- else:
- self.sync_record(o)
- o.enacted = datetime.now() # Is this the same timezone? XXX
- o.backend_status = "OK"
- o.save(update_fields=['enacted'])
- except Exception,e:
- try:
- o.backend_status = self.error_map.map(str(e))
- except:
- o.backend_status = str(e)
-
- if (o.pk):
- o.save(update_fields=['backend_status'])
-
- logger.log_exc("sync step failed!",extra=o.tologdict())
- failed.append(o)
-
- return failed
-
- def __call__(self, **args):
- return self.call(**args)
diff --git a/xos/synchronizers/ec2/toposort.py b/xos/synchronizers/ec2/toposort.py
deleted file mode 100644
index c0ec779..0000000
--- a/xos/synchronizers/ec2/toposort.py
+++ /dev/null
@@ -1,99 +0,0 @@
-#!/usr/bin/env python
-
-import time
-import traceback
-import commands
-import threading
-import json
-import pdb
-
-from datetime import datetime
-from collections import defaultdict
-
-# Assumes that there are no empty dependencies
-# in the graph. E.g. Foo -> []
-def dfs(graph, visit):
- nodes = graph.keys()
- edge_nodes = set()
-
- for n in nodes:
- edge_nodes|=set(graph[n])
-
- sinks = list(edge_nodes - set(nodes))
- sources = list(set(nodes) - edge_nodes)
-
- nodes.extend(sinks)
-
- visited = set(sources)
- stack = sources
- while stack:
- current = stack.pop()
- visit(current)
- for node in graph[current]:
- if node not in visited:
- stack.append(node)
- visited.add(node)
-
- return sources
-
-# Topological sort
-# Notes:
-# - Uses a stack instead of recursion
-# - Forfeits optimization involving tracking currently visited nodes
-def toposort(g, steps=None):
- # Get set of all nodes, including those without outgoing edges
- keys = set(g.keys())
- values = set({})
- for v in g.values():
- values=values | set(v)
-
- all_nodes=list(keys|values)
- if (not steps):
- steps = all_nodes
-
- # Final order
- order = []
-
- # DFS stack, not using recursion
- stack = []
-
- # Unmarked set
- unmarked = all_nodes
-
- # visiting = [] - skip, don't expect 1000s of nodes, |E|/|V| is small
-
- while unmarked:
- stack.insert(0,unmarked[0]) # push first unmarked
-
- while (stack):
- n = stack[0]
- add = True
- try:
- for m in g[n]:
- if (m in unmarked):
- if (m not in stack):
- add = False
- stack.insert(0,m)
- else:
- # Should not happen, if so there's a loop
- print 'Loop at %s'%m
- except KeyError:
- pass
- if (add):
- if (n in steps):
- order.append(n)
- item = stack.pop(0)
- unmarked.remove(item)
-
- noorder = list(set(steps) - set(order))
- return order + noorder
-
-def main():
- graph_file=open('xos.deps').read()
- g = json.loads(graph_file)
- print toposort(g)
-
-if (__name__=='__main__'):
- main()
-
-#print toposort({'a':'b','b':'c','c':'d','d':'c'},['d','c','b','a'])
diff --git a/xos/synchronizers/ec2/xos.deps b/xos/synchronizers/ec2/xos.deps
deleted file mode 100644
index 6abf765..0000000
--- a/xos/synchronizers/ec2/xos.deps
+++ /dev/null
@@ -1,47 +0,0 @@
-{
- "Node": [
- "Site",
- "Deployment"
- ],
- "Slice": [
- "Site"
- ],
- "ReservedResource": [
- "Instance"
- ],
- "SliceMembership": [
- "User",
- "Slice",
- "Role"
- ],
- "NetworkSlice": [
- "Network",
- "Slice"
- ],
- "Tag": [
- "Project"
- ],
- "User": [
- "Site"
- ],
- "SliceTag": [
- "Slice"
- ],
- "Reservation": [
- "Slice"
- ],
- "NetworkInstance": [
- "Network",
- "Instance"
- ],
- "SitePrivilege": [
- "User",
- "Site",
- "Role"
- ],
- "Instance": [
- "Image",
- "Slice",
- "Node"
- ]
-}
diff --git a/xos/synchronizers/exampleservice_old/exampleservice-synchronizer.py b/xos/synchronizers/exampleservice_old/exampleservice-synchronizer.py
deleted file mode 100644
index 90d2c98..0000000
--- a/xos/synchronizers/exampleservice_old/exampleservice-synchronizer.py
+++ /dev/null
@@ -1,14 +0,0 @@
-#!/usr/bin/env python
-
-# Runs the standard XOS synchronizer
-
-import importlib
-import os
-import sys
-
-synchronizer_path = os.path.join(os.path.dirname(
- os.path.realpath(__file__)), "../../synchronizers/base")
-sys.path.append(synchronizer_path)
-mod = importlib.import_module("xos-synchronizer")
-mod.main()
-
diff --git a/xos/synchronizers/exampleservice_old/exampleservice_config b/xos/synchronizers/exampleservice_old/exampleservice_config
deleted file mode 100644
index 7e59fdd..0000000
--- a/xos/synchronizers/exampleservice_old/exampleservice_config
+++ /dev/null
@@ -1,24 +0,0 @@
-# Required by XOS
-[db]
-name=xos
-user=postgres
-password=password
-host=localhost
-port=5432
-
-# Required by XOS
-[api]
-nova_enabled=True
-
-# Sets options for the synchronizer
-[observer]
-name=exampleservice
-dependency_graph=/opt/xos/synchronizers/exampleservice/model-deps
-steps_dir=/opt/xos/synchronizers/exampleservice/steps
-sys_dir=/opt/xos/synchronizers/exampleservice/sys
-logfile=/var/log/xos_backend.log
-pretend=False
-backoff_disabled=True
-save_ansible_output=True
-proxy_ssh=False
-
diff --git a/xos/synchronizers/exampleservice_old/model-deps b/xos/synchronizers/exampleservice_old/model-deps
deleted file mode 100644
index 0967ef4..0000000
--- a/xos/synchronizers/exampleservice_old/model-deps
+++ /dev/null
@@ -1 +0,0 @@
-{}
diff --git a/xos/synchronizers/exampleservice_old/steps/exampletenant_playbook.yaml b/xos/synchronizers/exampleservice_old/steps/exampletenant_playbook.yaml
deleted file mode 100644
index 89e4617..0000000
--- a/xos/synchronizers/exampleservice_old/steps/exampletenant_playbook.yaml
+++ /dev/null
@@ -1,16 +0,0 @@
----
-# exampletenant_playbook
-
-- hosts: "{{ instance_name }}"
- connection: ssh
- user: ubuntu
- sudo: yes
- gather_facts: no
- vars:
- - tenant_message: "{{ tenant_message }}"
- - service_message: "{{ service_message }}"
-
- roles:
- - install_apache
- - create_index
-
diff --git a/xos/synchronizers/exampleservice_old/steps/roles/create_index/tasks/main.yml b/xos/synchronizers/exampleservice_old/steps/roles/create_index/tasks/main.yml
deleted file mode 100644
index 91c6029..0000000
--- a/xos/synchronizers/exampleservice_old/steps/roles/create_index/tasks/main.yml
+++ /dev/null
@@ -1,7 +0,0 @@
----
-
-- name: Write index.html file to apache document root
- template:
- src=index.html.j2
- dest=/var/www/html/index.html
-
diff --git a/xos/synchronizers/exampleservice_old/steps/roles/create_index/templates/index.html.j2 b/xos/synchronizers/exampleservice_old/steps/roles/create_index/templates/index.html.j2
deleted file mode 100644
index 9cec084..0000000
--- a/xos/synchronizers/exampleservice_old/steps/roles/create_index/templates/index.html.j2
+++ /dev/null
@@ -1,4 +0,0 @@
-ExampleService
- Service Message: "{{ service_message }}"
- Tenant Message: "{{ tenant_message }}"
-
diff --git a/xos/synchronizers/exampleservice_old/steps/roles/install_apache/tasks/main.yml b/xos/synchronizers/exampleservice_old/steps/roles/install_apache/tasks/main.yml
deleted file mode 100644
index d9a155c..0000000
--- a/xos/synchronizers/exampleservice_old/steps/roles/install_apache/tasks/main.yml
+++ /dev/null
@@ -1,7 +0,0 @@
----
-
-- name: Install apache using apt
- apt:
- name=apache2
- update_cache=yes
-
diff --git a/xos/synchronizers/exampleservice_old/steps/sync_exampletenant.py b/xos/synchronizers/exampleservice_old/steps/sync_exampletenant.py
deleted file mode 100644
index fbde96f..0000000
--- a/xos/synchronizers/exampleservice_old/steps/sync_exampletenant.py
+++ /dev/null
@@ -1,55 +0,0 @@
-import os
-import sys
-from django.db.models import Q, F
-from services.exampleservice.models import ExampleService, ExampleTenant
-from synchronizers.base.SyncInstanceUsingAnsible import SyncInstanceUsingAnsible
-
-parentdir = os.path.join(os.path.dirname(__file__), "..")
-sys.path.insert(0, parentdir)
-
-class SyncExampleTenant(SyncInstanceUsingAnsible):
-
- provides = [ExampleTenant]
-
- observes = ExampleTenant
-
- requested_interval = 0
-
- template_name = "exampletenant_playbook.yaml"
-
- service_key_name = "/opt/xos/synchronizers/exampleservice/exampleservice_private_key"
-
- def __init__(self, *args, **kwargs):
- super(SyncExampleTenant, self).__init__(*args, **kwargs)
-
- def fetch_pending(self, deleted):
-
- if (not deleted):
- objs = ExampleTenant.get_tenant_objects().filter(
- Q(enacted__lt=F('updated')) | Q(enacted=None), Q(lazy_blocked=False))
- else:
- # If this is a deletion we get all of the deleted tenants..
- objs = ExampleTenant.get_deleted_tenant_objects()
-
- return objs
-
- def get_exampleservice(self, o):
- if not o.provider_service:
- return None
-
- exampleservice = ExampleService.get_service_objects().filter(id=o.provider_service.id)
-
- if not exampleservice:
- return None
-
- return exampleservice[0]
-
- # Gets the attributes that are used by the Ansible template but are not
- # part of the set of default attributes.
- def get_extra_attributes(self, o):
- fields = {}
- fields['tenant_message'] = o.tenant_message
- exampleservice = self.get_exampleservice(o)
- fields['service_message'] = exampleservice.service_message
- return fields
-
diff --git a/xos/synchronizers/new_base/SyncInstanceUsingAnsible.py b/xos/synchronizers/new_base/SyncInstanceUsingAnsible.py
index 342099c..c865f6f 100644
--- a/xos/synchronizers/new_base/SyncInstanceUsingAnsible.py
+++ b/xos/synchronizers/new_base/SyncInstanceUsingAnsible.py
@@ -4,7 +4,7 @@
import sys
import base64
import time
-from xos.config import Config
+from xosconfig import Config
from synchronizers.new_base.syncstep import SyncStep
from synchronizers.new_base.ansible_helper import run_template_ssh
from synchronizers.new_base.modelaccessor import *
@@ -12,6 +12,7 @@
logger = Logger(level=logging.INFO)
+
class SyncInstanceUsingAnsible(SyncStep):
# All of the following should be defined for classes derived from this
# base class. Examples below use VSGTenant.
@@ -33,7 +34,7 @@
def defer_sync(self, o, reason):
# zdw, 2017-02-18 - is raising the exception here necessary? - seems like
# it's just logging the same thing twice
- logger.info("defer object %s due to %s" % (str(o), reason),extra=o.tologdict())
+ logger.info("defer object %s due to %s" % (str(o), reason), extra=o.tologdict())
raise Exception("defer object %s due to %s" % (str(o), reason))
def get_extra_attributes(self, o):
@@ -63,7 +64,7 @@
template_name = self.template_name
tStart = time.time()
run_template_ssh(template_name, fields, object=o)
- logger.info("playbook execution time %d" % int(time.time()-tStart),extra=o.tologdict())
+ logger.info("playbook execution time %d" % int(time.time() - tStart), extra=o.tologdict())
def pre_sync_hook(self, o, fields):
pass
@@ -77,19 +78,20 @@
def prepare_record(self, o):
pass
- def get_node(self,o):
+ def get_node(self, o):
return o.node
def get_node_key(self, node):
- return getattr(Config(), "observer_node_key", "/opt/cord_profile/node_key")
+ # NOTE `node_key` is never defined, does it differ from `proxy_ssh_key`? the value looks to be the same
+ return Config.get("node_key")
def get_key_name(self, instance):
- if instance.isolation=="vm":
- if (instance.slice) and (instance.slice.service) and (instance.slice.service.private_key_fn):
+ if instance.isolation == "vm":
+ if instance.slice and instance.slice.service and instance.slice.service.private_key_fn:
key_name = instance.slice.service.private_key_fn
else:
raise Exception("Make sure to set private_key_fn in the service")
- elif instance.isolation=="container":
+ elif instance.isolation == "container":
node = self.get_node(instance)
key_name = self.get_node_key(node)
else:
@@ -105,24 +107,24 @@
if (instance.isolation == "vm"):
# legacy where container was configured by sync_vcpetenant.py
- fields = { "instance_name": instance.name,
- "hostname": instance.node.name,
- "instance_id": instance.instance_id,
- "username": "ubuntu",
- "ssh_ip": instance.get_ssh_ip(),
- }
+ fields = {"instance_name": instance.name,
+ "hostname": instance.node.name,
+ "instance_id": instance.instance_id,
+ "username": "ubuntu",
+ "ssh_ip": instance.get_ssh_ip(),
+ }
elif (instance.isolation == "container"):
# container on bare metal
node = self.get_node(instance)
hostname = node.name
- fields = { "hostname": hostname,
- "baremetal_ssh": True,
- "instance_name": "rootcontext",
- "username": "root",
- "container_name": "%s-%s" % (instance.slice.name, str(instance.id))
- # ssh_ip is not used for container-on-metal
- }
+ fields = {"hostname": hostname,
+ "baremetal_ssh": True,
+ "instance_name": "rootcontext",
+ "username": "root",
+ "container_name": "%s-%s" % (instance.slice.name, str(instance.id))
+ # ssh_ip is not used for container-on-metal
+ }
else:
# container in a VM
if not instance.parent:
@@ -133,13 +135,13 @@
raise Exception("Container-in-VM parent has no service")
if not instance.parent.slice.service.private_key_fn:
raise Exception("Container-in-VM parent service has no private_key_fn")
- fields = { "hostname": instance.parent.node.name,
- "instance_name": instance.parent.name,
- "instance_id": instance.parent.instance_id,
- "username": "ubuntu",
- "ssh_ip": instance.parent.get_ssh_ip(),
- "container_name": "%s-%s" % (instance.slice.name, str(instance.id))
- }
+ fields = {"hostname": instance.parent.node.name,
+ "instance_name": instance.parent.name,
+ "instance_id": instance.parent.instance_id,
+ "username": "ubuntu",
+ "ssh_ip": instance.parent.get_ssh_ip(),
+ "container_name": "%s-%s" % (instance.slice.name, str(instance.id))
+ }
key_name = self.get_key_name(instance)
if not os.path.exists(key_name):
@@ -162,14 +164,14 @@
fields.update({"keystone_tenant_id": cslice.tenant_id,
"keystone_user_id": cuser.kuser_id,
- "rabbit_user": getattr(instance.controller,"rabbit_user", None),
+ "rabbit_user": getattr(instance.controller, "rabbit_user", None),
"rabbit_password": getattr(instance.controller, "rabbit_password", None),
"rabbit_host": getattr(instance.controller, "rabbit_host", None)})
return fields
def sync_record(self, o):
- logger.info("sync'ing object %s" % str(o),extra=o.tologdict())
+ logger.info("sync'ing object %s" % str(o), extra=o.tologdict())
self.prepare_record(o)
@@ -182,12 +184,12 @@
# UNTESTED
(hostname, container_name) = self.get_external_sync(o)
- fields = { "hostname": hostname,
- "baremetal_ssh": True,
- "instance_name": "rootcontext",
- "username": "root",
- "container_name": container_name
- }
+ fields = {"hostname": hostname,
+ "baremetal_ssh": True,
+ "instance_name": "rootcontext",
+ "username": "root",
+ "container_name": container_name
+ }
key_name = self.get_node_key(node)
if not os.path.exists(key_name):
raise Exception("Node key %s does not exist" % key_name)
@@ -228,8 +230,8 @@
controller = o.get_controller()
controller_register = json.loads(o.node.site_deployment.controller.backend_register)
- if (controller_register.get('disabled',False)):
- raise InnocuousException('Controller %s is disabled'%o.node.site_deployment.controller.name)
+ if (controller_register.get('disabled', False)):
+ raise InnocuousException('Controller %s is disabled' % o.node.site_deployment.controller.name)
except AttributeError:
pass
@@ -248,10 +250,10 @@
# XXX - this probably needs more work...
- fields = { "hostname": instance,
- "instance_id": "ubuntu", # this is the username to log into
- "private_key": service.key,
- }
+ fields = {"hostname": instance,
+ "instance_id": "ubuntu", # this is the username to log into
+ "private_key": service.key,
+ }
else:
# sync to an XOS instance
fields = self.get_ansible_fields(instance)
@@ -267,24 +269,24 @@
if hasattr(self, "map_delete_inputs"):
fields.update(self.map_delete_inputs(o))
- fields['delete']=True
- res = self.run_playbook(o,fields)
+ fields['delete'] = True
+ res = self.run_playbook(o, fields)
if hasattr(self, "map_delete_outputs"):
- self.map_delete_outputs(o,res)
+ self.map_delete_outputs(o, res)
- #In order to enable the XOS watcher functionality for a synchronizer, define the 'watches' attribute
- #in the derived class: eg. watches = [ModelLink(ServiceDependency,via='servicedependency')]
- #This base class implements the notification handler for handling ServiceDependency model notifications
- #If a synchronizer need to watch on multiple objects, the additional handlers need to be implemented
- #in the derived class and override the below handle_watched_object() method to route the notifications
- #accordingly
+ # In order to enable the XOS watcher functionality for a synchronizer, define the 'watches' attribute
+ # in the derived class: eg. watches = [ModelLink(ServiceDependency,via='servicedependency')]
+ # This base class implements the notification handler for handling ServiceDependency model notifications
+ # If a synchronizer need to watch on multiple objects, the additional handlers need to be implemented
+ # in the derived class and override the below handle_watched_object() method to route the notifications
+ # accordingly
def handle_watched_object(self, o):
- logger.info("handle_watched_object is invoked for object %s" % (str(o)),extra=o.tologdict())
+ logger.info("handle_watched_object is invoked for object %s" % (str(o)), extra=o.tologdict())
if (model_accessor.is_type(o, "ServiceDependency")):
- self.handle_service_composition_watch_notification(o)
+ self.handle_service_composition_watch_notification(o)
elif (model_accessor.is_type(o, "ServiceMonitoringAgentInfo")):
- self.handle_service_monitoringagentinfo_watch_notification(o)
+ self.handle_service_monitoringagentinfo_watch_notification(o)
pass
def handle_service_composition_watch_notification(self, coarse_tenant):
@@ -303,14 +305,14 @@
def handle_service_composition_for_object(self, obj, coarse_tenant):
try:
- instance = self.get_instance(obj)
- valid_instance = True
+ instance = self.get_instance(obj)
+ valid_instance = True
except:
- valid_instance = False
+ valid_instance = False
if not valid_instance:
- logger.warn("handle_watched_object: No valid instance found for object %s" % (str(obj)))
- return
+ logger.warn("handle_watched_object: No valid instance found for object %s" % (str(obj)))
+ return
provider_service = coarse_tenant.provider_service
subscriber_service = coarse_tenant.subscriber_service
@@ -323,7 +325,8 @@
matched_service = subscriber_service
other_service = provider_service
else:
- logger.info("handle_watched_object: Service object %s does not match with any of composed services" % (str(obj)))
+ logger.info("handle_watched_object: Service object %s does not match with any of composed services" % (
+ str(obj)))
return
elif model_accessor.is_instance(obj, "Tenant"):
if obj.provider_service.id == provider_service.id:
@@ -333,27 +336,32 @@
matched_service = subscriber_service
other_service = provider_service
else:
- logger.info("handle_watched_object: Tenant object %s does not match with any of composed services" % (str(obj)))
+ logger.info(
+ "handle_watched_object: Tenant object %s does not match with any of composed services" % (str(obj)))
return
else:
- logger.warn("handle_watched_object: Model object %s is of neither Service nor Tenant type" % (str(obj)))
+ logger.warn("handle_watched_object: Model object %s is of neither Service nor Tenant type" % (str(obj)))
src_networks = matched_service.get_composable_networks()
target_networks = other_service.get_composable_networks()
if src_networks and target_networks:
- src_network = src_networks[0] #Only one composable network should present per service
+ src_network = src_networks[0] # Only one composable network should present per service
target_network = target_networks[0]
src_ip = instance.get_network_ip(src_network.name)
target_subnet = target_network.controllernetworks.all()[0].subnet
- #Run ansible playbook to update the routing table entries in the instance
+ # Run ansible playbook to update the routing table entries in the instance
fields = self.get_ansible_fields(instance)
- fields["ansible_tag"] = getattr(obj, "ansible_tag", obj.__class__.__name__ + "_" + str(obj.id)) + "_service_composition"
+ fields["ansible_tag"] = getattr(obj, "ansible_tag",
+ obj.__class__.__name__ + "_" + str(obj.id)) + "_service_composition"
fields["src_intf_ip"] = src_ip
fields["target_subnet"] = target_subnet
- #Template file is available under .../synchronizers/shared_templates
+ # Template file is available under .../synchronizers/shared_templates
service_composition_template_name = "sync_service_composition.yaml"
- logger.info("handle_watched_object: Updating routing tables in the instance associated with object %s: target_subnet:%s src_ip:%s" % (str(obj), target_subnet, src_ip))
+ logger.info(
+ "handle_watched_object: Updating routing tables in the instance associated with object %s: target_subnet:%s src_ip:%s" % (
+ str(obj), target_subnet, src_ip))
SyncInstanceUsingAnsible.run_playbook(self, obj, fields, service_composition_template_name)
else:
- logger.info("handle_watched_object: No intersection of composable networks between composed services %s" % (str(coarse_tenant)))
+ logger.info("handle_watched_object: No intersection of composable networks between composed services %s" % (
+ str(coarse_tenant)))
diff --git a/xos/synchronizers/new_base/__init__.py b/xos/synchronizers/new_base/__init__.py
index e56cd39..e69de29 100644
--- a/xos/synchronizers/new_base/__init__.py
+++ b/xos/synchronizers/new_base/__init__.py
@@ -1,36 +0,0 @@
-from xos.config import Config
-
-try:
- observer_disabled = Config().observer_disabled
-except:
- observer_disabled = False
-
-def EnableObserver(x):
- """ used for manage.py --noobserver """
- global observer_disabled
- observer_disabled = not x
-
-print_once = True
-
-def notify_observer(model=None, delete=False, pk=None, model_dict={}):
- if (observer_disabled):
- global print_once
- if (print_once):
- print "The observer is disabled"
- print_once = False
- return
-
- try:
- from .event_manager import EventSender
- if (model and delete):
- if hasattr(model,"__name__"):
- modelName = model.__name__
- else:
- modelName = model.__class__.__name__
- EventSender().fire(delete_flag = delete, model = modelName, pk = pk, model_dict=model_dict)
- else:
- EventSender().fire()
- except Exception,e:
- print "Exception in Observer. This should not disrupt the front end. %s"%str(e)
-
-
diff --git a/xos/synchronizers/new_base/ansible_helper.py b/xos/synchronizers/new_base/ansible_helper.py
index 5bc79b4..2eca993 100644
--- a/xos/synchronizers/new_base/ansible_helper.py
+++ b/xos/synchronizers/new_base/ansible_helper.py
@@ -12,13 +12,13 @@
import traceback
import subprocess
import threading
-from xos.config import Config, XOS_DIR
+from xosconfig import Config
from xos.logger import observer_logger as logger
from multiprocessing import Process, Queue
-step_dir = Config().observer_steps_dir
-sys_dir = Config().observer_sys_dir
+step_dir = Config.get("steps_dir")
+sys_dir = Config.get("sys_dir")
os_template_loader = jinja2.FileSystemLoader( searchpath=[step_dir, "/opt/xos/synchronizers/shared_templates"])
os_template_env = jinja2.Environment(loader=os_template_loader)
@@ -56,7 +56,7 @@
"fqp": fqp,
"opts": opts}
- keep_temp_files = getattr(Config(), "observer_keep_temp_files", False)
+ keep_temp_files = Config.get("keep_temp_files")
dir = tempfile.mkdtemp()
args_fn = None
@@ -198,10 +198,7 @@
else:
instance_id = opts["instance_id"]
ssh_ip = opts["ssh_ip"]
- try:
- proxy_ssh = Config().observer_proxy_ssh
- except:
- proxy_ssh = True
+ proxy_ssh = Config.get("proxy_ssh.enabled")
if (not ssh_ip):
raise Exception('IP of ssh proxy not available. Synchronization deferred')
@@ -218,8 +215,8 @@
f = open(config_pathname, "w")
f.write("[ssh_connection]\n")
if proxy_ssh:
- proxy_ssh_key = getattr(Config(), "observer_proxy_ssh_key", None)
- proxy_ssh_user = getattr(Config(), "observer_proxy_ssh_user", "root")
+ proxy_ssh_key = Config.get("proxy_ssh.key")
+ proxy_ssh_user = Config.get("proxy_ssh.user")
if proxy_ssh_key:
# If proxy_ssh_key is known, then we can proxy into the compute
# node without needing to have the OpenCloud sshd machinery in
diff --git a/xos/synchronizers/new_base/backend.py b/xos/synchronizers/new_base/backend.py
index eccd570..1ef2ec6 100644
--- a/xos/synchronizers/new_base/backend.py
+++ b/xos/synchronizers/new_base/backend.py
@@ -9,9 +9,9 @@
from synchronizers.new_base.model_policy_loop import XOSPolicyEngine
from synchronizers.new_base.modelaccessor import *
from xos.logger import Logger, logging
-from xos.config import Config
+from xosconfig import Config
-watchers_enabled = getattr(Config(), "observer_enable_watchers", None)
+watchers_enabled = Config.get("enable_watchers")
if (watchers_enabled):
from synchronizers.new_base.watchers import XOSWatcher
@@ -56,7 +56,7 @@
model_accessor.update_diag(sync_start=time.time(), backend_status="0 - Synchronizer Start")
- steps_dir = Config().observer_steps_dir
+ steps_dir = Config.get("steps_dir")
if steps_dir:
sync_steps = self.load_sync_step_modules(steps_dir)
if sync_steps:
@@ -74,7 +74,7 @@
logger.info("Skipping observer and watcher threads due to no steps dir.")
# start model policies thread
- policies_dir = getattr(Config(), "observer_model_policies_dir", None)
+ policies_dir = Config.get("model_policies_dir")
if policies_dir:
policy_engine = XOSPolicyEngine(policies_dir=policies_dir)
model_policy_thread = threading.Thread(target=policy_engine.run, name="policy_engine")
diff --git a/xos/synchronizers/new_base/backend_modelpolicy.py b/xos/synchronizers/new_base/backend_modelpolicy.py
index 9aeb731..612f810 100644
--- a/xos/synchronizers/new_base/backend_modelpolicy.py
+++ b/xos/synchronizers/new_base/backend_modelpolicy.py
@@ -7,10 +7,11 @@
from syncstep import SyncStep
from synchronizers.new_base.event_loop import XOSObserver
from xos.logger import Logger, logging
-from xos.config import Config
+from xosconfig import Config
-watchers_enabled = getattr(Config(), "observer_enable_watchers", None)
+watchers_enabled = Config.get("enable_watchers")
+# NOTE is this used or can be removed?
if (watchers_enabled):
from synchronizers.new_base.watchers import XOSWatcher
@@ -19,7 +20,7 @@
class Backend:
def run(self):
# start model policies thread
- policies_dir = getattr(Config(), "observer_model_policies_dir", None)
+ policies_dir = Config("model_policies_dir")
if policies_dir:
from synchronizers.model_policy import run_policy
model_policy_thread = threading.Thread(target=run_policy)
diff --git a/xos/synchronizers/new_base/deleter.py b/xos/synchronizers/new_base/deleter.py
index 93fa572..7f0d332 100644
--- a/xos/synchronizers/new_base/deleter.py
+++ b/xos/synchronizers/new_base/deleter.py
@@ -1,6 +1,4 @@
-import os
-import base64
-from xos.config import Config
+# NOTE this appear not to be used, can we delete it?
class Deleter:
model=None # Must be overridden
diff --git a/xos/synchronizers/new_base/dependency_walker_new.py b/xos/synchronizers/new_base/dependency_walker_new.py
index 17b7a9a..add8f27 100644
--- a/xos/synchronizers/new_base/dependency_walker_new.py
+++ b/xos/synchronizers/new_base/dependency_walker_new.py
@@ -6,92 +6,89 @@
import os
import imp
-from xos.config import Config, XOS_DIR
import inspect
import time
import traceback
import commands
import threading
+from xosconfig import Config
import json
from xos.logger import Logger, logging
+
logger = Logger(level=logging.INFO)
-missing_links={}
+missing_links = {}
-try:
- dep_data = open(Config().observer_dependency_graph).read()
-except:
- dep_data = open(XOS_DIR + '/model-deps').read()
+dep_data = open(Config.get("dependency_graph")).read()
dependencies = json.loads(dep_data)
inv_dependencies = {}
for k, lst in dependencies.items():
- for v in lst:
- try:
- inv_dependencies[v].append(k)
- except KeyError:
- inv_dependencies[v]=[k]
-
+ for v in lst:
+ try:
+ inv_dependencies[v].append(k)
+ except KeyError:
+ inv_dependencies[v] = [k]
+
def plural(name):
- if (name.endswith('s')):
- return name+'es'
- else:
- return name+'s'
+ if name.endswith('s'):
+ return name + 'es'
+ else:
+ return name + 's'
def walk_deps(fn, object):
- model = object.__class__.__name__
- try:
- deps = dependencies[model]
- except:
- deps = []
- return __walk_deps(fn, object, deps)
+ model = object.__class__.__name__
+ try:
+ deps = dependencies[model]
+ except:
+ deps = []
+ return __walk_deps(fn, object, deps)
+
def walk_inv_deps(fn, object):
- model = object.__class__.__name__
- try:
- deps = inv_dependencies[model]
- except:
- deps = []
- return __walk_deps(fn, object, deps)
+ model = object.__class__.__name__
+ try:
+ deps = inv_dependencies[model]
+ except:
+ deps = []
+ return __walk_deps(fn, object, deps)
+
def __walk_deps(fn, object, deps):
- model = object.__class__.__name__
- ret = []
- for dep in deps:
- #print "Checking dep %s"%dep
- peer=None
- link = dep.lower()
- try:
- peer = getattr(object, link)
- except AttributeError:
- link = plural(link)
- try:
- peer = getattr(object, link)
- except AttributeError:
- if not missing_links.has_key(model+'.'+link):
- print "Model %s missing link for dependency %s"%(model, link)
- logger.log_exc("WARNING: Model %s missing link for dependency %s."%(model, link))
- missing_links[model+'.'+link]=True
+ model = object.__class__.__name__
+ ret = []
+ for dep in deps:
+ # print "Checking dep %s"%dep
+ peer = None
+ link = dep.lower()
+ try:
+ peer = getattr(object, link)
+ except AttributeError:
+ link = plural(link)
+ try:
+ peer = getattr(object, link)
+ except AttributeError:
+ if not missing_links.has_key(model + '.' + link):
+ print "Model %s missing link for dependency %s" % (model, link)
+ logger.log_exc("WARNING: Model %s missing link for dependency %s." % (model, link))
+ missing_links[model + '.' + link] = True
+ if (peer):
+ try:
+ peer_objects = peer.all()
+ except AttributeError:
+ peer_objects = [peer]
+ except:
+ peer_objects = []
- if (peer):
- try:
- peer_objects = peer.all()
- except AttributeError:
- peer_objects = [peer]
- except:
- peer_objects = []
-
- for o in peer_objects:
- if (hasattr(o,'updated')):
- fn(o, object)
- ret.append(o)
- # Uncomment the following line to enable recursion
- # walk_inv_deps(fn, o)
- return ret
-
-
+ for o in peer_objects:
+ if (hasattr(o, 'updated')):
+ fn(o, object)
+ ret.append(o)
+ # Uncomment the following line to enable recursion
+ # walk_inv_deps(fn, o)
+ return ret
diff --git a/xos/synchronizers/new_base/diag.py b/xos/synchronizers/new_base/diag.py
index 3384c0b..ff0a341 100644
--- a/xos/synchronizers/new_base/diag.py
+++ b/xos/synchronizers/new_base/diag.py
@@ -1,16 +1,15 @@
-import os
-import time
-import sys
import traceback
import json
-from xos.config import Config, XOS_DIR
+from xosconfig import Config
from xos.logger import Logger, logging, logger
logger = Logger(level=logging.INFO)
-def update_diag(diag_class, loop_end=None, loop_start=None, syncrecord_start=None, sync_start=None, backend_status=None):
- observer_name = Config().observer_name
+
+def update_diag(diag_class, loop_end=None, loop_start=None, syncrecord_start=None, sync_start=None,
+ backend_status=None):
+ observer_name = Config.get("name")
try:
diag = diag_class.objects.filter(name=observer_name).first()
@@ -41,4 +40,3 @@
except:
logger.log_exc("Exception in update_diag")
traceback.print_exc()
-
diff --git a/xos/synchronizers/new_base/error_mapper.py b/xos/synchronizers/new_base/error_mapper.py
index 97f5ad5..82805f1 100644
--- a/xos/synchronizers/new_base/error_mapper.py
+++ b/xos/synchronizers/new_base/error_mapper.py
@@ -1,27 +1,20 @@
-from xos.config import Config
from xos.logger import Logger, logging, logger
logger = Logger(level=logging.INFO)
+
class ErrorMapper:
- def __init__(self, error_map_file):
- self.error_map = {}
- try:
- error_map_lines = open(error_map_file).read().splitlines()
- for l in error_map_lines:
- if (not l.startswith('#')):
- splits = l.split('->')
- k,v = map(lambda i:i.rstrip(),splits)
- self.error_map[k]=v
- except:
- logger.info('Could not read error map')
+ def __init__(self, error_map_file):
+ self.error_map = {}
+ try:
+ error_map_lines = open(error_map_file).read().splitlines()
+ for l in error_map_lines:
+ if (not l.startswith('#')):
+ splits = l.split('->')
+ k, v = map(lambda i: i.rstrip(), splits)
+ self.error_map[k] = v
+ except:
+ logger.info('Could not read error map')
-
- def map(self, error):
- return self.error_map[error]
-
-
-
-
-
-
+ def map(self, error):
+ return self.error_map[error]
diff --git a/xos/synchronizers/new_base/event_loop.py b/xos/synchronizers/new_base/event_loop.py
index 0c89a3c..32d3da4 100644
--- a/xos/synchronizers/new_base/event_loop.py
+++ b/xos/synchronizers/new_base/event_loop.py
@@ -10,11 +10,9 @@
import pdb
import pprint
import traceback
-
-
from datetime import datetime
from xos.logger import Logger, logging, logger
-from xos.config import Config, XOS_DIR
+from xosconfig import Config
from synchronizers.new_base.steps import *
from syncstep import SyncStep, NullSyncStep
from toposort import toposort
@@ -24,6 +22,7 @@
debug_mode = False
+
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
@@ -34,6 +33,7 @@
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
+
logger = Logger(level=logging.INFO)
@@ -42,11 +42,11 @@
class NoOpDriver:
-
def __init__(self):
self.enabled = True
self.dependency_graph = None
+
# Everyone gets NoOpDriver by default. To use a different driver, call
# set_driver() below.
@@ -57,6 +57,7 @@
global DRIVER
DRIVER = x
+
STEP_STATUS_WORKING = 1
STEP_STATUS_OK = 2
STEP_STATUS_KO = 3
@@ -84,7 +85,7 @@
self.event_cond = threading.Condition()
self.driver = DRIVER
- self.observer_name = getattr(Config(), "observer_name", "")
+ self.observer_name = Config.get("name")
def wait_for_event(self, timeout):
self.event_cond.acquire()
@@ -98,7 +99,7 @@
self.event_cond.release()
def load_sync_steps(self):
- dep_path = Config().observer_dependency_graph
+ dep_path = Config.get("dependency_graph")
logger.info('Loading model dependency graph from %s' % dep_path)
try:
# This contains dependencies between records, not sync steps
@@ -118,7 +119,9 @@
raise e
try:
- backend_path = Config().observer_pl_dependency_graph
+ # FIXME `pl_dependency_graph` is never defined, this will always fail
+ # NOTE can we remove it?
+ backend_path = Config.get("pl_dependency_graph")
logger.info(
'Loading backend dependency graph from %s' %
backend_path)
@@ -345,7 +348,7 @@
my_status = STEP_STATUS_KO
else:
sync_step = self.lookup_step(S)
- sync_step. __name__ = step.__name__
+ sync_step.__name__ = step.__name__
sync_step.dependencies = []
try:
mlist = sync_step.provides
@@ -452,16 +455,12 @@
model_accessor.check_db_connection_okay()
loop_start = time.time()
- error_map_file = getattr(
- Config(),
- "error_map_path",
- XOS_DIR +
- "/error_map.txt")
+ error_map_file = Config.get('error_map_path')
self.error_mapper = ErrorMapper(error_map_file)
# Two passes. One for sync, the other for deletion.
for deletion in [False, True]:
- # Set of individual objects within steps that failed
+ # Set of individual objects within steps that failed
self.failed_step_objects = set()
# Set up conditions and step status
diff --git a/xos/synchronizers/new_base/event_manager.py b/xos/synchronizers/new_base/event_manager.py
index fce2b68..7592650 100644
--- a/xos/synchronizers/new_base/event_manager.py
+++ b/xos/synchronizers/new_base/event_manager.py
@@ -1,8 +1,7 @@
+# FIXME Appear that a lot of unused code sits in here
+
import threading
-import requests, json
-
-from xos.config import Config, XOS_DIR
-
+from xosconfig import Config
import uuid
import os
import imp
@@ -11,7 +10,11 @@
import json
import traceback
-if getattr(Config(),"observer_fofum_disabled", False) != True:
+# NOTE can we use a path relative to this file?
+XOS_DIR = "/opt/xos"
+
+# NOTE I believe fofum is not used anymore, can we remove this?
+if Config.get("fofum_disabled") is None:
from fofum import Fofum
fofum_enabled = True
else:
@@ -67,13 +70,11 @@
class EventSender:
def __init__(self,user=None,clientid=None):
- try:
- user = Config().feefie_client_user
- except:
- user = 'pl'
+
+ user = Config.get("feefie.client_user")
try:
- clid = Config().feefie_client_id
+ clid = Config.get("feefie.client_id")
except:
clid = get_random_client_id()
print "EventSender: no feefie_client_id configured. Using random id %s" % clid
@@ -102,13 +103,10 @@
# This is our unique client id, to be used when firing and receiving events
# It needs to be generated once and placed in the config file
- try:
- user = Config().feefie_client_user
- except:
- user = 'pl'
+ user = Config.get("feefie.client_user")
try:
- clid = Config().feefie_client_id
+ clid = Config.get("feefie.client_id")
except:
clid = get_random_client_id()
print "EventListener: no feefie_client_id configured. Using random id %s" % clid
diff --git a/xos/synchronizers/new_base/modelaccessor.py b/xos/synchronizers/new_base/modelaccessor.py
index 0912ffb..8c6fbe7 100644
--- a/xos/synchronizers/new_base/modelaccessor.py
+++ b/xos/synchronizers/new_base/modelaccessor.py
@@ -11,14 +11,16 @@
import functools
import os
import signal
-from xos.config import Config
+from xosconfig import Config
from diag import update_diag
from xos.logger import Logger, logging
+
logger = Logger(level=logging.INFO)
orig_sigint = None
+
class ModelAccessor(object):
def __init__(self):
self.all_model_classes = self.get_all_model_classes()
@@ -73,7 +75,8 @@
def update_diag(self, loop_end=None, loop_start=None, syncrecord_start=None, sync_start=None, backend_status=None):
if self.has_model_class("Diag"):
- return update_diag(self.get_model_class("Diag"), loop_end, loop_start, syncrecord_start, sync_start, backend_status)
+ return update_diag(self.get_model_class("Diag"), loop_end, loop_start, syncrecord_start, sync_start,
+ backend_status)
def is_type(self, obj, name):
""" returns True is obj is of model type "name" """
@@ -92,6 +95,7 @@
def create_obj(self, cls, **kwargs):
raise Exception("Not Implemented")
+
def import_models_to_globals():
# add all models to globals
for (k, v) in model_accessor.all_model_classes.items():
@@ -101,12 +105,14 @@
# ModelLink.
if "ModelLink" not in globals():
class ModelLink:
- def __init__(self,dest,via,into=None):
- self.dest=dest
- self.via=via
- self.into=into
+ def __init__(self, dest, via, into=None):
+ self.dest = dest
+ self.via = via
+ self.into = into
+
globals()["ModelLink"] = ModelLink
+
def keep_trying(client, reactor):
# Keep checking the connection to wait for it to become unavailable.
# Then reconnect.
@@ -128,6 +134,7 @@
reactor.callLater(1, functools.partial(keep_trying, client, reactor))
+
def grpcapi_reconnect(client, reactor):
global model_accessor
@@ -135,15 +142,14 @@
client.xos_orm.caller_kind = "synchronizer"
from apiaccessor import CoreApiModelAccessor
- model_accessor = CoreApiModelAccessor(orm = client.xos_orm)
+ model_accessor = CoreApiModelAccessor(orm=client.xos_orm)
# If required_models is set, then check to make sure the required_models
# are present. If not, then the synchronizer needs to go to sleep until
# the models show up.
- required_models = getattr(Config(), "observer_required_models", None)
+ required_models = Config.get("required_models")
if required_models:
- required_models = required_models.split(",")
required_models = [x.strip() for x in required_models]
missing = []
@@ -171,43 +177,40 @@
# Restore the sigint handler
signal.signal(signal.SIGINT, orig_sigint)
+
def config_accessor():
global model_accessor
global orig_sigint
- accessor_kind = getattr(Config(), "observer_accessor_kind", "django")
+ grpcapi_endpoint = Config.get("accessor.endpoint")
+ grpcapi_username = Config.get("accessor.username")
+ grpcapi_password = Config.get("accessor.password")
- if (accessor_kind == "django"):
- from djangoaccessor import DjangoModelAccessor
- model_accessor = DjangoModelAccessor()
- import_models_to_globals()
- else:
- grpcapi_endpoint = getattr(Config(), "observer_accessor_endpoint", "xos-core.cord.lab:50051")
- grpcapi_username = getattr(Config(), "observer_accessor_username", "xosadmin@opencord.org")
- grpcapi_password = getattr(Config(), "observer_accessor_password")
+ # if password starts with "@", then retreive the password from a file
+ if grpcapi_password.startswith("@"):
+ fn = grpcapi_password[1:]
+ if not os.path.exists(fn):
+ raise Exception("%s does not exist" % fn)
+ grpcapi_password = open(fn).readline().strip()
- # if password starts with "@", then retreive the password from a file
- if grpcapi_password.startswith("@"):
- fn = grpcapi_password[1:]
- if not os.path.exists(fn):
- raise Exception("%s does not exist" % fn)
- grpcapi_password = open(fn).readline().strip()
+ from xosapi.xos_grpc_client import SecureClient
+ from twisted.internet import reactor
- from xosapi.xos_grpc_client import SecureClient
- from twisted.internet import reactor
+ grpcapi_client = SecureClient(endpoint=grpcapi_endpoint, username=grpcapi_username, password=grpcapi_password)
+ grpcapi_client.set_reconnect_callback(functools.partial(grpcapi_reconnect, grpcapi_client, reactor))
+ grpcapi_client.start()
- grpcapi_client = SecureClient(endpoint = grpcapi_endpoint, username = grpcapi_username, password=grpcapi_password)
- grpcapi_client.set_reconnect_callback(functools.partial(grpcapi_reconnect, grpcapi_client, reactor))
- grpcapi_client.start()
+ # Start reactor. This will cause the client to connect and then execute
+ # grpcapi_callback().
- # Reactor will take over SIGINT during reactor.run(), but does not return it when reactor.stop() is called.
+ # Reactor will take over SIGINT during reactor.run(), but does not return it when reactor.stop() is called.
- orig_sigint = signal.getsignal(signal.SIGINT)
+ orig_sigint = signal.getsignal(signal.SIGINT)
- # Start reactor. This will cause the client to connect and then execute
- # grpcapi_callback().
+ # Start reactor. This will cause the client to connect and then execute
+ # grpcapi_callback().
- reactor.run()
+ reactor.run()
+
config_accessor()
-
diff --git a/xos/synchronizers/new_base/steps/__history/sync_object.py.~1~ b/xos/synchronizers/new_base/steps/__history/sync_object.py.~1~
deleted file mode 100644
index 5049325..0000000
--- a/xos/synchronizers/new_base/steps/__history/sync_object.py.~1~
+++ /dev/null
@@ -1,18 +0,0 @@
-import os
-import base64
-from collections import defaultdict
-from django.db.models import F, Q
-from xos.config import Config
-from synchronizers.base.syncstep import *
-from core.models import *
-from synchronizers.base.ansible_helper import *
-from xos.logger import observer_logger as logger
-import json
-
-class SyncObject(SyncStep):
- provides=[] # Caller fills this in
- requested_interval=0
- observes=[] # Caller fills this in
-
- def sync_record(self, r):
- raise DeferredException('Waiting for Service dependency: %r'%r)
diff --git a/xos/synchronizers/new_base/steps/__history/sync_object.py.~2~ b/xos/synchronizers/new_base/steps/__history/sync_object.py.~2~
deleted file mode 100644
index e4495f4..0000000
--- a/xos/synchronizers/new_base/steps/__history/sync_object.py.~2~
+++ /dev/null
@@ -1,16 +0,0 @@
-import os
-import base64
-from collections import defaultdict
-from xos.config import Config
-from synchronizers.new_base.syncstep import *
-from core.models import *
-from xos.logger import observer_logger as logger
-import json
-
-class SyncObject(SyncStep):
- provides=[] # Caller fills this in
- requested_interval=0
- observes=[] # Caller fills this in
-
- def sync_record(self, r):
- raise DeferredException('Waiting for Service dependency: %r'%r)
diff --git a/xos/synchronizers/new_base/steps/sync_object.py b/xos/synchronizers/new_base/steps/sync_object.py
index 1c10b24..291605e 100644
--- a/xos/synchronizers/new_base/steps/sync_object.py
+++ b/xos/synchronizers/new_base/steps/sync_object.py
@@ -1,10 +1,4 @@
-import os
-import base64
-from collections import defaultdict
-from xos.config import Config
from synchronizers.new_base.syncstep import *
-from xos.logger import observer_logger as logger
-import json
class SyncObject(SyncStep):
provides=[] # Caller fills this in
diff --git a/xos/synchronizers/new_base/syncstep.py b/xos/synchronizers/new_base/syncstep.py
index cb3de5f..34b112f 100644
--- a/xos/synchronizers/new_base/syncstep.py
+++ b/xos/synchronizers/new_base/syncstep.py
@@ -1,12 +1,9 @@
import os
import base64
-from xos.config import Config
+from xosconfig import Config
from xos.logger import Logger, logging
from synchronizers.new_base.modelaccessor import *
from synchronizers.new_base.ansible_helper import run_template
-#from synchronizers.new_base.steps import *
-#from synchronizers.new_base.ansible_helper import *
-#from generate.dependency_walker import *
import json
import time
@@ -14,31 +11,39 @@
logger = Logger(level=logging.DEBUG)
+
def f7(seq):
seen = set()
seen_add = seen.add
- return [ x for x in seq if not (x in seen or seen_add(x))]
+ return [x for x in seq if not (x in seen or seen_add(x))]
+
def elim_dups(backend_str):
strs = backend_str.split(' // ')
strs2 = f7(strs)
return ' // '.join(strs2)
+
def deepgetattr(obj, attr):
return reduce(getattr, attr.split('.'), obj)
+
def obj_class_name(obj):
return getattr(obj, "model_name", obj.__class__.__name__)
+
class InnocuousException(Exception):
pass
+
class DeferredException(Exception):
pass
+
class FailedDependency(Exception):
pass
+
class SyncStep(object):
""" An XOS Sync step.
@@ -52,13 +57,12 @@
# on nat networks.
SYNC_WITHOUT_RUNNING = "sync_without_running"
- slow=False
+ slow = False
+
def get_prop(self, prop):
- try:
- sync_config_dir = Config().sync_config_dir
- except:
- sync_config_dir = '/etc/xos/sync'
- prop_config_path = '/'.join(sync_config_dir,self.name,prop)
+ # NOTE config_dir is never define, is this used?
+ sync_config_dir = Config.get("config_dir")
+ prop_config_path = '/'.join(sync_config_dir, self.name, prop)
return open(prop_config_path).read().rstrip()
def __init__(self, **args):
@@ -74,7 +78,7 @@
try:
self.soft_deadline = int(self.get_prop('soft_deadline_seconds'))
except:
- self.soft_deadline = 5 # 5 seconds
+ self.soft_deadline = 5 # 5 seconds
return
@@ -87,12 +91,12 @@
def check_dependencies(self, obj, failed):
for dep in self.dependencies:
- peer_name = dep[0].lower() + dep[1:] # django names are camelCased with the first letter lower
+ peer_name = dep[0].lower() + dep[1:] # django names are camelCased with the first letter lower
- peer_objects=[]
+ peer_objects = []
try:
peer_names = plural(peer_name)
- peer_object_list=[]
+ peer_object_list = []
try:
peer_object_list.append(deepgetattr(obj, peer_name))
@@ -112,69 +116,70 @@
except:
peer_objects = []
-# if (hasattr(obj,'controller')):
-# try:
-# peer_objects = filter(lambda o:o.controller==obj.controller, peer_objects)
-# except AttributeError:
-# pass
+ # if (hasattr(obj,'controller')):
+ # try:
+ # peer_objects = filter(lambda o:o.controller==obj.controller, peer_objects)
+ # except AttributeError:
+ # pass
if (model_accessor.obj_in_list(failed, peer_objects)):
if (obj.backend_status != failed.backend_status):
obj.backend_status = failed.backend_status
obj.save(update_fields=['backend_status'])
- raise FailedDependency("Failed dependency for %s:%s peer %s:%s failed %s:%s" % (obj_class_name(obj), str(getattr(obj,"pk","no_pk")), obj_class_name(peer_object), str(getattr(peer_object,"pk","no_pk")), obj_class_name(failed), str(getattr(failed,"pk","no_pk"))))
-
+ raise FailedDependency("Failed dependency for %s:%s peer %s:%s failed %s:%s" % (
+ obj_class_name(obj), str(getattr(obj, "pk", "no_pk")), obj_class_name(peer_object),
+ str(getattr(peer_object, "pk", "no_pk")), obj_class_name(failed), str(getattr(failed, "pk", "no_pk"))))
def sync_record(self, o):
logger.debug("Sync_record called for %s %s" % (obj_class_name(o), str(o)))
-# try:
-# controller = o.get_controller()
-# controller_register = json.loads(controller.backend_register)
-#
-# if (controller_register.get('disabled',False)):
-# raise InnocuousException('Controller %s is disabled'%controller.name)
-# except AttributeError:
-# pass
+ # try:
+ # controller = o.get_controller()
+ # controller_register = json.loads(controller.backend_register)
+ #
+ # if (controller_register.get('disabled',False)):
+ # raise InnocuousException('Controller %s is disabled'%controller.name)
+ # except AttributeError:
+ # pass
tenant_fields = self.map_sync_inputs(o)
if tenant_fields == SyncStep.SYNC_WITHOUT_RUNNING:
return
- main_objs=self.observes
+ main_objs = self.observes
if (type(main_objs) is list):
- main_objs=main_objs[0]
+ main_objs = main_objs[0]
path = ''.join(main_objs.__name__).lower()
res = run_template(self.playbook, tenant_fields, path=path, object=o)
if hasattr(self, "map_sync_outputs"):
- self.map_sync_outputs(o,res)
+ self.map_sync_outputs(o, res)
def delete_record(self, o):
-# try:
-# controller = o.get_controller()
-# controller_register = json.loads(o.node.site_deployment.controller.backend_register)
-#
-# if (controller_register.get('disabled',False)):
-# raise InnocuousException('Controller %s is disabled'%sliver.node.site_deployment.controller.name)
-# except AttributeError:
-# pass
+ # try:
+ # controller = o.get_controller()
+ # controller_register = json.loads(o.node.site_deployment.controller.backend_register)
+ #
+ # if (controller_register.get('disabled',False)):
+ # raise InnocuousException('Controller %s is disabled'%sliver.node.site_deployment.controller.name)
+ # except AttributeError:
+ # pass
tenant_fields = self.map_delete_inputs(o)
- main_objs=self.observes
+ main_objs = self.observes
if (type(main_objs) is list):
- main_objs=main_objs[0]
+ main_objs = main_objs[0]
path = ''.join(main_objs.__name__).lower()
- tenant_fields['delete']=True
+ tenant_fields['delete'] = True
res = run_template(self.playbook, tenant_fields, path=path)
try:
- self.map_delete_outputs(o,res)
+ self.map_delete_outputs(o, res)
except AttributeError:
- pass
+ pass
def call(self, failed=[], deletion=False):
pending = self.fetch_pending(deletion)
@@ -185,39 +190,37 @@
model_accessor.reset_queries()
except:
# this shouldn't happen, but in case it does, catch it...
- logger.log_exc("exception in reset_queries",extra=o.tologdict())
+ logger.log_exc("exception in reset_queries", extra=o.tologdict())
sync_failed = False
- try:
- backoff_disabled = Config().observer_backoff_disabled
- except:
- backoff_disabled = 0
+
+ backoff_disabled = Config.get("backoff_disabled")
try:
scratchpad = json.loads(o.backend_register)
if (scratchpad):
next_run = scratchpad['next_run']
- if (not backoff_disabled and next_run>time.time()):
+ if (not backoff_disabled and next_run > time.time()):
sync_failed = True
except:
- logger.log_exc("Exception while loading scratchpad",extra=o.tologdict())
+ logger.log_exc("Exception while loading scratchpad", extra=o.tologdict())
pass
if (not sync_failed):
try:
for f in failed:
- self.check_dependencies(o,f) # Raises exception if failed
+ self.check_dependencies(o, f) # Raises exception if failed
if (deletion):
if getattr(o, "backend_need_reap", False):
# the object has already been deleted and marked for reaping
- model_accessor.journal_object(o,"syncstep.call.already_marked_reap")
+ model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
else:
- model_accessor.journal_object(o,"syncstep.call.delete_record")
+ model_accessor.journal_object(o, "syncstep.call.delete_record")
self.delete_record(o)
- model_accessor.journal_object(o,"syncstep.call.delete_set_reap")
+ model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
o.backend_need_reap = True
o.save(update_fields=['backend_need_reap'])
- #o.delete(purge=True)
+ # o.delete(purge=True)
else:
new_enacted = model_accessor.now()
try:
@@ -231,67 +234,67 @@
o.backend_need_delete = True
o.save(update_fields=['backend_need_delete'])
- model_accessor.journal_object(o,"syncstep.call.sync_record")
+ model_accessor.journal_object(o, "syncstep.call.sync_record")
self.sync_record(o)
- model_accessor.update_diag(syncrecord_start = time.time(), backend_status="1 - Synced Record")
+ model_accessor.update_diag(syncrecord_start=time.time(), backend_status="1 - Synced Record")
o.enacted = new_enacted
- scratchpad = {'next_run':0, 'exponent':0, 'last_success':time.time()}
+ scratchpad = {'next_run': 0, 'exponent': 0, 'last_success': time.time()}
o.backend_register = json.dumps(scratchpad)
o.backend_status = "1 - OK"
- model_accessor.journal_object(o,"syncstep.call.save_update")
- o.save(update_fields=['enacted','backend_status','backend_register'])
+ model_accessor.journal_object(o, "syncstep.call.save_update")
+ o.save(update_fields=['enacted', 'backend_status', 'backend_register'])
logger.info("save sync object, new enacted = %s" % str(new_enacted))
- except (InnocuousException,Exception,DeferredException) as e:
- logger.log_exc("sync step failed!",extra=o.tologdict())
+ except (InnocuousException, Exception, DeferredException) as e:
+ logger.log_exc("sync step failed!", extra=o.tologdict())
try:
if (o.backend_status.startswith('2 - ')):
- str_e = '%s // %r'%(o.backend_status[4:],e)
+ str_e = '%s // %r' % (o.backend_status[4:], e)
str_e = elim_dups(str_e)
else:
- str_e = '%r'%e
+ str_e = '%r' % e
except:
- str_e = '%r'%e
+ str_e = '%r' % e
try:
error = self.error_map.map(str_e)
except:
- error = '%s'%str_e
+ error = '%s' % str_e
if isinstance(e, InnocuousException):
- o.backend_status = '1 - %s'%error
+ o.backend_status = '1 - %s' % error
else:
- o.backend_status = '2 - %s'%error
+ o.backend_status = '2 - %s' % error
try:
scratchpad = json.loads(o.backend_register)
scratchpad['exponent']
except:
- logger.log_exc("Exception while updating scratchpad",extra=o.tologdict())
- scratchpad = {'next_run':0, 'exponent':0, 'last_success':time.time(),'failures':0}
+ logger.log_exc("Exception while updating scratchpad", extra=o.tologdict())
+ scratchpad = {'next_run': 0, 'exponent': 0, 'last_success': time.time(), 'failures': 0}
# Second failure
if (scratchpad['exponent']):
- if isinstance(e,DeferredException):
- delay = scratchpad['exponent'] * 60 # 1 minute
+ if isinstance(e, DeferredException):
+ delay = scratchpad['exponent'] * 60 # 1 minute
else:
- delay = scratchpad['exponent'] * 600 # 10 minutes
+ delay = scratchpad['exponent'] * 600 # 10 minutes
# cap delays at 8 hours
- if (delay>8*60*60):
- delay=8*60*60
+ if (delay > 8 * 60 * 60):
+ delay = 8 * 60 * 60
scratchpad['next_run'] = time.time() + delay
try:
- scratchpad['exponent']+=1
+ scratchpad['exponent'] += 1
except:
- scratchpad['exponent']=1
+ scratchpad['exponent'] = 1
try:
- scratchpad['failures']+=1
+ scratchpad['failures'] += 1
except KeyError:
- scratchpad['failures']=1
+ scratchpad['failures'] = 1
- scratchpad['last_failure']=time.time()
+ scratchpad['last_failure'] = time.time()
o.backend_register = json.dumps(scratchpad)
@@ -300,13 +303,12 @@
if (model_accessor.obj_exists(o)):
try:
o.backend_status = o.backend_status[:1024]
- o.save(update_fields=['backend_status','backend_register','updated'])
+ o.save(update_fields=['backend_status', 'backend_register', 'updated'])
except:
print "Could not update backend status field!"
pass
sync_failed = True
-
if (sync_failed):
failed.append(o)
@@ -315,12 +317,13 @@
def __call__(self, **args):
return self.call(**args)
+
# TODO: What does this do? It seems like it's just going to toss exceptions.
-class NullSyncStep(SyncStep): # was SyncObject
- provides=[] # Caller fills this in
- requested_interval=0
- observes=[] # Caller fills this in
+class NullSyncStep(SyncStep): # was SyncObject
+ provides = [] # Caller fills this in
+ requested_interval = 0
+ observes = [] # Caller fills this in
def sync_record(self, r):
- raise DeferredException('Waiting for Service dependency: %r'%r)
+ raise DeferredException('Waiting for Service dependency: %r' % r)
diff --git a/xos/synchronizers/new_base/watchers.py b/xos/synchronizers/new_base/watchers.py
index 20c147f..6c78fca 100644
--- a/xos/synchronizers/new_base/watchers.py
+++ b/xos/synchronizers/new_base/watchers.py
@@ -10,30 +10,26 @@
import pprint
import traceback
-
from datetime import datetime
from collections import defaultdict
from xos.logger import Logger, logging, logger
-from xos.config import Config, XOS_DIR
+from xosconfig import Config
from syncstep import SyncStep
from synchronizers.new_base.error_mapper import *
import redis
logger = Logger(level=logging.INFO)
+
class XOSWatcher:
def load_sync_step_modules(self, step_dir=None):
if step_dir is None:
- try:
- step_dir = Config().observer_steps_dir
- except:
- step_dir = '/opt/xos/synchronizers/openstack/steps'
-
+ step_dir = Config.get("steps_dir")
for fn in os.listdir(step_dir):
- pathname = os.path.join(step_dir,fn)
- if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
- module = imp.load_source(fn[:-3],pathname)
+ pathname = os.path.join(step_dir, fn)
+ if os.path.isfile(pathname) and fn.endswith(".py") and (fn != "__init__.py"):
+ module = imp.load_source(fn[:-3], pathname)
for classname in dir(module):
c = getattr(module, classname, None)
@@ -41,23 +37,24 @@
# provides field (this eliminates the abstract base classes
# since they don't have a provides)
- if inspect.isclass(c) and issubclass(c, SyncStep) and hasattr(c,"provides") and (c not in self.sync_steps):
+ if inspect.isclass(c) and issubclass(c, SyncStep) and hasattr(c, "provides") and (
+ c not in self.sync_steps):
self.sync_steps.append(c)
def load_sync_steps(self):
for s in self.sync_steps:
- if hasattr(s,'watches'):
+ if hasattr(s, 'watches'):
for w in s.watches:
w.source = s
try:
self.watch_map[w.dest.__name__].append(w)
except:
- self.watch_map[w.dest.__name__]=[w]
+ self.watch_map[w.dest.__name__] = [w]
- def __init__(self,sync_steps):
+ def __init__(self, sync_steps):
self.watch_map = {}
self.sync_steps = sync_steps
- #self.load_sync_step_modules()
+ # self.load_sync_step_modules()
self.load_sync_steps()
r = redis.Redis("redis")
channels = self.watch_map.keys()
@@ -81,5 +78,5 @@
step = w.source()
step.handle_watched_object(o)
except Exception as e:
- logger.log_exc("XOS watcher: exception %s while processing object: %s" % (type(e),e))
+ logger.log_exc("XOS watcher: exception %s while processing object: %s" % (type(e), e))
pass
diff --git a/xos/synchronizers/new_base/xos-synchronizer.py b/xos/synchronizers/new_base/xos-synchronizer.py
index 51a646f..0bb45fa 100644
--- a/xos/synchronizers/new_base/xos-synchronizer.py
+++ b/xos/synchronizers/new_base/xos-synchronizer.py
@@ -6,55 +6,16 @@
sys.path.append('/opt/xos')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
-from xos.config import Config, DEFAULT_CONFIG_FN
+from xosconfig import Config
from xos.logger import Logger, logging, logger
import time
from synchronizers.new_base.modelaccessor import *
from synchronizers.new_base.backend import Backend
-config = Config()
-
logger = Logger(level=logging.INFO)
-# after http://www.erlenstar.demon.co.uk/unix/faq_2.html
-def daemon():
- """Daemonize the current process."""
- if os.fork() != 0: os._exit(0)
- os.setsid()
- if os.fork() != 0: os._exit(0)
- os.umask(0)
- devnull = os.open(os.devnull, os.O_RDWR)
- os.dup2(devnull, 0)
- # xxx fixme - this is just to make sure that nothing gets stupidly lost - should use devnull
- logdir=os.path.dirname(config.observer_logfile)
- # when installed in standalone we might not have httpd installed
- if not os.path.isdir(logdir): os.mkdir(logdir)
- crashlog = os.open('%s'%config.observer_logfile, os.O_RDWR | os.O_APPEND | os.O_CREAT, 0644)
- os.dup2(crashlog, 1)
- os.dup2(crashlog, 2)
-
- if hasattr(config, "observer_pidfile"):
- pidfile = config.get("observer_pidfile")
- else:
- pidfile = "/var/run/xosobserver.pid"
- try:
- file(pidfile,"w").write(str(os.getpid()))
- except:
- print "failed to create pidfile %s" % pidfile
-
def main():
- # Generate command line parser
- parser = argparse.ArgumentParser(usage='%(prog)s [options]')
- parser.add_argument('-d', '--daemon', dest='daemon', action='store_true', default=False,
- help='Run as daemon.')
- # smbaker: util/config.py parses sys.argv[] directly to get config file name; include the option here to avoid
- # throwing unrecognized argument exceptions
- parser.add_argument('-C', '--config', dest='config_file', action='store', default=DEFAULT_CONFIG_FN,
- help='Name of config file.')
- args = parser.parse_args()
-
- if args.daemon: daemon()
models_active = False
wait = False
diff --git a/xos/synchronizers/syndicate/__init__.py b/xos/synchronizers/syndicate/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/xos/synchronizers/syndicate/__init__.py
+++ /dev/null
diff --git a/xos/synchronizers/syndicate/model-deps b/xos/synchronizers/syndicate/model-deps
deleted file mode 100644
index b15c5d1..0000000
--- a/xos/synchronizers/syndicate/model-deps
+++ /dev/null
@@ -1,16 +0,0 @@
-{
- "SlicePrivilege": [
- "User",
- "Slice"
- ],
- "Slice": [
- "Site",
- "Service"
- ],
- "VolumeAccessRight": [
- "Volume"
- ],
- "User": [
- "Site"
- ]
-}
diff --git a/xos/synchronizers/syndicate/requirements.py b/xos/synchronizers/syndicate/requirements.py
deleted file mode 100644
index 303fd3d..0000000
--- a/xos/synchronizers/syndicate/requirements.py
+++ /dev/null
@@ -1,5 +0,0 @@
-requests
-gevent
-grequests
-setproctitle
-psutil
diff --git a/xos/synchronizers/syndicate/run.sh b/xos/synchronizers/syndicate/run.sh
deleted file mode 100644
index 82960a9..0000000
--- a/xos/synchronizers/syndicate/run.sh
+++ /dev/null
@@ -1,2 +0,0 @@
-export XOS_DIR=/opt/xos
-python syndicate-backend.py -C $XOS_DIR/observers/syndicate/syndicate_observer_config
diff --git a/xos/synchronizers/syndicate/start.sh b/xos/synchronizers/syndicate/start.sh
deleted file mode 100644
index 1c408a1..0000000
--- a/xos/synchronizers/syndicate/start.sh
+++ /dev/null
@@ -1,2 +0,0 @@
-export XOS_DIR=/opt/xos
-nohup python syndicate-backend.py -C $XOS_DIR/observers/syndicate/syndicate_observer_config > /dev/null 2>&1 &
diff --git a/xos/synchronizers/syndicate/steps/sync_volume.py b/xos/synchronizers/syndicate/steps/sync_volume.py
deleted file mode 100644
index 8773542..0000000
--- a/xos/synchronizers/syndicate/steps/sync_volume.py
+++ /dev/null
@@ -1,138 +0,0 @@
-#!/usr/bin/env python
-
-import os
-import sys
-import traceback
-import base64
-
-if __name__ == "__main__":
- # for testing
- if os.getenv("OPENCLOUD_PYTHONPATH"):
- sys.path.append( os.getenv("OPENCLOUD_PYTHONPATH") )
- else:
- print >> sys.stderr, "No OPENCLOUD_PYTHONPATH variable set. Assuming that OpenCloud is in PYTHONPATH"
-
- os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
-
-
-from django.db.models import F, Q
-from xos.config import Config
-from synchronizers.base.syncstep import SyncStep
-from core.models import Service
-from services.syndicate_storage.models import Volume
-
-import logging
-from logging import Logger
-logging.basicConfig( format='[%(levelname)s] [%(module)s:%(lineno)d] %(message)s' )
-logger = logging.getLogger()
-logger.setLevel( logging.INFO ,extra=o.tologdict())
-
-# point to planetstack
-if __name__ != "__main__":
- if os.getenv("OPENCLOUD_PYTHONPATH") is not None:
- sys.path.insert(0, os.getenv("OPENCLOUD_PYTHONPATH"))
- else:
- logger.warning("No OPENCLOUD_PYTHONPATH set; assuming your PYTHONPATH works")
-
-# syndicatelib will be in stes/..
-parentdir = os.path.join(os.path.dirname(__file__),"..")
-sys.path.insert(0,parentdir)
-
-import syndicatelib
-
-
-class SyncVolume(SyncStep):
- provides=[Volume]
- requested_interval=0
-
- def __init__(self, **args):
- SyncStep.__init__(self, **args)
-
- def sync_record(self, volume):
- """
- Synchronize a Volume record with Syndicate.
- """
-
- logger.info( "Sync Volume = %s\n\n" % volume.name ,extra=volume.tologdict())
-
- user_email = volume.owner_id.email
- config = syndicatelib.get_config()
-
- volume_principal_id = syndicatelib.make_volume_principal_id( user_email, volume.name )
-
- # get the observer secret
- try:
- observer_secret = config.SYNDICATE_OPENCLOUD_SECRET
- except Exception, e:
- traceback.print_exc()
- logger.error("config is missing SYNDICATE_OPENCLOUD_SECRET",extra=volume.tologdict())
- raise e
-
- # volume owner must exist as a Syndicate user...
- try:
- rc, user = syndicatelib.ensure_principal_exists( volume_principal_id, observer_secret, is_admin=False, max_UGs=1100, max_RGs=1)
- assert rc == True, "Failed to create or read volume principal '%s'" % volume_principal_id
- except Exception, e:
- traceback.print_exc()
- logger.error("Failed to ensure principal '%s' exists" % volume_principal_id ,extra=volume.tologdict())
- raise e
-
- # volume must exist
-
- # create or update the Volume
- try:
- new_volume = syndicatelib.ensure_volume_exists( volume_principal_id, volume, user=user )
- except Exception, e:
- traceback.print_exc()
- logger.error("Failed to ensure volume '%s' exists" % volume.name ,extra=volume.tologdict())
- raise e
-
- # did we create the Volume?
- if new_volume is not None:
- # we're good
- pass
-
- # otherwise, just update it
- else:
- try:
- rc = syndicatelib.update_volume( volume )
- except Exception, e:
- traceback.print_exc()
- logger.error("Failed to update volume '%s', exception = %s" % (volume.name, e.message),extra=volume.tologdict())
- raise e
-
- return True
-
- def delete_record(self, volume):
- try:
- volume_name = volume.name
- syndicatelib.ensure_volume_absent( volume_name )
- except Exception, e:
- traceback.print_exc()
- logger.exception("Failed to erase volume '%s'" % volume_name,extra=volume.tologdict())
- raise e
-
-
-
-
-
-if __name__ == "__main__":
- sv = SyncVolume()
-
-
- # first, set all volumes to not-enacted so we can test
- for v in Volume.objects.all():
- v.enacted = None
- v.save()
-
- # NOTE: for resetting only
- if len(sys.argv) > 1 and sys.argv[1] == "reset":
- sys.exit(0)
-
- recs = sv.fetch_pending()
-
- for rec in recs:
- rc = sv.sync_record( rec )
- if not rc:
- print "\n\nFailed to sync %s\n\n" % (rec.name)
-
diff --git a/xos/synchronizers/syndicate/steps/sync_volumeaccessright.py b/xos/synchronizers/syndicate/steps/sync_volumeaccessright.py
deleted file mode 100644
index 9fca2a4..0000000
--- a/xos/synchronizers/syndicate/steps/sync_volumeaccessright.py
+++ /dev/null
@@ -1,116 +0,0 @@
-#!/usr/bin/env python
-
-import os
-import sys
-import base64
-import traceback
-
-if __name__ == "__main__":
- # for testing
- if os.getenv("OPENCLOUD_PYTHONPATH"):
- sys.path.append( os.getenv("OPENCLOUD_PYTHONPATH") )
- else:
- print >> sys.stderr, "No OPENCLOUD_PYTHONPATH variable set. Assuming that OpenCloud is in PYTHONPATH"
-
- os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
-
-from django.db.models import F, Q
-from xos.config import Config
-from synchronizers.base.syncstep import SyncStep
-from core.models import Service
-
-import logging
-from logging import Logger
-logging.basicConfig( format='[%(levelname)s] [%(module)s:%(lineno)d] %(message)s' )
-logger = logging.getLogger()
-logger.setLevel( logging.INFO ,extra=o.tologdict())
-
-# point to planetstack
-if __name__ != "__main__":
- if os.getenv("OPENCLOUD_PYTHONPATH") is not None:
- sys.path.insert(0, os.getenv("OPENCLOUD_PYTHONPATH"))
- else:
- logger.warning("No OPENCLOUD_PYTHONPATH set; assuming your PYTHONPATH works")
-
-from services.syndicate_storage.models import VolumeAccessRight
-
-# syndicatelib will be in stes/..
-parentdir = os.path.join(os.path.dirname(__file__),"..")
-sys.path.insert(0,parentdir)
-
-import syndicatelib
-
-class SyncVolumeAccessRight(SyncStep):
- provides=[VolumeAccessRight]
- requested_interval=0
-
- def __init__(self, **args):
- SyncStep.__init__(self, **args)
-
- def sync_record(self, vac):
-
- syndicate_caps = "UNKNOWN" # for exception handling
-
- # get arguments
- config = syndicatelib.get_config()
- user_email = vac.owner_id.email
- volume_name = vac.volume.name
- syndicate_caps = syndicatelib.opencloud_caps_to_syndicate_caps( vac.cap_read_data, vac.cap_write_data, vac.cap_host_data )
-
- logger.info( "Sync VolumeAccessRight for (%s, %s)" % (user_email, volume_name) ,extra=vac.tologdict())
-
- # validate config
- try:
- RG_port = config.SYNDICATE_RG_DEFAULT_PORT
- observer_secret = config.SYNDICATE_OPENCLOUD_SECRET
- except Exception, e:
- traceback.print_exc()
- logger.error("syndicatelib config is missing SYNDICATE_RG_DEFAULT_PORT, SYNDICATE_OPENCLOUD_SECRET",extra=vac.tologdict())
- raise e
-
- # ensure the user exists and has credentials
- try:
- rc, user = syndicatelib.ensure_principal_exists( user_email, observer_secret, is_admin=False, max_UGs=1100, max_RGs=1 )
- assert rc is True, "Failed to ensure principal %s exists (rc = %s,%s)" % (user_email, rc, user)
- except Exception, e:
- traceback.print_exc()
- logger.error("Failed to ensure user '%s' exists" % user_email ,extra=vac.tologdict())
- raise e
-
- # make the access right for the user to create their own UGs, and provision an RG for this user that will listen on localhost.
- # the user will have to supply their own RG closure.
- try:
- rc = syndicatelib.setup_volume_access( user_email, volume_name, syndicate_caps, RG_port, observer_secret )
- assert rc is True, "Failed to setup volume access for %s in %s" % (user_email, volume_name)
-
- except Exception, e:
- traceback.print_exc()
- logger.error("Faoed to ensure user %s can access Volume %s with rights %s" % (user_email, volume_name, syndicate_caps),extra=vac.tologdict())
- raise e
-
- return True
-
- # Jude: this will simply go on to purge the object from
- # OpenCloud. The previous 'deleter' version was a no-op also.
- def delete_record(self, obj):
- pass
-
-
-if __name__ == "__main__":
-
- # first, set all VolumeAccessRights to not-enacted so we can test
- for v in VolumeAccessRight.objects.all():
- v.enacted = None
- v.save()
-
- # NOTE: for resetting only
- if len(sys.argv) > 1 and sys.argv[1] == "reset":
- sys.exit(0)
-
-
- sv = SyncVolumeAccessRight()
- recs = sv.fetch_pending()
-
- for rec in recs:
- sv.sync_record( rec )
-
diff --git a/xos/synchronizers/syndicate/steps/sync_volumeslice.py b/xos/synchronizers/syndicate/steps/sync_volumeslice.py
deleted file mode 100644
index 9af97f3..0000000
--- a/xos/synchronizers/syndicate/steps/sync_volumeslice.py
+++ /dev/null
@@ -1,158 +0,0 @@
-#!/usr/bin/env python
-
-import os
-import sys
-import base64
-import traceback
-
-if __name__ == "__main__":
- # for testing
- if os.getenv("OPENCLOUD_PYTHONPATH"):
- sys.path.append( os.getenv("OPENCLOUD_PYTHONPATH") )
- else:
- print >> sys.stderr, "No OPENCLOUD_PYTHONPATH variable set. Assuming that OpenCloud is in PYTHONPATH"
-
- os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
-
-from django.db.models import F, Q
-from xos.config import Config
-from synchronizers.base.syncstep import SyncStep
-from core.models import Service, Slice
-
-import logging
-from logging import Logger
-logging.basicConfig( format='[%(levelname)s] [%(module)s:%(lineno)d] %(message)s' )
-logger = logging.getLogger()
-logger.setLevel( logging.INFO ,extra=o.tologdict())
-
-# point to planetstack
-if __name__ != "__main__":
- if os.getenv("OPENCLOUD_PYTHONPATH") is not None:
- sys.path.insert(0, os.getenv("OPENCLOUD_PYTHONPATH"))
- else:
- logger.warning("No OPENCLOUD_PYTHONPATH set; assuming your PYTHONPATH works")
-
-from services.syndicate_storage.models import VolumeSlice,VolumeAccessRight,Volume
-
-# syndicatelib will be in stes/..
-parentdir = os.path.join(os.path.dirname(__file__),"..")
-sys.path.insert(0,parentdir)
-
-import syndicatelib
-
-
-class SyncVolumeSlice(SyncStep):
- provides=[VolumeSlice]
- requested_interval=0
-
- def __init__(self, **args):
- SyncStep.__init__(self, **args)
-
- def sync_record(self, vs):
-
- logger.info("Sync VolumeSlice for (%s, %s)" % (vs.volume_id.name, vs.slice_id.name),extra=vs.tologdict())
-
- # extract arguments...
- user_email = vs.slice_id.creator.email
- slice_name = vs.slice_id.name
- volume_name = vs.volume_id.name
- syndicate_caps = syndicatelib.opencloud_caps_to_syndicate_caps( vs.cap_read_data, vs.cap_write_data, vs.cap_host_data )
- RG_port = vs.RG_portnum
- UG_port = vs.UG_portnum
- slice_secret = None
-
- config = syndicatelib.get_config()
- try:
- observer_secret = config.SYNDICATE_OPENCLOUD_SECRET
- RG_closure = config.SYNDICATE_RG_CLOSURE
- observer_pkey_path = config.SYNDICATE_PRIVATE_KEY
- syndicate_url = config.SYNDICATE_SMI_URL
-
- except Exception, e:
- traceback.print_exc()
- logger.error("syndicatelib config is missing one or more of the following: SYNDICATE_OPENCLOUD_SECRET, SYNDICATE_RG_CLOSURE, SYNDICATE_PRIVATE_KEY, SYNDICATE_SMI_URL",extra=vs.tologdict())
- raise e
-
- # get secrets...
- try:
- observer_pkey_pem = syndicatelib.get_private_key_pem( observer_pkey_path )
- assert observer_pkey_pem is not None, "Failed to load Observer private key"
-
- # get/create the slice secret
- slice_secret = syndicatelib.get_or_create_slice_secret( observer_pkey_pem, slice_name )
- assert slice_secret is not None, "Failed to get or create slice secret for %s" % slice_name
-
- except Exception, e:
- traceback.print_exc()
- logger.error("Failed to load secret credentials",extra=vs.tologdict())
- raise e
-
- # make sure there's a slice-controlled Syndicate user account for the slice owner
- slice_principal_id = syndicatelib.make_slice_principal_id( user_email, slice_name )
-
- try:
- rc, user = syndicatelib.ensure_principal_exists( slice_principal_id, observer_secret, is_admin=False, max_UGs=1100, max_RGs=1 )
- assert rc is True, "Failed to ensure principal %s exists (rc = %s,%s)" % (slice_principal_id, rc, user)
- except Exception, e:
- traceback.print_exc()
- logger.error('Failed to ensure slice user %s exists' % slice_principal_id,extra=vs.tologdict())
- raise e
-
- # grant the slice-owning user the ability to provision UGs in this Volume, and also provision for the user the (single) RG the slice will instantiate in each VM.
- try:
- rc = syndicatelib.setup_volume_access( slice_principal_id, volume_name, syndicate_caps, RG_port, observer_secret, RG_closure=RG_closure )
- assert rc is True, "Failed to set up Volume access for slice %s in %s" % (slice_principal_id, volume_name)
-
- except Exception, e:
- traceback.print_exc()
- logger.error("Failed to set up Volume access for slice %s in %s" % (slice_principal_id, volume_name),extra=vs.tologdict())
- raise e
-
- # generate and save slice credentials....
- try:
- slice_cred = syndicatelib.save_slice_credentials( observer_pkey_pem, syndicate_url, slice_principal_id, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=user )
- assert slice_cred is not None, "Failed to generate slice credential for %s in %s" % (slice_principal_id, volume_name )
-
- except Exception, e:
- traceback.print_exc()
- logger.error("Failed to generate slice credential for %s in %s" % (slice_principal_id, volume_name),extra=vs.tologdict())
- raise e
-
- # ... and push them all out.
- try:
- rc = syndicatelib.push_credentials_to_slice( slice_name, slice_cred )
- assert rc is True, "Failed to push credentials to slice %s for volume %s" % (slice_name, volume_name)
-
- except Exception, e:
- traceback.print_exc()
- logger.error("Failed to push slice credentials to %s for volume %s" % (slice_name, volume_name),extra=vs.tologdict())
- raise e
-
- return True
-
- # This method will simply cause the object to be purged from OpenCloud
- def delete_record(self, volume_slice):
- pass
-
-
-if __name__ == "__main__":
- sv = SyncVolumeSlice()
-
- # first, set all VolumeSlice to not-enacted so we can test
- for v in VolumeSlice.objects.all():
- v.enacted = None
- v.save()
-
- # NOTE: for resetting only
- if len(sys.argv) > 1 and sys.argv[1] == "reset":
- sys.exit(0)
-
- recs = sv.fetch_pending()
-
- for rec in recs:
- if rec.slice_id.creator is None:
- print "Ignoring slice %s, since it has no creator" % (rec.slice_id)
- continue
-
- sv.sync_record( rec )
-
diff --git a/xos/synchronizers/syndicate/stop.sh b/xos/synchronizers/syndicate/stop.sh
deleted file mode 100644
index f4a8e28..0000000
--- a/xos/synchronizers/syndicate/stop.sh
+++ /dev/null
@@ -1 +0,0 @@
-pkill -9 -f syndicate-backend.py
diff --git a/xos/synchronizers/syndicate/syndicate-backend.py b/xos/synchronizers/syndicate/syndicate-backend.py
deleted file mode 100644
index 9b53c77..0000000
--- a/xos/synchronizers/syndicate/syndicate-backend.py
+++ /dev/null
@@ -1,10 +0,0 @@
-#!/usr/bin/env python
-import os
-os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
-from synchronizers.base.backend import Backend
-
-if __name__ == '__main__':
-
- backend = Backend()
- backend.run()
-
diff --git a/xos/synchronizers/syndicate/syndicate_synchronizer_config b/xos/synchronizers/syndicate/syndicate_synchronizer_config
deleted file mode 100644
index 7c9d2d2..0000000
--- a/xos/synchronizers/syndicate/syndicate_synchronizer_config
+++ /dev/null
@@ -1,35 +0,0 @@
-
-[plc]
-name=plc
-deployment=VICCI
-
-[db]
-name=xos
-#user=plstackuser
-#password=2uMDYtJK
-user=postgres
-password=password
-host=localhost
-port=5432
-
-[api]
-host=128.112.171.237
-port=8000
-ssl_key=None
-ssl_cert=None
-ca_ssl_cert=None
-ratelimit_enabled=0
-omf_enabled=0
-mail_support_address=support@localhost
-nova_enabled=True
-
-[observer]
-dependency_graph=/opt/xos/synchronizers/syndicate/model-deps
-steps_dir=/opt/xos/synchronizers/syndicate/steps
-deleters_dir=/opt/xos/synchronizers/syndicate/deleters
-log_file=console
-driver=None
-
-[feefie]
-client_id='vicci_dev_central'
-user_id='pl'
diff --git a/xos/synchronizers/syndicate/syndicatelib.py b/xos/synchronizers/syndicate/syndicatelib.py
deleted file mode 100644
index 56bd120..0000000
--- a/xos/synchronizers/syndicate/syndicatelib.py
+++ /dev/null
@@ -1,1353 +0,0 @@
-#!/usr/bin/env python
-
-"""
-Define some common methods for the Syndicate observer.
-"""
-import os
-import sys
-import random
-import json
-import time
-import requests
-import traceback
-import base64
-import BaseHTTPServer
-import setproctitle
-import threading
-import urllib
-
-from Crypto.Hash import SHA256 as HashAlg
-from Crypto.PublicKey import RSA as CryptoKey
-from Crypto import Random
-from Crypto.Signature import PKCS1_PSS as CryptoSigner
-
-import logging
-from logging import Logger
-logging.basicConfig( format='[%(levelname)s] [%(module)s:%(lineno)d] %(message)s' )
-logger = logging.getLogger()
-logger.setLevel( logging.INFO )
-
-# get config package
-import syndicatelib_config.config as CONFIG
-
-# get the Syndicate modules
-import syndicate
-
-import syndicate.client.bin.syntool as syntool
-import syndicate.client.common.msconfig as msconfig
-import syndicate.client.common.api as api
-import syndicate.util.storage as syndicate_storage
-import syndicate.util.watchdog as syndicate_watchdog
-import syndicate.util.daemonize as syndicate_daemon
-import syndicate.util.crypto as syndicate_crypto
-import syndicate.util.provisioning as syndicate_provisioning
-import syndicate.syndicate as c_syndicate
-
-# for testing
-TESTING = False
-class FakeObject(object):
- def __init__(self):
- pass
-
-if os.getenv("OPENCLOUD_PYTHONPATH") is not None:
- sys.path.insert(0, os.getenv("OPENCLOUD_PYTHONPATH"))
-else:
- logger.warning("No OPENCLOUD_PYTHONPATH set. Assuming Syndicate models are in your PYTHONPATH")
-
-try:
- os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
-
- # get our models
- import services.syndicate_storage.models as models
-
- # get OpenCloud models
- from core.models import Slice,Instance
-
- from django.core.exceptions import ObjectDoesNotExist
- from django.db import IntegrityError
-
-except ImportError, ie:
- logger.warning("Failed to import models; some tests will not work")
-
- # create a fake "models" package that has exactly the members we need for testing.
- models = FakeObject()
- models.Volume = FakeObject()
- models.Volume.CAP_READ_DATA = 1
- models.Volume.CAP_WRITE_DATA = 2
- models.Volume.CAP_HOST_DATA = 4
-
- TESTING = True
-
-
-#-------------------------------
-class SyndicateObserverError( Exception ):
- pass
-
-#-------------------------------
-def get_config():
- """
- Return the imported config
- """
- return CONFIG
-
-
-#-------------------------------
-def make_openid_url( email ):
- """
- Generate an OpenID identity URL from an email address.
- """
- return os.path.join( CONFIG.SYNDICATE_OPENID_TRUSTROOT, "id", email )
-
-
-#-------------------------------
-def connect_syndicate( username=CONFIG.SYNDICATE_OPENCLOUD_USER, password=CONFIG.SYNDICATE_OPENCLOUD_PASSWORD, user_pkey_pem=CONFIG.SYNDICATE_OPENCLOUD_PKEY ):
- """
- Connect to the OpenCloud Syndicate SMI, using the OpenCloud user credentials.
- """
- debug = True
- if hasattr(CONFIG, "DEBUG"):
- debug = CONFIG.DEBUG
-
- client = syntool.Client( username, CONFIG.SYNDICATE_SMI_URL,
- password=password,
- user_pkey_pem=user_pkey_pem,
- debug=debug )
-
- return client
-
-
-#-------------------------------
-def opencloud_caps_to_syndicate_caps( cap_read, cap_write, cap_host ):
- """
- Convert OpenCloud capability bits from the UI into Syndicate's capability bits.
- """
- syn_caps = 0
-
- if cap_read:
- syn_caps |= (msconfig.GATEWAY_CAP_READ_DATA | msconfig.GATEWAY_CAP_READ_METADATA)
- if cap_write:
- syn_caps |= (msconfig.GATEWAY_CAP_WRITE_DATA | msconfig.GATEWAY_CAP_WRITE_METADATA)
- if cap_host:
- syn_caps |= (msconfig.GATEWAY_CAP_COORDINATE)
-
- return syn_caps
-
-#-------------------------------
-def ensure_user_exists( user_email, **user_kw ):
- """
- Given an OpenCloud user, ensure that the corresponding
- Syndicate user exists on the MS. This method does NOT
- create any OpenCloud-specific data.
-
- Return the (created, user), where created==True if the user
- was created and created==False if the user was read.
- Raise an exception on error.
- """
-
- client = connect_syndicate()
- user_openid_url = make_openid_url( user_email )
-
- return syndicate_provisioning.ensure_user_exists( client, user_email, user_openid_url, **user_kw )
-
-
-#-------------------------------
-def ensure_user_absent( user_email ):
- """
- Ensure that a given OpenCloud user's associated Syndicate user record
- has been deleted. This method does NOT delete any OpenCloud-specific data.
-
- Returns True on success
- Raises an exception on error
- """
-
- client = connect_syndicate()
-
- return client.delete_user( user_email )
-
-
-#-------------------------------
-def make_volume_principal_id( user_email, volume_name ):
- """
- Create a principal id for a Volume owner.
- """
-
- volume_name_safe = urllib.quote( volume_name )
-
- return "volume_%s.%s" % (volume_name_safe, user_email)
-
-
-#-------------------------------
-def make_slice_principal_id( user_email, slice_name ):
- """
- Create a principal id for a slice owner.
- """
-
- slice_name_safe = urllib.quote( slice_name )
-
- return "slice_%s.%s" % (slice_name, user_email)
-
-
-#-------------------------------
-def ensure_principal_exists( user_email, observer_secret, **user_kw ):
- """
- Ensure that a Syndicate user exists, as well as its OpenCloud-specific data.
-
- Return (True, (None OR user)) on success. Returns a user if the user was created.
- Return (False, None) on error
- """
-
- try:
- created, new_user = ensure_user_exists( user_email, **user_kw )
- except Exception, e:
- traceback.print_exc()
- logger.error("Failed to ensure user '%s' exists" % user_email )
- return (False, None)
-
- # if we created a new user, then save its (sealed) credentials to the Django DB
- if created:
- try:
- rc = put_principal_data( user_email, observer_secret, new_user['signing_public_key'], new_user['signing_private_key'] )
- assert rc == True, "Failed to save SyndicatePrincipal"
- except Exception, e:
- traceback.print_exc()
- logger.error("Failed to save private key for principal %s" % (user_email))
- return (False, None)
-
- return (True, new_user)
-
-
-
-#-------------------------------
-def ensure_principal_absent( user_email ):
- """
- Ensure that a Syndicate user does not exists, and remove the OpenCloud-specific data.
-
- Return True on success.
- """
-
- ensure_user_absent( user_email )
- delete_principal_data( user_email )
- return True
-
-#-------------------------------
-def ensure_volume_exists( user_email, opencloud_volume, user=None ):
- """
- Given the email address of a user, ensure that the given
- Volume exists and is owned by that user.
- Do not try to ensure that the user exists.
-
- Return the Volume if we created it, or return None if we did not.
- Raise an exception on error.
- """
- client = connect_syndicate()
-
- try:
- volume = client.read_volume( opencloud_volume.name )
- except Exception, e:
- # transport error
- logger.exception(e)
- raise e
-
- if volume is None:
- # the volume does not exist....try to create it
- vol_name = opencloud_volume.name
- vol_blocksize = opencloud_volume.blocksize
- vol_description = opencloud_volume.description
- vol_private = opencloud_volume.private
- vol_archive = opencloud_volume.archive
- vol_default_gateway_caps = opencloud_caps_to_syndicate_caps( opencloud_volume.cap_read_data, opencloud_volume.cap_write_data, opencloud_volume.cap_host_data )
-
- try:
- vol_info = client.create_volume( user_email, vol_name, vol_description, vol_blocksize,
- private=vol_private,
- archive=vol_archive,
- active=True,
- default_gateway_caps=vol_default_gateway_caps,
- store_private_key=False,
- metadata_private_key="MAKE_METADATA_KEY" )
-
- except Exception, e:
- # transport error
- logger.exception(e)
- raise e
-
- else:
- # successfully created the volume!
- return vol_info
-
- else:
-
- # volume already exists. Verify its owned by this user.
- if user is None:
- try:
- user = client.read_user( volume['owner_id'] )
- except Exception, e:
- # transport error, or user doesn't exist (either is unacceptable)
- logger.exception(e)
- raise e
-
- if user is None or user['email'] != user_email:
- raise Exception("Volume '%s' already exists, but is NOT owned by '%s'" % (opencloud_volume.name, user_email) )
-
- # we're good!
- return None
-
-
-#-------------------------------
-def ensure_volume_absent( volume_name ):
- """
- Given an OpenCloud volume, ensure that the corresponding Syndicate
- Volume does not exist.
- """
-
- client = connect_syndicate()
-
- # this is idempotent, and returns True even if the Volume doesn't exist
- return client.delete_volume( volume_name )
-
-
-#-------------------------------
-def update_volume( opencloud_volume ):
- """
- Update a Syndicate Volume from an OpenCloud Volume model.
- Fails if the Volume does not exist in Syndicate.
- """
-
- client = connect_syndicate()
-
- vol_name = opencloud_volume.name
- vol_description = opencloud_volume.description
- vol_private = opencloud_volume.private
- vol_archive = opencloud_volume.archive
- vol_default_gateway_caps = opencloud_caps_to_syndicate_caps( opencloud_volume.cap_read_data, opencloud_volume.cap_write_data, opencloud_volume.cap_host_data )
-
- try:
- rc = client.update_volume( vol_name,
- description=vol_description,
- private=vol_private,
- archive=vol_archive,
- default_gateway_caps=vol_default_gateway_caps )
-
- if not rc:
- raise Exception("update_volume(%s) failed!" % vol_name )
-
- except Exception, e:
- # transort or method error
- logger.exception(e)
- return False
-
- else:
- return True
-
-
-#-------------------------------
-def ensure_volume_access_right_exists( user_email, volume_name, caps, allowed_gateways=[msconfig.GATEWAY_TYPE_UG] ):
- """
- Ensure that a particular user has particular access to a particular volume.
- Do not try to ensure that the user or volume exist, however!
- """
- client = connect_syndicate()
- return syndicate_provisioning.ensure_volume_access_right_exists( client, user_email, volume_name, caps, allowed_gateways )
-
-#-------------------------------
-def ensure_volume_access_right_absent( user_email, volume_name ):
- """
- Ensure that acess to a particular volume is revoked.
- """
- client = connect_syndicate()
- return syndicate_provisioning.ensure_volume_access_right_absent( client, user_email, volume_name )
-
-
-#-------------------------------
-def setup_volume_access( user_email, volume_name, caps, RG_port, slice_secret, RG_closure=None ):
- """
- Set up the Volume to allow the slice to provision UGs in it, and to fire up RGs.
- * create the Volume Access Right for the user, so (s)he can create Gateways.
- * provision a single Replica Gateway, serving on localhost.
- """
- client = connect_syndicate()
-
- try:
- rc = ensure_volume_access_right_exists( user_email, volume_name, caps )
- assert rc is True, "Failed to create access right for %s in %s" % (user_email, volume_name)
-
- except Exception, e:
- logger.exception(e)
- return False
-
- RG_name = syndicate_provisioning.make_gateway_name( "OpenCloud", "RG", volume_name, "localhost" )
- RG_key_password = syndicate_provisioning.make_gateway_private_key_password( RG_name, slice_secret )
-
- try:
- rc = syndicate_provisioning.ensure_RG_exists( client, user_email, volume_name, RG_name, "localhost", RG_port, RG_key_password, closure=RG_closure )
- except Exception, e:
- logger.exception(e)
- return False
-
- return True
-
-
-#-------------------------------
-def teardown_volume_access( user_email, volume_name ):
- """
- Revoke access to a Volume for a User.
- * remove the user's Volume Access Right
- * remove the use'rs gateways
- """
- client = connect_syndicate()
-
- # block the user from creating more gateways, and delete the gateways
- try:
- rc = client.remove_user_from_volume( user_email, volume_name )
- assert rc is True, "Failed to remove access right for %s in %s" % (user_email, volume_name)
-
- except Exception, e:
- logger.exception(e)
- return False
-
- return True
-
-
-#-------------------------------
-def create_sealed_and_signed_blob( private_key_pem, secret, data ):
- """
- Create a sealed and signed message.
- """
-
- # seal it with the password
- logger.info("Sealing credential data")
-
- rc, sealed_data = c_syndicate.password_seal( data, secret )
- if rc != 0:
- logger.error("Failed to seal data with the secret, rc = %s" % rc)
- return None
-
- msg = syndicate_crypto.sign_and_serialize_json( private_key_pem, sealed_data )
- if msg is None:
- logger.error("Failed to sign credential")
- return None
-
- return msg
-
-
-#-------------------------------
-def verify_and_unseal_blob( public_key_pem, secret, blob_data ):
- """
- verify and unseal a serialized string of JSON
- """
-
- # verify it
- rc, sealed_data = syndicate_crypto.verify_and_parse_json( public_key_pem, blob_data )
- if rc != 0:
- logger.error("Failed to verify and parse blob, rc = %s" % rc)
- return None
-
- logger.info("Unsealing credential data")
-
- rc, data = c_syndicate.password_unseal( sealed_data, secret )
- if rc != 0:
- logger.error("Failed to unseal blob, rc = %s" % rc )
- return None
-
- return data
-
-
-#-------------------------------
-def create_volume_list_blob( private_key_pem, slice_secret, volume_list ):
- """
- Create a sealed volume list, signed with the private key.
- """
- list_data = {
- "volumes": volume_list
- }
-
- list_data_str = json.dumps( list_data )
-
- msg = create_sealed_and_signed_blob( private_key_pem, slice_secret, list_data_str )
- if msg is None:
- logger.error("Failed to seal volume list")
- return None
-
- return msg
-
-
-#-------------------------------
-def create_slice_credential_blob( private_key_pem, slice_name, slice_secret, syndicate_url, volume_name, volume_owner, UG_port, user_pkey_pem ):
- """
- Create a sealed, signed, encoded slice credentials blob.
- """
-
- # create and serialize the data
- cred_data = {
- "syndicate_url": syndicate_url,
- "volume_name": volume_name,
- "volume_owner": volume_owner,
- "slice_name": slice_name,
- "slice_UG_port": UG_port,
- "principal_pkey_pem": user_pkey_pem,
- }
-
- cred_data_str = json.dumps( cred_data )
-
- msg = create_sealed_and_signed_blob( private_key_pem, slice_secret, cred_data_str )
- if msg is None:
- logger.error("Failed to seal volume list")
- return None
-
- return msg
-
-
-#-------------------------------
-def put_principal_data( user_email, observer_secret, public_key_pem, private_key_pem ):
- """
- Seal and store the principal's private key into the database, in a SyndicatePrincipal object,
- so the instance-side Syndicate daemon syndicated.py can get them later.
- Overwrite an existing principal if one exists.
- """
-
- sealed_private_key = create_sealed_and_signed_blob( private_key_pem, observer_secret, private_key_pem )
- if sealed_private_key is None:
- return False
-
- try:
- sp = models.SyndicatePrincipal( sealed_private_key=sealed_private_key, public_key_pem=public_key_pem, principal_id=user_email )
- sp.save()
- except IntegrityError, e:
- logger.error("WARN: overwriting existing principal %s" % user_email)
- sp.delete()
- sp.save()
-
- return True
-
-
-#-------------------------------
-def delete_principal_data( user_email ):
- """
- Delete an OpenCloud SyndicatePrincipal object.
- """
-
- sp = get_principal_data( user_email )
- if sp is not None:
- sp.delete()
-
- return True
-
-
-#-------------------------------
-def get_principal_data( user_email ):
- """
- Get a SyndicatePrincipal record from the database
- """
-
- try:
- sp = models.SyndicatePrincipal.objects.get( principal_id=user_email )
- return sp
- except ObjectDoesNotExist:
- logger.error("No SyndicatePrincipal record for %s" % user_email)
- return None
-
-
-
-#-------------------------------
-def get_principal_pkey( user_email, observer_secret ):
- """
- Fetch and unseal the private key of a SyndicatePrincipal.
- """
-
- sp = get_principal_data( user_email )
- if sp is None:
- logger.error("Failed to find private key for principal %s" % user_email )
- return None
-
- public_key_pem = sp.public_key_pem
- sealed_private_key_pem = sp.sealed_private_key
-
- # unseal
- private_key_pem = verify_and_unseal_blob(public_key_pem, observer_secret, sealed_private_key_pem)
- if private_key_pem is None:
- logger.error("Failed to unseal private key")
-
- return private_key_pem
-
-
-#-------------------------------
-def get_private_key_pem( pkey_path ):
- """
- Get a private key from storage, PEM-encoded.
- """
-
- # get the OpenCloud private key
- observer_pkey = syndicate_storage.read_private_key( pkey_path )
- if observer_pkey is None:
- logger.error("Failed to load Observer private key")
- return None
-
- observer_pkey_pem = observer_pkey.exportKey()
-
- return observer_pkey_pem
-
-
-#-------------------------------
-def encrypt_slice_secret( observer_pkey_pem, slice_secret ):
- """
- Encrypt and serialize the slice secret with the Observer private key
- """
-
- # get the public key
- try:
- observer_pubkey_pem = CryptoKey.importKey( observer_pkey_pem ).publickey().exportKey()
- except Exception, e:
- logger.exception(e)
- logger.error("Failed to derive public key from private key")
- return None
-
- # encrypt the data
- rc, sealed_slice_secret = c_syndicate.encrypt_data( observer_pkey_pem, observer_pubkey_pem, slice_secret )
-
- if rc != 0:
- logger.error("Failed to encrypt slice secret")
- return None
-
- sealed_slice_secret_b64 = base64.b64encode( sealed_slice_secret )
-
- return sealed_slice_secret_b64
-
-
-#-------------------------------
-def decrypt_slice_secret( observer_pkey_pem, sealed_slice_secret_b64 ):
- """
- Unserialize and decrypt a slice secret
- """
-
- # get the public key
- try:
- observer_pubkey_pem = CryptoKey.importKey( observer_pkey_pem ).publickey().exportKey()
- except Exception, e:
- logger.exception(e)
- logger.error("Failed to derive public key from private key")
- return None
-
- sealed_slice_secret = base64.b64decode( sealed_slice_secret_b64 )
-
- # decrypt it
- rc, slice_secret = c_syndicate.decrypt_data( observer_pubkey_pem, observer_pkey_pem, sealed_slice_secret )
-
- if rc != 0:
- logger.error("Failed to decrypt '%s', rc = %d" % (sealed_slice_secret_b64, rc))
- return None
-
- return slice_secret
-
-
-#--------------------------------
-def get_slice_secret( observer_pkey_pem, slice_name, slice_fk=None ):
- """
- Get the shared secret for a slice.
- """
-
- ss = None
-
- # get the sealed slice secret from Django
- try:
- if slice_fk is not None:
- ss = models.SliceSecret.objects.get( slice_id=slice_fk )
- else:
- ss = models.SliceSecret.objects.get( slice_id__name=slice_name )
- except ObjectDoesNotExist, e:
- logger.error("Failed to load slice secret for (%s, %s)" % (slice_fk, slice_name) )
- return None
-
- return ss.secret
-
-
-#-------------------------------
-def put_slice_secret( observer_pkey_pem, slice_name, slice_secret, slice_fk=None, opencloud_slice=None ):
- """
- Put the shared secret for a slice, encrypting it first.
- """
-
- ss = None
-
- if opencloud_slice is None:
- # look up the slice
- try:
- if slice_fk is None:
- opencloud_slice = models.Slice.objects.get( name=slice_name )
- else:
- opencloud_slice = models.Slice.objects.get( id=slice_fk.id )
- except Exception, e:
- logger.exception(e)
- logger.error("Failed to load slice (%s, %s)" % (slice_fk, slice_name) )
- return False
-
- ss = models.SliceSecret( slice_id=opencloud_slice, secret=slice_secret )
-
- ss.save()
-
- return True
-
-
-#-------------------------------
-def get_or_create_slice_secret( observer_pkey_pem, slice_name, slice_fk=None ):
- """
- Get a slice secret if it already exists, or generate a slice secret if one does not.
- """
-
- slice_secret = get_slice_secret( observer_pkey_pem, slice_name, slice_fk=slice_fk )
- if slice_secret is None or len(slice_secret) == 0:
-
- # generate a slice secret
- slice_secret = "".join( random.sample("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789", 32) )
-
- # store it
- rc = put_slice_secret( observer_pkey_pem, slice_name, slice_secret, slice_fk=slice_fk )
-
- if not rc:
- raise SyndicateObserverError("Failed to create slice secret for (%s, %s)" % (slice_fk, slice_name))
-
- return slice_secret
-
-
-#-------------------------------
-def generate_slice_credentials( observer_pkey_pem, syndicate_url, user_email, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=None ):
- """
- Generate and return the set of credentials to be sent off to the slice VMs.
- exisitng_user is a Syndicate user, as a dictionary.
-
- Return None on failure
- """
-
- # get the user's private key
- logger.info("Obtaining private key for %s" % user_email)
-
- # it might be in the existing_user...
- user_pkey_pem = None
- if existing_user is not None:
- user_pkey_pem = existing_user.get('signing_private_key', None)
-
- # no luck?
- if user_pkey_pem is None:
- try:
- # get it from Django DB
- user_pkey_pem = get_principal_pkey( user_email, observer_secret )
- assert user_pkey_pem is not None, "No private key for %s" % user_email
-
- except:
- traceback.print_exc()
- logger.error("Failed to get private key; cannot generate credentials for %s in %s" % (user_email, volume_name) )
- return None
-
- # generate a credetials blob
- logger.info("Generating credentials for %s's slice" % (user_email))
- try:
- creds = create_slice_credential_blob( observer_pkey_pem, slice_name, slice_secret, syndicate_url, volume_name, user_email, UG_port, user_pkey_pem )
- assert creds is not None, "Failed to create credentials for %s" % user_email
-
- except:
- traceback.print_exc()
- logger.error("Failed to generate credentials for %s in %s" % (user_email, volume_name))
- return None
-
- return creds
-
-
-#-------------------------------
-def save_slice_credentials( observer_pkey_pem, syndicate_url, user_email, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=None ):
- """
- Create and save a credentials blob to a VolumeSlice.
- Return the creds on success.
- Return None on failure
- """
-
- creds = generate_slice_credentials( observer_pkey_pem, syndicate_url, user_email, volume_name, slice_name, observer_secret, slice_secret, UG_port, existing_user=existing_user )
- ret = None
-
- if creds is not None:
- # save it
- vs = get_volumeslice( volume_name, slice_name )
-
- if vs is not None:
- vs.credentials_blob = creds
- vs.save()
-
- # success!
- ret = creds
- else:
- logger.error("Failed to look up VolumeSlice(%s, %s)" % (volume_name, slice_name))
-
- else:
- logger.error("Failed to generate credentials for %s, %s" % (volume_name, slice_name))
-
- return ret
-
-
-#-------------------------------
-def get_volumeslice_volume_names( slice_name ):
- """
- Get the list of Volume names from the datastore.
- """
- try:
- all_vs = models.VolumeSlice.objects.filter( slice_id__name = slice_name )
- volume_names = []
- for vs in all_vs:
- volume_names.append( vs.volume_id.name )
-
- return volume_names
- except Exception, e:
- logger.exception(e)
- logger.error("Failed to query datastore for volumes mounted in %s" % slice_name)
- return None
-
-
-#-------------------------------
-def get_volumeslice( volume_name, slice_name ):
- """
- Get a volumeslice record from the datastore.
- """
- try:
- vs = models.VolumeSlice.objects.get( volume_id__name = volume_name, slice_id__name = slice_name )
- return vs
- except Exception, e:
- logger.exception(e)
- logger.error("Failed to query datastore for volumes (mounted in %s)" % (slice_name if (slice_name is not None or len(slice_name) > 0) else "UNKNOWN"))
- return None
-
-
-#-------------------------------
-def do_push( instance_hosts, portnum, payload ):
- """
- Push a payload to a list of instances.
- NOTE: this has to be done in one go, since we can't import grequests
- into the global namespace (without wrecking havoc on the credential server),
- but it has to stick around for the push to work.
- """
-
- global TESTING, CONFIG
-
- from gevent import monkey
-
- if TESTING:
- monkey.patch_all()
-
- else:
- # make gevents runnabale from multiple threads (or Django will complain)
- monkey.patch_all(socket=True, dns=True, time=True, select=True, thread=False, os=True, ssl=True, httplib=False, aggressive=True)
-
- import grequests
-
- # fan-out
- requests = []
- for sh in instance_hosts:
- rs = grequests.post( "http://" + sh + ":" + str(portnum), data={"observer_message": payload}, timeout=getattr(CONFIG, "SYNDICATE_HTTP_PUSH_TIMEOUT", 60) )
- requests.append( rs )
-
- # fan-in
- responses = grequests.map( requests )
-
- assert len(responses) == len(requests), "grequests error: len(responses) != len(requests)"
-
- for i in xrange(0,len(requests)):
- resp = responses[i]
- req = requests[i]
-
- if resp is None:
- logger.error("Failed to connect to %s" % (req.url))
- continue
-
- # verify they all worked
- if resp.status_code != 200:
- logger.error("Failed to POST to %s, status code = %s" % (resp.url, resp.status_code))
- continue
-
- return True
-
-
-#-------------------------------
-def get_slice_hostnames( slice_name ):
- """
- Query the Django DB and get the list of hosts running in a slice.
- """
-
- openstack_slice = Slice.objects.get( name=slice_name )
- if openstack_slice is None:
- logger.error("No such slice '%s'" % slice_name)
- return None
-
- hostnames = [s.node.name for s in openstack_slice.instances.all()]
-
- return hostnames
-
-
-#-------------------------------
-def push_credentials_to_slice( slice_name, payload ):
- """
- Push a credentials payload to the VMs in a slice.
- """
- hostnames = get_slice_hostnames( slice_name )
- return do_push( hostnames, CONFIG.SYNDICATE_SLIVER_PORT, payload )
-
-
-#-------------------------------
-class CredentialServerHandler( BaseHTTPServer.BaseHTTPRequestHandler ):
- """
- HTTP server handler that allows syndicated.py instances to poll
- for volume state.
-
- NOTE: this is a fall-back mechanism. The observer should push new
- volume state to the slices' instances. However, if that fails, the
- instances are configured to poll for volume state periodically. This
- server allows them to do just that.
-
- Responses:
- GET /<slicename> -- Reply with the signed sealed list of volume names, encrypted by the slice secret
- GET /<slicename>/<volumename> -- Reply with the signed sealed volume access credentials, encrypted by the slice secret
-
- !!! TEMPORARY !!!
- GET /<slicename>/SYNDICATE_SLICE_SECRET -- Reply with the slice secret (TEMPORARY)
-
-
- NOTE: We want to limit who can learn which Volumes a slice can access, so we'll seal its instances'
- credentials with the SliceSecret secret. The instances (which have the slice-wide secret) can then decrypt it.
- However, sealing the listing is a time-consuming process (on the order of 10s), so we only want
- to do it when we have to. Since *anyone* can ask for the ciphertext of the volume list,
- we will cache the list ciphertext for each slice for a long-ish amount of time, so we don't
- accidentally DDoS this server. This necessarily means that the instance might see a stale
- volume listing, but that's okay, since the Observer is eventually consistent anyway.
- """
-
- cached_volumes_json = {} # map slice_name --> (volume name, timeout)
- cached_volumes_json_lock = threading.Lock()
-
- CACHED_VOLUMES_JSON_LIFETIME = 3600 # one hour
-
- SLICE_SECRET_NAME = "SYNDICATE_SLICE_SECRET"
-
- def parse_request_path( self, path ):
- """
- Parse the URL path into a slice name and (possibly) a volume name or SLICE_SECRET_NAME
- """
- path_parts = path.strip("/").split("/")
-
- if len(path_parts) == 0:
- # invalid
- return (None, None)
-
- if len(path_parts) > 2:
- # invalid
- return (None, None)
-
- slice_name = path_parts[0]
- if len(slice_name) == 0:
- # empty string is invalid
- return (None, None)
-
- volume_name = None
-
- if len(path_parts) > 1:
- volume_name = path_parts[1]
-
- return slice_name, volume_name
-
-
- def reply_data( self, data, datatype="application/json" ):
- """
- Give back a 200 response with data.
- """
- self.send_response( 200 )
- self.send_header( "Content-Type", datatype )
- self.send_header( "Content-Length", len(data) )
- self.end_headers()
-
- self.wfile.write( data )
- return
-
-
- def get_volumes_message( self, private_key_pem, observer_secret, slice_name ):
- """
- Get the json-ized list of volumes this slice is attached to.
- Check the cache, evict stale data if necessary, and on miss,
- regenerate the slice volume list.
- """
-
- # block the cache.
- # NOTE: don't release the lock until we've generated credentials.
- # Chances are, there's a thundering herd of instances coming online.
- # Block them all until we've generated their slice's credentials,
- # and then serve them the cached one.
-
- self.cached_volumes_json_lock.acquire()
-
- ret = None
- volume_list_json, cache_timeout = self.cached_volumes_json.get( slice_name, (None, None) )
-
- if (cache_timeout is not None) and cache_timeout < time.time():
- # expired
- volume_list_json = None
-
- if volume_list_json is None:
- # generate a new list and cache it.
-
- volume_names = get_volumeslice_volume_names( slice_name )
- if volume_names is None:
- # nothing to do...
- ret = None
-
- else:
- # get the slice secret
- slice_secret = get_slice_secret( private_key_pem, slice_name )
-
- if slice_secret is None:
- # no such slice
- logger.error("No slice secret for %s" % slice_name)
- ret = None
-
- else:
- # seal and sign
- ret = create_volume_list_blob( private_key_pem, slice_secret, volume_names )
-
- # cache this
- if ret is not None:
- self.cached_volumes_json[ slice_name ] = (ret, time.time() + self.CACHED_VOLUMES_JSON_LIFETIME )
-
- else:
- # hit the cache
- ret = volume_list_json
-
- self.cached_volumes_json_lock.release()
-
- return ret
-
-
- def do_GET( self ):
- """
- Handle one GET
- """
- slice_name, volume_name = self.parse_request_path( self.path )
-
- # valid request?
- if volume_name is None and slice_name is None:
- self.send_error( 400 )
-
- # slice secret request?
- elif volume_name == self.SLICE_SECRET_NAME and slice_name is not None:
-
- # get the slice secret
- ret = get_slice_secret( self.server.private_key_pem, slice_name )
-
- if ret is not None:
- self.reply_data( ret )
- return
- else:
- self.send_error( 404 )
-
- # volume list request?
- elif volume_name is None and slice_name is not None:
-
- # get the list of volumes for this slice
- ret = self.get_volumes_message( self.server.private_key_pem, self.server.observer_secret, slice_name )
-
- if ret is not None:
- self.reply_data( ret )
- return
- else:
- self.send_error( 404 )
-
- # volume credential request?
- elif volume_name is not None and slice_name is not None:
-
- # get the VolumeSlice record
- vs = get_volumeslice( volume_name, slice_name )
- if vs is None:
- # not found
- self.send_error( 404 )
- return
-
- else:
- ret = vs.credentials_blob
- if ret is not None:
- self.reply_data( vs.credentials_blob )
- else:
- # not generated???
- print ""
- print vs
- print ""
- self.send_error( 503 )
- return
-
- else:
- # shouldn't get here...
- self.send_error( 500 )
- return
-
-
-#-------------------------------
-class CredentialServer( BaseHTTPServer.HTTPServer ):
-
- def __init__(self, private_key_pem, observer_secret, server, req_handler ):
- self.private_key_pem = private_key_pem
- self.observer_secret = observer_secret
- BaseHTTPServer.HTTPServer.__init__( self, server, req_handler )
-
-
-#-------------------------------
-def credential_server_spawn( old_exit_status ):
- """
- Start our credential server (i.e. in a separate process, started by the watchdog)
- """
-
- setproctitle.setproctitle( "syndicate-credential-server" )
-
- private_key = syndicate_storage.read_private_key( CONFIG.SYNDICATE_PRIVATE_KEY )
- if private_key is None:
- # exit code 255 will be ignored...
- logger.error("Cannot load private key. Exiting...")
- sys.exit(255)
-
- logger.info("Starting Syndicate Observer credential server on port %s" % CONFIG.SYNDICATE_HTTP_PORT)
-
- srv = CredentialServer( private_key.exportKey(), CONFIG.SYNDICATE_OPENCLOUD_SECRET, ('', CONFIG.SYNDICATE_HTTP_PORT), CredentialServerHandler)
- srv.serve_forever()
-
-
-#-------------------------------
-def ensure_credential_server_running( foreground=False, run_once=False ):
- """
- Instantiate our credential server and keep it running.
- """
-
- # is the watchdog running?
- pids = syndicate_watchdog.find_by_attrs( "syndicate-credential-server-watchdog", {} )
- if len(pids) > 0:
- # it's running
- return True
-
- if foreground:
- # run in foreground
-
- if run_once:
- return credential_server_spawn( 0 )
-
- else:
- return syndicate_watchdog.main( credential_server_spawn, respawn_exit_statuses=range(1,254) )
-
-
- # not running, and not foregrounding. fork a new one
- try:
- watchdog_pid = os.fork()
- except OSError, oe:
- logger.error("Failed to fork, errno = %s" % oe.errno)
- return False
-
- if watchdog_pid != 0:
-
- # child--become watchdog
- setproctitle.setproctitle( "syndicate-credential-server-watchdog" )
-
- if run_once:
- syndicate_daemon.daemonize( lambda: credential_server_spawn(0), logfile_path=getattr(CONFIG, "SYNDICATE_HTTP_LOGFILE", None) )
-
- else:
- syndicate_daemon.daemonize( lambda: syndicate_watchdog.main( credential_server_spawn, respawn_exit_statuses=range(1,254) ), logfile_path=getattr(CONFIG, "SYNDICATE_HTTP_LOGFILE", None) )
-
-
-#-------------------------------
-# Begin functional tests.
-# Any method starting with ft_ is a functional test.
-#-------------------------------
-
-#-------------------------------
-def ft_syndicate_access():
- """
- Functional tests for ensuring objects exist and don't exist in Syndicate.
- """
-
- fake_user = FakeObject()
- fake_user.email = "fakeuser@opencloud.us"
-
- print "\nensure_user_exists(%s)\n" % fake_user.email
- ensure_user_exists( fake_user.email, is_admin=False, max_UGs=1100, max_RGs=1 )
-
- print "\nensure_user_exists(%s)\n" % fake_user.email
- ensure_user_exists( fake_user.email, is_admin=False, max_UGs=1100, max_RGs=1 )
-
- fake_volume = FakeObject()
- fake_volume.name = "fakevolume"
- fake_volume.description = "This is a fake volume, created for funtional testing"
- fake_volume.blocksize = 1024
- fake_volume.cap_read_data = True
- fake_volume.cap_write_data = True
- fake_volume.cap_host_data = False
- fake_volume.archive = False
- fake_volume.private = True
-
- # test idempotency
- print "\nensure_volume_exists(%s)\n" % fake_volume.name
- ensure_volume_exists( fake_user.email, fake_volume )
-
- print "\nensure_volume_exists(%s)\n" % fake_volume.name
- ensure_volume_exists( fake_user.email, fake_volume )
-
- print "\nensure_volume_access_right_exists(%s,%s)\n" % (fake_user.email, fake_volume.name)
- ensure_volume_access_right_exists( fake_user.email, fake_volume.name, 31 )
-
- print "\nensure_volume_access_right_exists(%s,%s)\n" % (fake_user.email, fake_volume.name)
- ensure_volume_access_right_exists( fake_user.email, fake_volume.name, 31 )
-
- print "\nensure_volume_access_right_absent(%s,%s)\n" % (fake_user.email, fake_volume.name)
- ensure_volume_access_right_absent( fake_user.email, fake_volume.name )
-
- print "\nensure_volume_access_right_absent(%s,%s)\n" % (fake_user.email, fake_volume.name)
- ensure_volume_access_right_absent( fake_user.email, fake_volume.name )
-
- print "\nensure_volume_absent(%s)\n" % fake_volume.name
- ensure_volume_absent( fake_volume.name )
-
- print "\nensure_volume_absent(%s)\n" % fake_volume.name
- ensure_volume_absent( fake_volume.name )
-
- print "\nensure_user_absent(%s)\n" % fake_user.email
- ensure_user_absent( fake_user.email )
-
- print "\nensure_user_absent(%s)\n" % fake_user.email
- ensure_user_absent( fake_user.email )
-
-
-
-
- print "\nensure_principal_exists(%s)\n" % fake_user.email
- ensure_principal_exists( fake_user.email, "asdf", is_admin=False, max_UGs=1100, max_RGs=1 )
-
- print "\nensure_principal_exists(%s)\n" % fake_user.email
- ensure_principal_exists( fake_user.email, "asdf", is_admin=False, max_UGs=1100, max_RGs=1 )
-
- print "\nensure_volume_exists(%s)\n" % fake_volume.name
- ensure_volume_exists( fake_user.email, fake_volume )
-
- print "\nsetup_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name)
- setup_volume_access( fake_user.email, fake_volume.name, 31, 38800, "abcdef" )
-
- print "\nsetup_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name)
- setup_volume_access( fake_user.email, fake_volume.name, 31, 38800, "abcdef" )
-
- print "\nteardown_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name )
- teardown_volume_access( fake_user.email, fake_volume.name )
-
- print "\nteardown_volume_access(%s, %s)\n" % (fake_user.email, fake_volume.name )
- teardown_volume_access( fake_user.email, fake_volume.name )
-
- print "\nensure_volume_absent(%s)\n" % fake_volume.name
- ensure_volume_absent( fake_volume.name )
-
- print "\nensure_principal_absent(%s)\n" % fake_user.email
- ensure_principal_absent( fake_user.email )
-
-
-
-#-------------------------------
-def ft_volumeslice( slice_name ):
- """
- Functional tests for reading VolumeSlice information
- """
- print "slice: %s" % slice_name
-
- volumes = get_volumeslice_volume_names( slice_name )
-
- print "volumes mounted in slice %s:" % slice_name
- for v in volumes:
- print " %s:" % v
-
- vs = get_volumeslice( v, slice_name )
-
- print " %s" % dir(vs)
-
-
-#-------------------------------
-def ft_get_slice_hostnames( slice_name ):
- """
- Functional tests for getting slice hostnames
- """
-
- print "Get slice hostnames for %s" % slice_name
-
- hostnames = get_slice_hostnames( slice_name )
- import pprint
-
- pp = pprint.PrettyPrinter()
-
- pp.pprint( hostnames )
-
-
-#-------------------------------
-def ft_syndicate_principal():
- """
- Functional tests for creating, reading, and deleting SyndicatePrincipals.
- """
- print "generating key pair"
- pubkey_pem, privkey_pem = api.generate_key_pair( 4096 )
-
- user_email = "fakeuser@opencloud.us"
-
- print "saving principal"
- put_principal_data( user_email, "asdf", pubkey_pem, privkey_pem )
-
- print "fetching principal private key"
- saved_privkey_pem = get_principal_pkey( user_email, "asdf" )
-
- assert saved_privkey_pem is not None, "Could not fetch saved private key"
- assert saved_privkey_pem == privkey_pem, "Saved private key does not match actual private key"
-
- print "delete principal"
-
- delete_principal_data( user_email )
-
- print "make sure its deleted..."
-
- saved_privkey_pem = get_principal_pkey( user_email, "asdf" )
-
- assert saved_privkey_pem is None, "Principal key not deleted"
-
-
-#-------------------------------
-def ft_credential_server():
- """
- Functional test for the credential server
- """
- ensure_credential_server_running( run_once=True, foreground=True )
-
-
-#-------------------------------
-def ft_seal_and_unseal():
- """
- Functional test for sealing/unsealing data
- """
- print "generating key pair"
- pubkey_pem, privkey_pem = api.generate_key_pair( 4096 )
-
- sealed_buf = create_sealed_and_signed_blob( privkey_pem, "foo", "hello world")
- print "sealed data is:\n\n%s\n\n" % sealed_buf
-
- buf = verify_and_unseal_blob( pubkey_pem, "foo", sealed_buf )
- print "unsealed data is: \n\n%s\n\n" % buf
-
-
-# run functional tests
-if __name__ == "__main__":
- sys.path.append("/opt/xos")
- os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
-
- if len(sys.argv) < 2:
- print "Usage: %s testname [args]" % sys.argv[0]
-
- # call a method starting with ft_, and then pass the rest of argv as its arguments
- testname = sys.argv[1]
- ft_testname = "ft_%s" % testname
-
- test_call = "%s(%s)" % (ft_testname, ",".join(sys.argv[2:]))
-
- print "calling %s" % test_call
-
- rc = eval( test_call )
-
- print "result = %s" % rc
-
-
diff --git a/xos/synchronizers/syndicate/syndicatelib_config/__init__.py b/xos/synchronizers/syndicate/syndicatelib_config/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/xos/synchronizers/syndicate/syndicatelib_config/__init__.py
+++ /dev/null
diff --git a/xos/synchronizers/syndicate/syndicatelib_config/config-jude.py b/xos/synchronizers/syndicate/syndicatelib_config/config-jude.py
deleted file mode 100644
index 9e0f1fd..0000000
--- a/xos/synchronizers/syndicate/syndicatelib_config/config-jude.py
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/usr/bin/env python
-
-# configuration for syndicatelib
-SYNDICATE_SMI_URL="http://localhost:8080"
-
-SYNDICATE_OPENID_TRUSTROOT="http://localhost:8081"
-
-SYNDICATE_OPENCLOUD_USER="judecn@gmail.com"
-SYNDICATE_OPENCLOUD_PASSWORD="nya"
-
-SYNDICATE_PYTHONPATH="/home/jude/Desktop/research/git/syndicate/build/out/python"
-
-SYNDICATE_PRIVATE_KEY="/home/jude/Desktop/research/git/syndicate/opencloud/observers/syndicate/syndicatelib_config/pollserver.pem"
-SYNDICATE_OPENCLOUD_SECRET="e4988309a5005edb8ea185f16f607938c0fb7657e4d7609853bcb7c4884d1c92"
-
-SYNDICATE_HTTP_PORT=65321
-
-SYNDICATE_RG_CLOSURE="/home/jude/Desktop/research/git/syndicate/build/out/python/syndicate/rg/drivers/disk"
-SYNDICATE_RG_DEFAULT_PORT=38800
-
-DEBUG=True
diff --git a/xos/synchronizers/syndicate/syndicatelib_config/config-opencloud.py b/xos/synchronizers/syndicate/syndicatelib_config/config-opencloud.py
deleted file mode 100644
index b3add16..0000000
--- a/xos/synchronizers/syndicate/syndicatelib_config/config-opencloud.py
+++ /dev/null
@@ -1,73 +0,0 @@
-#!/usr/bin/env python
-
-# ---------------------------------
-# This is the configuration file used by the Syndicate observer.
-# It is a well-formed Python file, and will be imported into the
-# observer as a Python module. This means you can run any config-
-# generation code here you like, but all of the following global
-# variables must be defined.
-# ---------------------------------
-
-# URL to the Syndicate SMI. For example, https://syndicate-metadata.appspot.com
-SYNDICATE_SMI_URL="http://localhost:8080"
-
-# If you are going to use OpenID to authenticate the Syndicate instance daemon,
-# this is the OpenID provider URL. It is currently used only to generate
-# identity pages for users, so you can put whatever you want here for now.
-SYNDICATE_OPENID_TRUSTROOT="http://localhost:8081"
-
-# This is the observer's user account on Syndicate. You must create it out-of-band
-# prior to using the observer, and it must be an admin user since it will
-# create other users (i.e. for slices).
-SYNDICATE_OPENCLOUD_USER="jcnelson@cs.princeton.edu"
-
-# This is the password for the observer to authenticate itself to Syndicate.
-SYNDICATE_OPENCLOUD_PASSWORD="nya"
-
-# If the observer uses public-key authentication with Syndicate, you will
-# need to identify the absolute path to its private key here. It must be
-# a 4096-bit PEM-encoded RSA key, and the Syndicate observer's user account
-# must have been given the public key on activation.
-SYNDICATE_OPENCLOUD_PKEY=None
-
-# This is the location on disk where Syndicate observer code can be found,
-# if it is not already in the Python path. This is optional.
-SYNDICATE_PYTHONPATH="/root/syndicate/build/out/python"
-
-# This is the location of the observer's private key. It must be an absolute
-# path, and refer to a 4096-bit PEM-encoded RSA key.
-SYNDICATE_PRIVATE_KEY="/opt/xos/observers/syndicate/syndicatelib_config/pollserver.pem"
-
-# This is the master secret used to generate secrets to seal sensitive information sent to the
-# Syndicate instance mount daemons. It is also used to seal sensitive information
-# stored to the Django database.
-# TODO: think of a way to not have to store this on disk. Maybe we feed into the
-# observer when it starts up?
-SYNDICATE_OPENCLOUD_SECRET="e4988309a5005edb8ea185f16f607938c0fb7657e4d7609853bcb7c4884d1c92"
-
-# This is the default port number on which a Syndicate Replica Gateway
-# will be provisioned. It's a well-known port, and can be the same across
-# instances, since in OpenCloud, an RG instance only listens to localhost.
-SYNDICATE_RG_DEFAULT_PORT=38800
-
-# This is the absolute path to the RG's storage driver (which will be automatically
-# pushed to instances by Syndicate). See https://github.com/jcnelson/syndicate/wiki/Replica-Gateways
-SYNDICATE_RG_CLOSURE=None
-
-# This is the port number the observer listens on for GETs from the Syndicate instance mount
-# daemons. Normally, the oserver pushes (encrypted) commands to the daemons, but if the
-# daemons are NAT'ed or temporarily partitioned, they will pull commands instead.
-SYNDICATE_HTTP_PORT=65321
-
-# This is the path to the logfile for the observer's HTTP server.
-SYNDICATE_HTTP_LOGFILE="/tmp/syndicate-observer.log"
-
-# This is the number of seconds to wait for pushing a slice credential before timing out.
-SYNDICATE_HTTP_PUSH_TIMEOUT=60
-
-# This is the port number the Syndicate instance mount daemons listen on. The observer will
-# push commands to them on this port.
-SYNDICATE_SLIVER_PORT=65322
-
-# If true, print verbose debug messages.
-DEBUG=True
diff --git a/xos/synchronizers/syndicate/syndicatelib_config/config.py b/xos/synchronizers/syndicate/syndicatelib_config/config.py
deleted file mode 100644
index b3add16..0000000
--- a/xos/synchronizers/syndicate/syndicatelib_config/config.py
+++ /dev/null
@@ -1,73 +0,0 @@
-#!/usr/bin/env python
-
-# ---------------------------------
-# This is the configuration file used by the Syndicate observer.
-# It is a well-formed Python file, and will be imported into the
-# observer as a Python module. This means you can run any config-
-# generation code here you like, but all of the following global
-# variables must be defined.
-# ---------------------------------
-
-# URL to the Syndicate SMI. For example, https://syndicate-metadata.appspot.com
-SYNDICATE_SMI_URL="http://localhost:8080"
-
-# If you are going to use OpenID to authenticate the Syndicate instance daemon,
-# this is the OpenID provider URL. It is currently used only to generate
-# identity pages for users, so you can put whatever you want here for now.
-SYNDICATE_OPENID_TRUSTROOT="http://localhost:8081"
-
-# This is the observer's user account on Syndicate. You must create it out-of-band
-# prior to using the observer, and it must be an admin user since it will
-# create other users (i.e. for slices).
-SYNDICATE_OPENCLOUD_USER="jcnelson@cs.princeton.edu"
-
-# This is the password for the observer to authenticate itself to Syndicate.
-SYNDICATE_OPENCLOUD_PASSWORD="nya"
-
-# If the observer uses public-key authentication with Syndicate, you will
-# need to identify the absolute path to its private key here. It must be
-# a 4096-bit PEM-encoded RSA key, and the Syndicate observer's user account
-# must have been given the public key on activation.
-SYNDICATE_OPENCLOUD_PKEY=None
-
-# This is the location on disk where Syndicate observer code can be found,
-# if it is not already in the Python path. This is optional.
-SYNDICATE_PYTHONPATH="/root/syndicate/build/out/python"
-
-# This is the location of the observer's private key. It must be an absolute
-# path, and refer to a 4096-bit PEM-encoded RSA key.
-SYNDICATE_PRIVATE_KEY="/opt/xos/observers/syndicate/syndicatelib_config/pollserver.pem"
-
-# This is the master secret used to generate secrets to seal sensitive information sent to the
-# Syndicate instance mount daemons. It is also used to seal sensitive information
-# stored to the Django database.
-# TODO: think of a way to not have to store this on disk. Maybe we feed into the
-# observer when it starts up?
-SYNDICATE_OPENCLOUD_SECRET="e4988309a5005edb8ea185f16f607938c0fb7657e4d7609853bcb7c4884d1c92"
-
-# This is the default port number on which a Syndicate Replica Gateway
-# will be provisioned. It's a well-known port, and can be the same across
-# instances, since in OpenCloud, an RG instance only listens to localhost.
-SYNDICATE_RG_DEFAULT_PORT=38800
-
-# This is the absolute path to the RG's storage driver (which will be automatically
-# pushed to instances by Syndicate). See https://github.com/jcnelson/syndicate/wiki/Replica-Gateways
-SYNDICATE_RG_CLOSURE=None
-
-# This is the port number the observer listens on for GETs from the Syndicate instance mount
-# daemons. Normally, the oserver pushes (encrypted) commands to the daemons, but if the
-# daemons are NAT'ed or temporarily partitioned, they will pull commands instead.
-SYNDICATE_HTTP_PORT=65321
-
-# This is the path to the logfile for the observer's HTTP server.
-SYNDICATE_HTTP_LOGFILE="/tmp/syndicate-observer.log"
-
-# This is the number of seconds to wait for pushing a slice credential before timing out.
-SYNDICATE_HTTP_PUSH_TIMEOUT=60
-
-# This is the port number the Syndicate instance mount daemons listen on. The observer will
-# push commands to them on this port.
-SYNDICATE_SLIVER_PORT=65322
-
-# If true, print verbose debug messages.
-DEBUG=True
diff --git a/xos/synchronizers/syndicate/syndicatelib_config/pollserver.pem b/xos/synchronizers/syndicate/syndicatelib_config/pollserver.pem
deleted file mode 100644
index cb50de7..0000000
--- a/xos/synchronizers/syndicate/syndicatelib_config/pollserver.pem
+++ /dev/null
@@ -1,51 +0,0 @@
------BEGIN RSA PRIVATE KEY-----
-MIIJKwIBAAKCAgEA6yeoVsm+Yf4sgW/9kzk2NfwkbHloIKXUOpi5x5LkEbnohNRC
-RIGjiMJJp/OefduU3c14h/K6Qefi9j4dw4pyvh2QP36K4lJObIpKAdohHjVjxHqK
-12bVXvCpCAbJHkiX8VK5HGJPDr6sJke1vPTrP6RSWxT7ZRawIBInKuT/OVshskzh
-kOwVXb5ct0AXjRZ6FBdXvNcJnNONKRCIuFHOx2roWLsdgTlPI3+bJim2dQ0JKyHh
-uaTPguZ4s23sCuSKyXCh/X9yVloxIraY6KdaAKQZLyANLfXQdsyyH69kQvvBEZ2R
-EXD0c1qIZwuIm68UH+60BwTPdXGWSL73C0Zsq36vZOadFPb0pmz/o4CuguILNA3i
-560MKcVvQ8HVqA56z+v8pE0TRp0ajTDtpW2ee+t1cXE8VzCwthkIxmneFk+ityoD
-o1N+fBUu4lXJ3kl2qGk+0KECqJ6sc/QN+Ft97JTTRshSzn1kqIlKQoZo3u0Jeo/G
-PFZ0b13/DxYA7nvjt2az48h0VL4mNf5tzDr8GxOK4lYoWIGzKjZLDeJRyxLOCK5j
-F/AvbbSnegT0O/vamn5EoN8HfooH5qiJdPDerriPsgN3HlcY6QjrY5phmAFiGa5T
-X1j1VNb5oamRslgPv3rwTvioTaY/wUmrHLBuU6Sqg/WGrLO2Bocg0USMbG8CAwEA
-AQKCAgEArxuO7WG5lXsSZSih6Rm3Vqf175jQg085JJFJ9mVZ1CFeFluBJUZsIpCb
-DKgLI6l5x1kUIhgLvrwQdFF5FH1qSEv3eHCgtzuXDphD1/E4rCgRrOObtB7tUI9h
-L4ruBNEF5Dw3f/1s5Yvy4WaQ3K58551TfmO3eGVWresWo4h2zZ0hEIbTiXljx7TT
-kdn2L6fHLGLdgM+YZuHZwfR/+tFga3sencRoiivE1KhXPindpngYlbfbQMSLiexZ
-gTOfi9T3zF1FI2HeIJN092aFounLyaJo5oC1j732iCCRm6qdvIuAD8AHoLc+MQ//
-dsxN47CSCd1Uzc01Nz1oLa+WgxzkGbsGNO2eVkRj/xzB0Rld/+Q1YQvn1TnWZFuG
-nXXKi+VwSX8htpDaZL5+hWVy39YXUQcBkTzBIS69tdfd7HCZS0JnKeOMYQFgvANH
-0/J529l8m0oHpex4DdW1scHXgcBOq6oD6KVkiNurfWZu/mWwdEnxAKtRFZc/xFfh
-a4kbTgNk3unGL+gZzeWL1YuIM843Ia4J8V0PYH7GueeZBaXyiT7r4x5hgQ57ObkX
-K9wlgrvSHBNq4OhzygTNs37vJwu38ry2AGmA8LuiFBeVCsVhMk3yVz4A6fXjwWH8
-266gNuODIiHelahvz/IBGLdrjnbA4SYaYQh1SYKfCRdlA2yNlBECggEBAPcqiWP/
-XUCsCy9vMbCvZNCUyr4nqTSvWkGsie6t+yIC0pYCynKdrL9WNF5zee9SVmtcBO0Q
-z+aff8TZAax3tWvD5GzlQOQh1l4QBj30uatklQ0nvwbw+gf5EFG2QerPakwyfC59
-dSagxchzpjloBGniq7jqc6vL3xlZ62vnOLHf+nOQXzDcZ7QK/uLRKj2r01D5+9lh
-08Ah42QID5VQL/NMyg2lylXaPXx6TnSMjJVjzNmLRCIRlOstAOt3isBJ21sT0LOk
-lCGvuF//cwS7VABRMa0TspSEkuMbgFw0XEZStkh68eEUVqax+HHfa1rlxobSIwib
-1Oa9s7KbQNaozgUCggEBAPOPOSLazItJ4OOFu8/69M33Ag8hpPZNkMw1fw99p2fD
-KnZYlEWHgF4Z76gkozHh+nk8HhohvYfIhmFYEHQOXfmgljsz3jFJKwTEnfN7IsZA
-C3TVl6OVuY2rjhBOe3137CYHn9d8KRaJAKdyd038LK29Yy+FvUVw6LD4QUoRiA21
-9sOrhO/Wgcohnqk5yVnXtBkp7j7qGN9d+GLZVAVOqKjniQqy9ir3IdLYmB801t9P
-TcbucmgEzs/cmx7d/jv1kx9/O0HHIm959Ox66hPkcG3bssJk41F6PDMOVEWiC/hc
-E5a7Mlr6M4YhuDjO1zoyQpy4Sj/MKpasnotNSL51JuMCggEBALhYkYBzximmJ/GJ
-DZaqOpcXYt/Q1PLmlnrFJVtPiC8ly8r26efykhVjRkvr9NX6o1oPl9z43Rc1fyZi
-dE0eO8HUqVpO4sdENY6ShRVQoeqjakgVjPSwZsvrh7BqL1/is3WBcf16tRXKc7m+
-CAxo+GHBHjMdKojH1e4ikuQ34KFKXJI068qVmQM/8DtbphW5QjLzQFQyEq0KmX7S
-RE0pMZpVe54SOYcu7w0Ya8uhyHjjprXamUaPtnJxbm4xCtvAOksDzHUwGwvE888l
-x7OPxGc4J8TfHCKJfsTEjkg3BVut9Sa6DA3EDZzmwFauPHPfTOLheB/Dmlc+xfhA
-s2tnG8ECggEBAKiLbFaaYwHg1iec3CNI3y/IxzwBZE6tzo4CVzM5GSfM/w12ruSO
-qF52REpvUB+s6dALsikTQD0+nv+uGXS2nIGqh0vg0Nn6cDKUfVmI1L+sgkEPrigd
-7JIFLgJKzVo+KsUGca6E1Uoq9LDrnXPyFlkEviacviXXxK7ynPvMtgIG8gTmJNBz
-+M0QBuPEgXoSsybWxW/0P9ITDVgaXPJvRHfeAg/NWFzTOCzYhizSO/+8uW34hGNH
-MHbXiuEJbm2/u1gIi9ExJLtQAhXD2Uh6xPLBHis39bbkh9QtDlRBl1b/IO8mC+q5
-Sf6ARyPIv1gef8pEHd2YQ8CRJAXyLWzfVVECggEBANrvnE2hQaYqe/BM9QGN9Cam
-CUTTBhvQDTBnpBYv8iQCgy0nmmVZ07j0yjR/I5wipcWAB7Bskv1rfh/3VpWUGCcR
-2MnPobZnvL1Dl22G7P8HBUiIA+NGWNdR5FyIL/yLy2BVEs7dNeK5WolD8IQP+fTw
-E9Mvd6ns2TIveXMZFtqRja3H426iv38QqWg0RmmhcmnSkD7SqAZWGI+OoRsUJ2Et
-bg4N9Cb46Gjqdh8SQF+rXYfL1AWnjMM7/AhJLMoWWb0sBzqA7UeJxLlAt1Lgnnsl
-P2nszH+Ia9V0dSlr79haGo80FALM8TiKBAQ/bTktqP5vOWSlCzHj7K30Bil36TQ=
------END RSA PRIVATE KEY-----