CORD-1242 delete old synchronizer framework
Change-Id: Idfe6042f4663568f6ef329cd454f6b1f7bc0d08b
diff --git a/xos/synchronizers/base/SyncInstanceUsingAnsible.py b/xos/synchronizers/base/SyncInstanceUsingAnsible.py
deleted file mode 100644
index 406cfa1..0000000
--- a/xos/synchronizers/base/SyncInstanceUsingAnsible.py
+++ /dev/null
@@ -1,361 +0,0 @@
-import hashlib
-import os
-import socket
-import sys
-import base64
-import time
-from django.db.models import F, Q
-from xos.config import Config
-from synchronizers.base.syncstep import SyncStep
-from synchronizers.base.ansible_helper import run_template_ssh
-from core.models import Service, Slice, ControllerSlice, ControllerUser, ModelLink, ServiceDependency, Tenant, ServiceMonitoringAgentInfo
-from xos.logger import Logger, logging
-
-logger = Logger(level=logging.INFO)
-
-class SyncInstanceUsingAnsible(SyncStep):
- # All of the following should be defined for classes derived from this
- # base class. Examples below use VSGTenant.
-
- # provides=[VSGTenant]
- # observes=VSGTenant
- # requested_interval=0
- # template_name = "sync_vcpetenant.yaml"
-
- def __init__(self, **args):
- SyncStep.__init__(self, **args)
-
- def skip_ansible_fields(self, o):
- # Return True if the instance processing and get_ansible_fields stuff
- # should be skipped. This hook is primarily for the OnosApp
- # sync step, so it can do its external REST API sync thing.
- return False
-
- def defer_sync(self, o, reason):
- # zdw, 2017-02-18 - is raising the exception here necessary? - seems like
- # it's just logging the same thing twice
- logger.info("defer object %s due to %s" % (str(o), reason),extra=o.tologdict())
- raise Exception("defer object %s due to %s" % (str(o), reason))
-
- def get_extra_attributes(self, o):
- # This is a place to include extra attributes that aren't part of the
- # object itself.
-
- return {}
-
- def get_instance(self, o):
- # We need to know what instance is associated with the object. Let's
- # assume 'o' has a field called 'instance'. If the field is called
- # something else, or if custom logic is needed, then override this
- # method.
-
- return o.instance
-
- def get_external_sync(self, o):
- hostname = getattr(o, "external_hostname", None)
- container = getattr(o, "external_container", None)
- if hostname and container:
- return (hostname, container)
- else:
- return None
-
- def run_playbook(self, o, fields, template_name=None):
- if not template_name:
- template_name = self.template_name
- tStart = time.time()
- run_template_ssh(template_name, fields, object=o)
- logger.info("playbook execution time %d" % int(time.time()-tStart),extra=o.tologdict())
-
- def pre_sync_hook(self, o, fields):
- pass
-
- def post_sync_hook(self, o, fields):
- pass
-
- def sync_fields(self, o, fields):
- self.run_playbook(o, fields)
-
- def prepare_record(self, o):
- pass
-
- def get_node(self,o):
- return o.node
-
- def get_node_key(self, node):
- return getattr(Config(), "observer_node_key", "/opt/cord_profile/node_key")
-
- def get_key_name(self, instance):
- if instance.isolation=="vm":
- if (instance.slice) and (instance.slice.service) and (instance.slice.service.private_key_fn):
- key_name = instance.slice.service.private_key_fn
- else:
- raise Exception("Make sure to set private_key_fn in the service")
- elif instance.isolation=="container":
- node = self.get_node(instance)
- key_name = self.get_node_key(node)
- else:
- # container in VM
- key_name = instance.parent.slice.service.private_key_fn
-
- return key_name
-
- def get_ansible_fields(self, instance):
- # return all of the fields that tell Ansible how to talk to the context
- # that's setting up the container.
-
- if (instance.isolation == "vm"):
- # legacy where container was configured by sync_vcpetenant.py
-
- fields = { "instance_name": instance.name,
- "hostname": instance.node.name,
- "instance_id": instance.instance_id,
- "username": "ubuntu",
- "ssh_ip": instance.get_ssh_ip(),
- }
-
- elif (instance.isolation == "container"):
- # container on bare metal
- node = self.get_node(instance)
- hostname = node.name
- fields = { "hostname": hostname,
- "baremetal_ssh": True,
- "instance_name": "rootcontext",
- "username": "root",
- "container_name": "%s-%s" % (instance.slice.name, str(instance.id))
- # ssh_ip is not used for container-on-metal
- }
- else:
- # container in a VM
- if not instance.parent:
- raise Exception("Container-in-VM has no parent")
- if not instance.parent.instance_id:
- raise Exception("Container-in-VM parent is not yet instantiated")
- if not instance.parent.slice.service:
- raise Exception("Container-in-VM parent has no service")
- if not instance.parent.slice.service.private_key_fn:
- raise Exception("Container-in-VM parent service has no private_key_fn")
- fields = { "hostname": instance.parent.node.name,
- "instance_name": instance.parent.name,
- "instance_id": instance.parent.instance_id,
- "username": "ubuntu",
- "ssh_ip": instance.parent.get_ssh_ip(),
- "container_name": "%s-%s" % (instance.slice.name, str(instance.id))
- }
-
- key_name = self.get_key_name(instance)
- if not os.path.exists(key_name):
- raise Exception("Node key %s does not exist" % key_name)
-
- key = file(key_name).read()
-
- fields["private_key"] = key
-
- # Now the ceilometer stuff
- # Only do this if the instance is not being deleted.
- if not instance.deleted:
- cslice = ControllerSlice.objects.get(slice=instance.slice)
- if not cslice:
- raise Exception("Controller slice object for %s does not exist" % instance.slice.name)
-
- cuser = ControllerUser.objects.get(user=instance.creator)
- if not cuser:
- raise Exception("Controller user object for %s does not exist" % instance.creator)
-
- fields.update({"keystone_tenant_id": cslice.tenant_id,
- "keystone_user_id": cuser.kuser_id,
- "rabbit_user": getattr(instance.controller,"rabbit_user", None),
- "rabbit_password": getattr(instance.controller, "rabbit_password", None),
- "rabbit_host": getattr(instance.controller, "rabbit_host", None)})
-
- return fields
-
- def sync_record(self, o):
- logger.info("sync'ing object %s" % str(o),extra=o.tologdict())
-
- self.prepare_record(o)
-
- if self.skip_ansible_fields(o):
- fields = {}
- else:
- if self.get_external_sync(o):
- # sync to some external host
-
- # UNTESTED
-
- (hostname, container_name) = self.get_external_sync(o)
- fields = { "hostname": hostname,
- "baremetal_ssh": True,
- "instance_name": "rootcontext",
- "username": "root",
- "container_name": container_name
- }
- key_name = self.get_node_key(node)
- if not os.path.exists(key_name):
- raise Exception("Node key %s does not exist" % key_name)
-
- key = file(key_name).read()
-
- fields["private_key"] = key
- # TO DO: Ceilometer stuff
- else:
- instance = self.get_instance(o)
- # sync to an XOS instance
- if not instance:
- self.defer_sync(o, "waiting on instance")
- return
-
- if not instance.instance_name:
- self.defer_sync(o, "waiting on instance.instance_name")
- return
-
- fields = self.get_ansible_fields(instance)
-
- fields["ansible_tag"] = o.__class__.__name__ + "_" + str(o.id)
-
- # If 'o' defines a 'sync_attributes' list, then we'll copy those
- # attributes into the Ansible recipe's field list automatically.
- if hasattr(o, "sync_attributes"):
- for attribute_name in o.sync_attributes:
- fields[attribute_name] = getattr(o, attribute_name)
-
- fields.update(self.get_extra_attributes(o))
-
- self.sync_fields(o, fields)
-
- o.save()
-
- def delete_record(self, o):
- try:
- controller = o.get_controller()
- controller_register = json.loads(o.node.site_deployment.controller.backend_register)
-
- if (controller_register.get('disabled',False)):
- raise InnocuousException('Controller %s is disabled'%o.node.site_deployment.controller.name)
- except AttributeError:
- pass
-
- instance = self.get_instance(o)
-
- if not instance:
- # the instance is gone. There's nothing left for us to do.
- return
-
- if instance.deleted:
- # the instance is being deleted. There's nothing left for us to do.
- return
-
- if isinstance(instance, basestring):
- # sync to some external host
-
- # XXX - this probably needs more work...
-
- fields = { "hostname": instance,
- "instance_id": "ubuntu", # this is the username to log into
- "private_key": service.key,
- }
- else:
- # sync to an XOS instance
- fields = self.get_ansible_fields(instance)
-
- fields["ansible_tag"] = o.__class__.__name__ + "_" + str(o.id)
-
- # If 'o' defines a 'sync_attributes' list, then we'll copy those
- # attributes into the Ansible recipe's field list automatically.
- if hasattr(o, "sync_attributes"):
- for attribute_name in o.sync_attributes:
- fields[attribute_name] = getattr(o, attribute_name)
-
- if hasattr(self, "map_delete_inputs"):
- fields.update(self.map_delete_inputs(o))
-
- fields['delete']=True
- res = self.run_playbook(o,fields)
-
- if hasattr(self, "map_delete_outputs"):
- self.map_delete_outputs(o,res)
-
- #In order to enable the XOS watcher functionality for a synchronizer, define the 'watches' attribute
- #in the derived class: eg. watches = [ModelLink(ServiceDependency,via='servicedependency')]
- #This base class implements the notification handler for handling ServiceDependency model notifications
- #If a synchronizer need to watch on multiple objects, the additional handlers need to be implemented
- #in the derived class and override the below handle_watched_object() method to route the notifications
- #accordingly
- def handle_watched_object(self, o):
- logger.info("handle_watched_object is invoked for object %s" % (str(o)),extra=o.tologdict())
- if (type(o) is ServiceDependency):
- self.handle_service_composition_watch_notification(o)
- elif (type(o) is ServiceMonitoringAgentInfo):
- self.handle_service_monitoringagentinfo_watch_notification(o)
- pass
-
- def handle_service_composition_watch_notification(self, coarse_tenant):
- cls_obj = self.observes
- if (type(cls_obj) is list):
- cls_obj = cls_obj[0]
- logger.info("handle_watched_object observed model %s" % (cls_obj))
-
- objs = cls_obj.objects.filter(kind=cls_obj.KIND).all()
-
- for obj in objs:
- self.handle_service_composition_for_object(obj, coarse_tenant)
- pass
-
- def handle_service_monitoringagentinfo_watch_notification(self, monitoring_agent_info):
- pass
-
- def handle_service_composition_for_object(self, obj, coarse_tenant):
- try:
- instance = self.get_instance(obj)
- valid_instance = True
- except:
- valid_instance = False
-
- if not valid_instance:
- logger.warn("handle_watched_object: No valid instance found for object %s" % (str(obj)))
- return
-
- provider_service = coarse_tenant.provider_service
- subscriber_service = coarse_tenant.subscriber_service
-
- if isinstance(obj,Service):
- if obj.id == provider_service.id:
- matched_service = provider_service
- other_service = subscriber_service
- elif obj.id == subscriber_service.id:
- matched_service = subscriber_service
- other_service = provider_service
- else:
- logger.info("handle_watched_object: Service object %s does not match with any of composed services" % (str(obj)))
- return
- elif isinstance(obj,Tenant):
- if obj.provider_service.id == provider_service.id:
- matched_service = provider_service
- other_service = subscriber_service
- elif obj.provider_service.id == subscriber_service.id:
- matched_service = subscriber_service
- other_service = provider_service
- else:
- logger.info("handle_watched_object: Tenant object %s does not match with any of composed services" % (str(obj)))
- return
- else:
- logger.warn("handle_watched_object: Model object %s is of neither Service nor Tenant type" % (str(obj)))
-
- src_networks = matched_service.get_composable_networks()
- target_networks = other_service.get_composable_networks()
- if src_networks and target_networks:
- src_network = src_networks[0] #Only one composable network should present per service
- target_network = target_networks[0]
- src_ip = instance.get_network_ip(src_network.name)
- target_subnet = target_network.controllernetworks.all()[0].subnet
-
- #Run ansible playbook to update the routing table entries in the instance
- fields = self.get_ansible_fields(instance)
- fields["ansible_tag"] = obj.__class__.__name__ + "_" + str(obj.id) + "_service_composition"
- fields["src_intf_ip"] = src_ip
- fields["target_subnet"] = target_subnet
- #Template file is available under .../synchronizers/shared_templates
- service_composition_template_name = "sync_service_composition.yaml"
- logger.info("handle_watched_object: Updating routing tables in the instance associated with object %s: target_subnet:%s src_ip:%s" % (str(obj), target_subnet, src_ip))
- SyncInstanceUsingAnsible.run_playbook(self, obj, fields, service_composition_template_name)
- else:
- logger.info("handle_watched_object: No intersection of composable networks between composed services %s" % (str(coarse_tenant)))
diff --git a/xos/synchronizers/base/__init__.py b/xos/synchronizers/base/__init__.py
deleted file mode 100644
index e56cd39..0000000
--- a/xos/synchronizers/base/__init__.py
+++ /dev/null
@@ -1,36 +0,0 @@
-from xos.config import Config
-
-try:
- observer_disabled = Config().observer_disabled
-except:
- observer_disabled = False
-
-def EnableObserver(x):
- """ used for manage.py --noobserver """
- global observer_disabled
- observer_disabled = not x
-
-print_once = True
-
-def notify_observer(model=None, delete=False, pk=None, model_dict={}):
- if (observer_disabled):
- global print_once
- if (print_once):
- print "The observer is disabled"
- print_once = False
- return
-
- try:
- from .event_manager import EventSender
- if (model and delete):
- if hasattr(model,"__name__"):
- modelName = model.__name__
- else:
- modelName = model.__class__.__name__
- EventSender().fire(delete_flag = delete, model = modelName, pk = pk, model_dict=model_dict)
- else:
- EventSender().fire()
- except Exception,e:
- print "Exception in Observer. This should not disrupt the front end. %s"%str(e)
-
-
diff --git a/xos/synchronizers/base/ansible_helper.py b/xos/synchronizers/base/ansible_helper.py
deleted file mode 100644
index 5bc79b4..0000000
--- a/xos/synchronizers/base/ansible_helper.py
+++ /dev/null
@@ -1,266 +0,0 @@
-#!/usr/bin/env python
-
-import jinja2
-import tempfile
-import os
-import json
-import pickle
-import pdb
-import string
-import random
-import re
-import traceback
-import subprocess
-import threading
-from xos.config import Config, XOS_DIR
-from xos.logger import observer_logger as logger
-from multiprocessing import Process, Queue
-
-
-step_dir = Config().observer_steps_dir
-sys_dir = Config().observer_sys_dir
-
-os_template_loader = jinja2.FileSystemLoader( searchpath=[step_dir, "/opt/xos/synchronizers/shared_templates"])
-os_template_env = jinja2.Environment(loader=os_template_loader)
-
-def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
- return ''.join(random.choice(chars) for _ in range(size))
-
-def shellquote(s):
- return "'" + s.replace("'", "'\\''") + "'"
-
-def get_playbook_fn(opts, path):
- if not opts.get("ansible_tag", None):
- # if no ansible_tag is in the options, then generate a unique one
- objname= id_generator()
- opts = opts.copy()
- opts["ansible_tag"] = objname
-
- objname = opts["ansible_tag"]
-
- pathed_sys_dir = os.path.join(sys_dir, path)
- if not os.path.isdir(pathed_sys_dir):
- os.makedirs(pathed_sys_dir)
-
- # symlink steps/roles into sys/roles so that playbooks can access roles
- roledir = os.path.join(step_dir,"roles")
- rolelink = os.path.join(pathed_sys_dir, "roles")
- if os.path.isdir(roledir) and not os.path.islink(rolelink):
- os.symlink(roledir,rolelink)
-
- return (opts, os.path.join(pathed_sys_dir,objname))
-
-def run_playbook(ansible_hosts, ansible_config, fqp, opts):
- args = {"ansible_hosts": ansible_hosts,
- "ansible_config": ansible_config,
- "fqp": fqp,
- "opts": opts}
-
- keep_temp_files = getattr(Config(), "observer_keep_temp_files", False)
-
- dir = tempfile.mkdtemp()
- args_fn = None
- result_fn = None
- try:
- logger.info("creating args file in %s" % dir)
-
- args_fn = os.path.join(dir, "args")
- result_fn = os.path.join(dir, "result")
-
- open(args_fn, "w").write(pickle.dumps(args))
-
- ansible_main_fn = os.path.join(os.path.dirname(__file__), "ansible_main.py")
-
- os.system("python %s %s %s" % (ansible_main_fn, args_fn, result_fn))
-
- result = pickle.loads(open(result_fn).read())
-
- if hasattr(result, "exception"):
- logger.log_error("Exception in playbook: %s" % result["exception"])
-
- stats = result.get("stats", None)
- aresults = result.get("aresults", None)
- except:
- logger.log_exc("Exception running ansible_main")
- stats = None
- aresults = None
- finally:
- if not keep_temp_files:
- if args_fn and os.path.exists(args_fn):
- os.remove(args_fn)
- if result_fn and os.path.exists(result_fn):
- os.remove(result_fn)
- os.rmdir(dir)
-
- return (stats, aresults)
-
-def run_template(name, opts, path='', expected_num=None, ansible_config=None, ansible_hosts=None, run_ansible_script=None, object=None):
- template = os_template_env.get_template(name)
- buffer = template.render(opts)
-
- (opts, fqp) = get_playbook_fn(opts, path)
-
- f = open(fqp,'w')
- f.write(buffer)
- f.flush()
-
- """
- q = Queue()
- p = Process(target=run_playbook, args=(ansible_hosts, ansible_config, fqp, opts, q,))
- p.start()
- stats,aresults = q.get()
- p.join()
- """
- stats,aresults = run_playbook(ansible_hosts,ansible_config,fqp,opts)
-
- error_msg = []
-
- output_file = fqp + '.out'
- try:
- if (aresults is None):
- raise ValueError("Error executing playbook %s"%fqp)
-
- ok_results = []
- total_unreachable = 0
- failed = 0
-
- ofile = open(output_file, 'w')
-
- for x in aresults:
- if not x.is_failed() and not x.is_unreachable() and not x.is_skipped():
- ok_results.append(x)
- elif x.is_unreachable():
- failed+=1
- total_unreachable+=1
- try:
- error_msg.append(x._result['msg'])
- except:
- pass
- elif x.is_failed():
- failed+=1
- try:
- error_msg.append(x._result['msg'])
- except:
- pass
-
- # FIXME (zdw, 2017-02-19) - may not be needed with new callback logging
-
- ofile.write('%s: %s\n'%(x._task, str(x._result)))
-
- if (object):
- oprops = object.tologdict()
- ansible = x._result
- oprops['xos_type']='ansible'
- oprops['ansible_result']=json.dumps(ansible)
-
- if failed == 0:
- oprops['ansible_status']='OK'
- else:
- oprops['ansible_status']='FAILED'
-
- logger.info(x._task, extra=oprops)
-
-
- ofile.close()
-
- if (expected_num is not None) and (len(ok_results) != expected_num):
- raise ValueError('Unexpected num %s!=%d' % (str(expected_num), len(ok_results)) )
-
- #total_unreachable = stats.unreachable
-
- if (failed):
- raise ValueError('Ansible playbook failed.')
-
- except ValueError,e:
- if error_msg:
- try:
- error = ' // '.join(error_msg)
- except:
- error = "failed to join error_msg"
- raise Exception(error)
- else:
- raise
-
-
-
- processed_results = map(lambda x:x._result, ok_results)
- return processed_results[1:] # 0 is setup
-
-def run_template_ssh(name, opts, path='', expected_num=None, object=None):
- instance_name = opts["instance_name"]
- hostname = opts["hostname"]
- private_key = opts["private_key"]
- baremetal_ssh = opts.get("baremetal_ssh",False)
- if baremetal_ssh:
- # no instance_id or ssh_ip for baremetal
- # we never proxy to baremetal
- proxy_ssh = False
- else:
- instance_id = opts["instance_id"]
- ssh_ip = opts["ssh_ip"]
- try:
- proxy_ssh = Config().observer_proxy_ssh
- except:
- proxy_ssh = True
-
- if (not ssh_ip):
- raise Exception('IP of ssh proxy not available. Synchronization deferred')
-
- (opts, fqp) = get_playbook_fn(opts, path)
- private_key_pathname = fqp + ".key"
- config_pathname = fqp + ".config"
- hosts_pathname = fqp + ".hosts"
-
- f = open(private_key_pathname, "w")
- f.write(private_key)
- f.close()
-
- f = open(config_pathname, "w")
- f.write("[ssh_connection]\n")
- if proxy_ssh:
- proxy_ssh_key = getattr(Config(), "observer_proxy_ssh_key", None)
- proxy_ssh_user = getattr(Config(), "observer_proxy_ssh_user", "root")
- if proxy_ssh_key:
- # If proxy_ssh_key is known, then we can proxy into the compute
- # node without needing to have the OpenCloud sshd machinery in
- # place.
- proxy_command = "ProxyCommand ssh -q -i %s -o StrictHostKeyChecking=no %s@%s nc %s 22" % (proxy_ssh_key, proxy_ssh_user, hostname, ssh_ip)
- else:
- proxy_command = "ProxyCommand ssh -q -i %s -o StrictHostKeyChecking=no %s@%s" % (private_key_pathname, instance_id, hostname)
- f.write('ssh_args = -o "%s"\n' % proxy_command)
- f.write('scp_if_ssh = True\n')
- f.write('pipelining = True\n')
- f.write('\n[defaults]\n')
- f.write('host_key_checking = False\n')
- f.write('timeout = 30\n')
- f.close()
-
- f = open(hosts_pathname, "w")
- f.write("[%s]\n" % instance_name)
- if proxy_ssh or baremetal_ssh:
- f.write("%s ansible_ssh_private_key_file=%s\n" % (hostname, private_key_pathname))
- else:
- # acb: Login user is hardcoded, this is not great
- f.write("%s ansible_ssh_private_key_file=%s ansible_ssh_user=ubuntu\n" % (ssh_ip, private_key_pathname))
- f.close()
-
- # SSH will complain if private key is world or group readable
- os.chmod(private_key_pathname, 0600)
-
- print "ANSIBLE_CONFIG=%s" % config_pathname
- print "ANSIBLE_HOSTS=%s" % hosts_pathname
-
- return run_template(name, opts, path, ansible_config = config_pathname, ansible_hosts = hosts_pathname, run_ansible_script="/opt/xos/synchronizers/base/run_ansible_verbose", object=object)
-
-
-
-def main():
- run_template('ansible/sync_user_deployments.yaml',{ "endpoint" : "http://172.31.38.128:5000/v2.0/",
- "name" : "Sapan Bhatia",
- "email": "gwsapan@gmail.com",
- "password": "foobar",
- "admin_user":"admin",
- "admin_password":"6a789bf69dd647e2",
- "admin_tenant":"admin",
- "tenant":"demo",
- "roles":['user','admin'] })
diff --git a/xos/synchronizers/base/ansible_main.py b/xos/synchronizers/base/ansible_main.py
deleted file mode 100644
index 2df23b9..0000000
--- a/xos/synchronizers/base/ansible_main.py
+++ /dev/null
@@ -1,57 +0,0 @@
-import os
-import pickle
-import sys
-#import json
-import traceback
-
-sys.path.append("/opt/xos")
-
-def run_playbook(ansible_hosts, ansible_config, fqp, opts):
- try:
- if ansible_config:
- os.environ["ANSIBLE_CONFIG"] = ansible_config
- else:
- try:
- del os.environ["ANSIBLE_CONFIG"]
- except KeyError:
- pass
-
- if ansible_hosts:
- os.environ["ANSIBLE_HOSTS"] = ansible_hosts
- else:
- try:
- del os.environ["ANSIBLE_HOSTS"]
- except KeyError:
- pass
-
- import ansible_runner
- reload(ansible_runner)
-
- # Dropped support for observer_pretend - to be redone
- runner = ansible_runner.Runner(
- playbook=fqp,
- run_data=opts,
- host_file=ansible_hosts)
-
- stats,aresults = runner.run()
- except Exception, e:
- return {"stats": None, "aresults": None, "exception": traceback.format_exc()}
-
- return {"stats": stats, "aresults": aresults}
-
-def main():
- input_fn = sys.argv[1]
- result_fn = sys.argv[2]
-
- args = pickle.loads(open(input_fn).read())
- ansible_hosts = args["ansible_hosts"]
- ansible_config = args["ansible_config"]
- fqp = args["fqp"]
- opts = args["opts"]
-
- result = run_playbook(ansible_hosts, ansible_config, fqp, opts)
-
- open(result_fn, "w").write(pickle.dumps(result))
-
-if __name__ == "__main__":
- main()
diff --git a/xos/synchronizers/base/ansible_runner.py b/xos/synchronizers/base/ansible_runner.py
deleted file mode 100644
index a00fca8..0000000
--- a/xos/synchronizers/base/ansible_runner.py
+++ /dev/null
@@ -1,325 +0,0 @@
-#!/usr/bin/env python
-
-import os
-import sys
-import pdb
-import json
-import uuid
-
-from ansible import constants
-constants = reload(constants)
-
-from tempfile import NamedTemporaryFile
-from ansible.inventory import Inventory
-from ansible.vars import VariableManager
-from ansible.parsing.dataloader import DataLoader
-from ansible.executor import playbook_executor
-from ansible.utils.display import Display
-from ansible.plugins.callback import CallbackBase
-from xos.logger import observer_logger as logger
-
-
-class ResultCallback(CallbackBase):
-
- CALLBACK_VERSION = 2.0
- CALLBACK_NAME = 'resultcallback'
- CALLBACK_TYPE = 'programmatic'
-
- def __init__(self):
- super(ResultCallback, self).__init__()
- self.results = []
- self.uuid = str(uuid.uuid1())
- self.playbook_status = 'OK'
-
- def v2_playbook_on_start(self, playbook):
- self.playbook = playbook._file_name
- log_extra = {
- 'xos_type': "ansible",
- 'ansible_uuid': self.uuid,
- 'ansible_type': "playbook start",
- 'ansible_status': "OK",
- 'ansible_playbook': self.playbook
- }
- logger.info("PLAYBOOK START [%s]" % self.playbook, extra=log_extra)
-
- def v2_playbook_on_stats(self, stats):
- host_stats = {}
- for host in stats.processed.keys():
- host_stats[host] = stats.summarize(host)
-
- log_extra = {
- 'xos_type': "ansible",
- 'ansible_uuid': self.uuid,
- 'ansible_type': "playbook stats",
- 'ansible_status': self.playbook_status,
- 'ansible_playbook': self.playbook,
- 'ansible_result': json.dumps(host_stats)
- }
-
- if self.playbook_status == 'OK':
- logger.info("PLAYBOOK END [%s]" % self.playbook, extra=log_extra)
- else:
- logger.error("PLAYBOOK END [%s]" % self.playbook, extra=log_extra)
-
- def v2_playbook_on_play_start(self, play):
- log_extra = {
- 'xos_type': "ansible",
- 'ansible_uuid': self.uuid,
- 'ansible_type': "play start",
- 'ansible_status': self.playbook_status,
- 'ansible_playbook': self.playbook
- }
- logger.debug("PLAY START [%s]" % play.name, extra=log_extra)
-
- def v2_runner_on_ok(self, result, **kwargs):
- log_extra = {
- 'xos_type': "ansible",
- 'ansible_uuid': self.uuid,
- 'ansible_type': "task",
- 'ansible_status': "OK",
- 'ansible_result': json.dumps(result._result),
- 'ansible_task': result._task,
- 'ansible_playbook': self.playbook,
- 'ansible_host': result._host.get_name()
- }
- logger.debug("OK [%s]" % str(result._task), extra=log_extra)
- self.results.append(result)
-
- def v2_runner_on_failed(self, result, **kwargs):
- self.playbook_status = "FAILED"
- log_extra = {
- 'xos_type': "ansible",
- 'ansible_uuid': self.uuid,
- 'ansible_type': "task",
- 'ansible_status': "FAILED",
- 'ansible_result': json.dumps(result._result),
- 'ansible_task': result._task,
- 'ansible_playbook': self.playbook,
- 'ansible_host': result._host.get_name()
- }
- logger.error("FAILED [%s]" % str(result._task), extra=log_extra)
- self.results.append(result)
-
- def v2_runner_on_async_failed(self, result, **kwargs):
- self.playbook_status = "FAILED"
- log_extra = {
- 'xos_type': "ansible",
- 'ansible_uuid': self.uuid,
- 'ansible_type': "task",
- 'ansible_status': "ASYNC FAILED",
- 'ansible_result': json.dumps(result._result),
- 'ansible_task': result._task,
- 'ansible_playbook': self.playbook,
- 'ansible_host': result._host.get_name()
- }
- logger.error("ASYNC FAILED [%s]" % str(result._task), extra=log_extra)
-
- def v2_runner_on_skipped(self, result, **kwargs):
- log_extra = {
- 'xos_type': "ansible",
- 'ansible_uuid': self.uuid,
- 'ansible_type': "task",
- 'ansible_status': "SKIPPED",
- 'ansible_result': json.dumps(result._result),
- 'ansible_task': result._task,
- 'ansible_playbook': self.playbook,
- 'ansible_host': result._host.get_name()
- }
- logger.debug("SKIPPED [%s]" % str(result._task), extra=log_extra)
- self.results.append(result)
-
- def v2_runner_on_unreachable(self, result, **kwargs):
- log_extra = {
- 'xos_type': "ansible",
- 'ansible_uuid': self.uuid,
- 'ansible_type': "task",
- 'ansible_status': "UNREACHABLE",
- 'ansible_result': json.dumps(result._result),
- 'ansible_task': result._task,
- 'ansible_playbook': self.playbook,
- 'ansible_host': result._host.get_name()
- }
- logger.error("UNREACHABLE [%s]" % str(result._task), extra=log_extra)
- self.results.append(result)
-
- def v2_runner_retry(self, result, **kwargs):
- log_extra = {
- 'xos_type': "ansible",
- 'ansible_uuid': self.uuid,
- 'ansible_type': "task",
- 'ansible_status': "RETRY",
- 'ansible_result': json.dumps(result._result),
- 'ansible_task': result._task,
- 'ansible_playbook': self.playbook,
- 'ansible_host': result._host.get_name()
- }
- logger.warning("RETRYING [%s] - attempt %d" % (str(result._task), result._result['attempts']), extra=log_extra)
- self.results.append(result)
-
- def v2_playbook_on_handler_task_start(self, task, **kwargs):
- log_extra = {
- 'xos_type': "ansible",
- 'ansible_uuid': self.uuid,
- 'ansible_type': "task",
- 'ansible_status': "HANDLER",
- 'ansible_task': task.get_name().strip(),
- 'ansible_playbook': self.playbook,
- 'ansible_host': result._host.get_name()
- }
- logger.debug("HANDLER [%s]" % task.get_name().strip(), extra=log_extra)
-
- def v2_playbook_on_import_for_host(self, result, imported_file):
- log_extra = {
- 'xos_type': "ansible",
- 'ansible_uuid': self.uuid,
- 'ansible_type': "import",
- 'ansible_status': "IMPORT",
- 'ansible_result': json.dumps(result._result),
- 'ansible_playbook': self.playbook,
- 'ansible_host': result._host.get_name()
- }
- logger.debug("IMPORT [%s]" % imported_file, extra=log_extra)
- self.results.append(result)
-
- def v2_playbook_on_not_import_for_host(self, result, missing_file):
- log_extra = {
- 'xos_type': "ansible",
- 'ansible_uuid': self.uuid,
- 'ansible_type': "import",
- 'ansible_status': "MISSING IMPORT",
- 'ansible_result': json.dumps(result._result),
- 'ansible_playbook': self.playbook,
- 'ansible_host': result._host.get_name()
- }
- logger.debug("MISSING IMPORT [%s]" % missing_file, extra=log_extra)
- self.results.append(result)
-
-class Options(object):
- """
- Options class to replace Ansible OptParser
- """
- def __init__(self, verbosity=None, inventory=None, listhosts=None, subset=None, module_paths=None, extra_vars=None,
- forks=None, ask_vault_pass=None, vault_password_files=None, new_vault_password_file=None,
- output_file=None, tags=None, skip_tags=None, one_line=None, tree=None, ask_sudo_pass=None, ask_su_pass=None,
- sudo=None, sudo_user=None, become=None, become_method=None, become_user=None, become_ask_pass=None,
- ask_pass=None, private_key_file=None, remote_user=None, connection=None, timeout=None, ssh_common_args=None,
- sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None, poll_interval=None, seconds=None, check=None,
- syntax=None, diff=None, force_handlers=None, flush_cache=None, listtasks=None, listtags=None, module_path=None):
- self.verbosity = verbosity
- self.inventory = inventory
- self.listhosts = listhosts
- self.subset = subset
- self.module_paths = module_paths
- self.extra_vars = extra_vars
- self.forks = forks
- self.ask_vault_pass = ask_vault_pass
- self.vault_password_files = vault_password_files
- self.new_vault_password_file = new_vault_password_file
- self.output_file = output_file
- self.tags = tags
- self.skip_tags = skip_tags
- self.one_line = one_line
- self.tree = tree
- self.ask_sudo_pass = ask_sudo_pass
- self.ask_su_pass = ask_su_pass
- self.sudo = sudo
- self.sudo_user = sudo_user
- self.become = become
- self.become_method = become_method
- self.become_user = become_user
- self.become_ask_pass = become_ask_pass
- self.ask_pass = ask_pass
- self.private_key_file = private_key_file
- self.remote_user = remote_user
- self.connection = connection
- self.timeout = timeout
- self.ssh_common_args = ssh_common_args
- self.sftp_extra_args = sftp_extra_args
- self.scp_extra_args = scp_extra_args
- self.ssh_extra_args = ssh_extra_args
- self.poll_interval = poll_interval
- self.seconds = seconds
- self.check = check
- self.syntax = syntax
- self.diff = diff
- self.force_handlers = force_handlers
- self.flush_cache = flush_cache
- self.listtasks = listtasks
- self.listtags = listtags
- self.module_path = module_path
-
-
-class Runner(object):
-
- def __init__(self, playbook, run_data, private_key_file=None, verbosity=0, host_file=None):
-
- self.playbook = playbook
- self.run_data = run_data
-
- self.options = Options()
- self.options.output_file = playbook + '.result'
- self.options.private_key_file = private_key_file
- self.options.verbosity = verbosity
- self.options.connection = 'ssh' # Need a connection type "smart" or "ssh"
- #self.options.become = True
- self.options.become_method = 'sudo'
- self.options.become_user = 'root'
-
- # Set global verbosity
- self.display = Display()
- self.display.verbosity = self.options.verbosity
- # Executor appears to have it's own
- # verbosity object/setting as well
- playbook_executor.verbosity = self.options.verbosity
-
- # Become Pass Needed if not logging in as user root
- #passwords = {'become_pass': become_pass}
-
- # Gets data from YAML/JSON files
- self.loader = DataLoader()
- try:
- self.loader.set_vault_password(os.environ['VAULT_PASS'])
- except KeyError:
- pass
-
- # All the variables from all the various places
- self.variable_manager = VariableManager()
- self.variable_manager.extra_vars = {} # self.run_data
-
- # Set inventory, using most of above objects
- if (host_file):
- self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager, host_list = host_file)
- else:
- self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager)
-
- self.variable_manager.set_inventory(self.inventory)
-
- # Setup playbook executor, but don't run until run() called
- self.pbex = playbook_executor.PlaybookExecutor(
- playbooks=[playbook],
- inventory=self.inventory,
- variable_manager=self.variable_manager,
- loader=self.loader,
- options=self.options,
- passwords={})
-
- def run(self):
- os.environ['REQUESTS_CA_BUNDLE'] = '/usr/local/share/ca-certificates/local_certs.crt'
- callback = ResultCallback()
- self.pbex._tqm._stdout_callback = callback
-
- self.pbex.run()
- stats = self.pbex._tqm._stats
-
- # Test if success for record_logs
- run_success = True
- hosts = sorted(stats.processed.keys())
- for h in hosts:
- t = stats.summarize(h)
- if t['unreachable'] > 0 or t['failures'] > 0:
- run_success = False
-
- #os.remove(self.hosts.name)
-
- return stats,callback.results
-
diff --git a/xos/synchronizers/base/backend.py b/xos/synchronizers/base/backend.py
deleted file mode 100644
index a23efdf..0000000
--- a/xos/synchronizers/base/backend.py
+++ /dev/null
@@ -1,84 +0,0 @@
-import os
-import inspect
-import imp
-import sys
-import threading
-import time
-from syncstep import SyncStep
-from synchronizers.base.event_loop import XOSObserver
-from xos.logger import Logger, logging
-from xos.config import Config
-from django.utils import timezone
-from diag import update_diag
-
-
-watchers_enabled = getattr(Config(), "observer_enable_watchers", None)
-
-if (watchers_enabled):
- from synchronizers.base.watchers import XOSWatcher
-
-logger = Logger(level=logging.INFO)
-
-class Backend:
-
- def load_sync_step_modules(self, step_dir=None):
- sync_steps = []
- if step_dir is None:
- try:
- step_dir = Config().observer_steps_dir
- except:
- step_dir = '/opt/xos/synchronizers/openstack/steps'
-
-
- for fn in os.listdir(step_dir):
- pathname = os.path.join(step_dir,fn)
- if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
- module = imp.load_source(fn[:-3],pathname)
- for classname in dir(module):
- c = getattr(module, classname, None)
-
- # make sure 'c' is a descendent of SyncStep and has a
- # provides field (this eliminates the abstract base classes
- # since they don't have a provides)
-
- if inspect.isclass(c) and issubclass(c, SyncStep) and hasattr(c,"provides") and (c not in sync_steps):
- sync_steps.append(c)
- return sync_steps
-
- def run(self):
- update_diag(sync_start=time.time(), backend_status="0 - Synchronizer Start")
-
- sync_steps = self.load_sync_step_modules()
-
- # start the observer
- observer = XOSObserver(sync_steps)
- observer_thread = threading.Thread(target=observer.run,name='synchronizer')
- observer_thread.start()
-
- # start the watcher thread
- if (watchers_enabled):
- watcher = XOSWatcher(sync_steps)
- watcher_thread = threading.Thread(target=watcher.run,name='watcher')
- watcher_thread.start()
-
- # start model policies thread
- policies_dir = getattr(Config(), "observer_model_policies_dir", None)
- if policies_dir:
- from synchronizers.model_policy import run_policy
- model_policy_thread = threading.Thread(target=run_policy)
- model_policy_thread.start()
- else:
- model_policy_thread = None
- logger.info("Skipping model policies thread due to no model_policies dir.")
-
- while True:
- try:
- time.sleep(1000)
- except KeyboardInterrupt:
- print "exiting due to keyboard interrupt"
- # TODO: See about setting the threads as daemons
- observer_thread._Thread__stop()
- if model_policy_thread:
- model_policy_thread._Thread__stop()
- sys.exit(1)
-
diff --git a/xos/synchronizers/base/deleter.py b/xos/synchronizers/base/deleter.py
deleted file mode 100644
index 93fa572..0000000
--- a/xos/synchronizers/base/deleter.py
+++ /dev/null
@@ -1,16 +0,0 @@
-import os
-import base64
-from xos.config import Config
-
-class Deleter:
- model=None # Must be overridden
-
- def __init__(self, *args, **kwargs):
- pass
-
- def call(self, pk, model_dict):
- # Fetch object from XOS db and delete it
- pass
-
- def __call__(self, *args, **kwargs):
- return self.call(*args, **kwargs)
diff --git a/xos/synchronizers/base/diag.py b/xos/synchronizers/base/diag.py
deleted file mode 100644
index 86874d6..0000000
--- a/xos/synchronizers/base/diag.py
+++ /dev/null
@@ -1,37 +0,0 @@
-import os
-import time
-import sys
-import traceback
-import json
-
-from core.models import Diag
-from xos.config import Config, XOS_DIR
-from xos.logger import Logger, logging, logger
-
-logger = Logger(level=logging.INFO)
-
-def update_diag(loop_end=None, loop_start=None, syncrecord_start=None, sync_start=None, backend_status=None):
- observer_name = Config().observer_name
-
- try:
- diag = Diag.objects.filter(name=observer_name).first()
- if (not diag):
- diag = Diag(name=observer_name)
- br_str = diag.backend_register
- br = json.loads(br_str)
- if loop_end:
- br['last_run'] = loop_end
- if loop_end and loop_start:
- br['last_duration'] = loop_end - loop_start
- if syncrecord_start:
- br['last_syncrecord_start'] = syncrecord_start
- if sync_start:
- br['last_synchronizer_start'] = sync_start
- if backend_status:
- diag.backend_status = backend_status
- diag.backend_register = json.dumps(br)
- diag.save()
- except:
- logger.log_exc("Exception in update_diag")
- traceback.print_exc()
-
diff --git a/xos/synchronizers/base/error_mapper.py b/xos/synchronizers/base/error_mapper.py
deleted file mode 100644
index 9eb878d..0000000
--- a/xos/synchronizers/base/error_mapper.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from xos.config import Config
-from xos.logger import Logger, logging, logger
-
-class ErrorMapper:
- def __init__(self, error_map_file):
- self.error_map = {}
- try:
- error_map_lines = open(error_map_file).read().splitlines()
- for l in error_map_lines:
- if (not l.startswith('#')):
- splits = l.split('->')
- k,v = map(lambda i:i.rstrip(),splits)
- self.error_map[k]=v
- except:
- logging.info('Could not read error map')
-
-
- def map(self, error):
- return self.error_map[error]
-
-
-
-
-
-
diff --git a/xos/synchronizers/base/event_loop.py b/xos/synchronizers/base/event_loop.py
deleted file mode 100644
index 6e61051..0000000
--- a/xos/synchronizers/base/event_loop.py
+++ /dev/null
@@ -1,624 +0,0 @@
-import os
-import imp
-import inspect
-import time
-import sys
-import traceback
-import commands
-import threading
-import json
-import pdb
-import pprint
-import traceback
-
-
-from datetime import datetime
-from collections import defaultdict
-from core.models import *
-from django.db.models import F, Q
-from django.db import connection
-from django.db import reset_queries
-from xos.logger import Logger, logging, logger
-#from timeout import timeout
-from xos.config import Config, XOS_DIR
-from synchronizers.base.steps import *
-from syncstep import SyncStep
-from toposort import toposort
-from synchronizers.base.error_mapper import *
-from synchronizers.base.steps.sync_object import SyncObject
-from django.utils import timezone
-from diag import update_diag
-
-# Load app models
-
-try:
- app_module_names = Config().observer_applist.split(',')
-except AttributeError:
- app_module_names = []
-
-if (not isinstance(app_module_names, list)):
- app_module_names = [app_module_names]
-
-app_modules = []
-
-for m in app_module_names:
- model_path = m + '.models'
- module = __import__(model_path, fromlist=[m])
- app_modules.append(module)
-
-
-debug_mode = False
-
-
-class bcolors:
- HEADER = '\033[95m'
- OKBLUE = '\033[94m'
- OKGREEN = '\033[92m'
- WARNING = '\033[93m'
- FAIL = '\033[91m'
- ENDC = '\033[0m'
- BOLD = '\033[1m'
- UNDERLINE = '\033[4m'
-
-logger = Logger(level=logging.INFO)
-
-
-class StepNotReady(Exception):
- pass
-
-
-class NoOpDriver:
-
- def __init__(self):
- self.enabled = True
- self.dependency_graph = None
-
-# Everyone gets NoOpDriver by default. To use a different driver, call
-# set_driver() below.
-
-DRIVER = NoOpDriver()
-
-
-def set_driver(x):
- global DRIVER
- DRIVER = x
-
-STEP_STATUS_WORKING = 1
-STEP_STATUS_OK = 2
-STEP_STATUS_KO = 3
-
-
-def invert_graph(g):
- ig = {}
- for k, v in g.items():
- for v0 in v:
- try:
- ig[v0].append(k)
- except:
- ig[v0] = [k]
- return ig
-
-
-class XOSObserver:
- sync_steps = []
-
- def __init__(self, sync_steps):
- # The Condition object that gets signalled by Feefie events
- self.step_lookup = {}
- # self.load_sync_step_modules()
- self.sync_steps = sync_steps
- self.load_sync_steps()
- self.event_cond = threading.Condition()
-
- self.driver = DRIVER
- self.observer_name = getattr(Config(), "observer_name", "")
-
- def consolePrint(self, what):
- if getattr(Config(), "observer_console_print", True):
- print what
-
- def wait_for_event(self, timeout):
- self.event_cond.acquire()
- self.event_cond.wait(timeout)
- self.event_cond.release()
-
- def wake_up(self):
- logger.info('Wake up routine called. Event cond %r' % self.event_cond)
- self.event_cond.acquire()
- self.event_cond.notify()
- self.event_cond.release()
-
- def load_sync_step_modules(self, step_dir=None):
- if step_dir is None:
- step_dir = Config().observer_steps_dir
-
- for fn in os.listdir(step_dir):
- pathname = os.path.join(step_dir, fn)
- if os.path.isfile(pathname) and fn.endswith(
- ".py") and (fn != "__init__.py"):
- module = imp.load_source(fn[:-3], pathname)
- for classname in dir(module):
- c = getattr(module, classname, None)
-
- # make sure 'c' is a descendent of SyncStep and has a
- # provides field (this eliminates the abstract base classes
- # since they don't have a provides)
-
- if inspect.isclass(c) and issubclass(
- c, SyncStep) and hasattr(
- c, "provides") and (
- c not in self.sync_steps):
- self.sync_steps.append(c)
- logger.info('loaded sync steps: %s' %
- ",".join([x.__name__ for x in self.sync_steps]))
-
- def load_sync_steps(self):
- dep_path = Config().observer_dependency_graph
- logger.info('Loading model dependency graph from %s' % dep_path)
- try:
- # This contains dependencies between records, not sync steps
- self.model_dependency_graph = json.loads(open(dep_path).read())
- for left, lst in self.model_dependency_graph.items():
- new_lst = []
- for k in lst:
- try:
- tup = (k, k.lower())
- new_lst.append(tup)
- deps = self.model_dependency_graph[k]
- except:
- self.model_dependency_graph[k] = []
-
- self.model_dependency_graph[left] = new_lst
- except Exception as e:
- raise e
-
- try:
- backend_path = Config().observer_pl_dependency_graph
- logger.info(
- 'Loading backend dependency graph from %s' %
- backend_path)
- # This contains dependencies between backend records
- self.backend_dependency_graph = json.loads(
- open(backend_path).read())
- for k, v in self.backend_dependency_graph.items():
- try:
- self.model_dependency_graph[k].extend(v)
- except KeyError:
- self.model_dependency_graphp[k] = v
-
- except Exception as e:
- logger.info('Backend dependency graph not loaded')
- # We can work without a backend graph
- self.backend_dependency_graph = {}
-
- provides_dict = {}
- for s in self.sync_steps:
- self.step_lookup[s.__name__] = s
- for m in s.provides:
- try:
- provides_dict[m.__name__].append(s.__name__)
- except KeyError:
- provides_dict[m.__name__] = [s.__name__]
-
- step_graph = {}
- phantom_steps = []
- for k, v in self.model_dependency_graph.items():
- try:
- for source in provides_dict[k]:
- if (not v):
- step_graph[source] = []
-
- for m, _ in v:
- try:
- for dest in provides_dict[m]:
- # no deps, pass
- try:
- if (dest not in step_graph[source]):
- step_graph[source].append(dest)
- except:
- step_graph[source] = [dest]
- except KeyError:
- if (m not in provides_dict):
- try:
- step_graph[source] += ['#%s' % m]
- except:
- step_graph[source] = ['#%s' % m]
-
- phantom_steps += ['#%s' % m]
- pass
-
- except KeyError:
- pass
- # no dependencies, pass
-
- self.dependency_graph = step_graph
- self.deletion_dependency_graph = invert_graph(step_graph)
-
- pp = pprint.PrettyPrinter(indent=4)
- logger.debug(pp.pformat(step_graph))
- self.ordered_steps = toposort(
- self.dependency_graph, phantom_steps + map(lambda s: s.__name__, self.sync_steps))
- self.ordered_steps = [
- i for i in self.ordered_steps if i != 'SyncObject']
-
- logger.info("Order of steps=%s" % self.ordered_steps)
-
- self.load_run_times()
-
- def check_duration(self, step, duration):
- try:
- if (duration > step.deadline):
- logger.info(
- 'Sync step %s missed deadline, took %.2f seconds' %
- (step.name, duration))
- except AttributeError:
- # S doesn't have a deadline
- pass
-
- def update_run_time(self, step, deletion):
- if (not deletion):
- self.last_run_times[step.__name__] = time.time()
- else:
- self.last_deletion_run_times[step.__name__] = time.time()
-
- def check_schedule(self, step, deletion):
- last_run_times = self.last_run_times if not deletion else self.last_deletion_run_times
-
- time_since_last_run = time.time() - last_run_times.get(step.__name__, 0)
- try:
- if (time_since_last_run < step.requested_interval):
- raise StepNotReady
- except AttributeError:
- logger.info(
- 'Step %s does not have requested_interval set' %
- step.__name__)
- raise StepNotReady
-
- def load_run_times(self):
- try:
- jrun_times = open(
- '/tmp/%sobserver_run_times' %
- self.observer_name).read()
- self.last_run_times = json.loads(jrun_times)
- except:
- self.last_run_times = {}
- for e in self.ordered_steps:
- self.last_run_times[e] = 0
- try:
- jrun_times = open(
- '/tmp/%sobserver_deletion_run_times' %
- self.observer_name).read()
- self.last_deletion_run_times = json.loads(jrun_times)
- except:
- self.last_deletion_run_times = {}
- for e in self.ordered_steps:
- self.last_deletion_run_times[e] = 0
-
- def lookup_step_class(self, s):
- if ('#' in s):
- return SyncObject
- else:
- step = self.step_lookup[s]
- return step
-
- def lookup_step(self, s):
- if ('#' in s):
- objname = s[1:]
- so = SyncObject()
-
- try:
- obj = globals()[objname]
- except:
- for m in app_modules:
- if (hasattr(m, objname)):
- obj = getattr(m, objname)
-
- so.provides = [obj]
- so.observes = [obj]
- step = so
- else:
- step_class = self.step_lookup[s]
- step = step_class(driver=self.driver, error_map=self.error_mapper)
- return step
-
- def save_run_times(self):
- run_times = json.dumps(self.last_run_times)
- open(
- '/tmp/%sobserver_run_times' %
- self.observer_name,
- 'w').write(run_times)
-
- deletion_run_times = json.dumps(self.last_deletion_run_times)
- open('/tmp/%sobserver_deletion_run_times' %
- self.observer_name, 'w').write(deletion_run_times)
-
- def check_class_dependency(self, step, failed_steps):
- step.dependenices = []
- for obj in step.provides:
- lst = self.model_dependency_graph.get(obj.__name__, [])
- nlst = map(lambda a_b1: a_b1[1], lst)
- step.dependenices.extend(nlst)
- for failed_step in failed_steps:
- if (failed_step in step.dependencies):
- raise StepNotReady
-
- def sync(self, S, deletion):
- try:
- step = self.lookup_step_class(S)
- start_time = time.time()
-
- logger.debug(
- "Starting to work on step %s, deletion=%s" %
- (step.__name__, str(deletion)))
-
- dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
- # if not deletion else self.deletion_step_conditions
- step_conditions = self.step_conditions
- step_status = self.step_status # if not deletion else self.deletion_step_status
-
- # Wait for step dependencies to be met
- try:
- deps = dependency_graph[S]
- has_deps = True
- except KeyError:
- has_deps = False
-
- go = True
-
- failed_dep = None
- if (has_deps):
- for d in deps:
- if d == step.__name__:
- logger.debug(
- " step %s self-wait skipped" %
- step.__name__)
- go = True
- continue
-
- cond = step_conditions[d]
- cond.acquire()
- if (step_status[d] is STEP_STATUS_WORKING):
- logger.debug(
- " step %s wait on dep %s" %
- (step.__name__, d))
- cond.wait()
- logger.debug(
- " step %s wait on dep %s cond returned" %
- (step.__name__, d))
- elif step_status[d] == STEP_STATUS_OK:
- go = True
- else:
- logger.debug(
- " step %s has failed dep %s" %
- (step.__name__, d))
- go = False
- failed_dep = d
- cond.release()
- if (not go):
- break
- else:
- go = True
-
- if (not go):
- logger.debug("Step %s skipped" % step.__name__)
- self.consolePrint(
- bcolors.FAIL + "Step %r skipped on %r" %
- (step, failed_dep) + bcolors.ENDC)
- # SMBAKER: sync_step was not defined here, so I changed
- # this from 'sync_step' to 'step'. Verify.
- self.failed_steps.append(step)
- my_status = STEP_STATUS_KO
- else:
- sync_step = self.lookup_step(S)
- sync_step. __name__ = step.__name__
- sync_step.dependencies = []
- try:
- mlist = sync_step.provides
-
- try:
- for m in mlist:
- lst = self.model_dependency_graph[m.__name__]
- nlst = map(lambda a_b: a_b[1], lst)
- sync_step.dependencies.extend(nlst)
- except Exception as e:
- raise e
-
- except KeyError:
- pass
- sync_step.debug_mode = debug_mode
-
- should_run = False
- try:
- # Various checks that decide whether
- # this step runs or not
- self.check_class_dependency(
- sync_step, self.failed_steps) # dont run Slices if Sites failed
- # dont run sync_network_routes if time since last run < 1
- # hour
- self.check_schedule(sync_step, deletion)
- should_run = True
- except StepNotReady:
- logger.info('Step not ready: %s' % sync_step.__name__)
- self.failed_steps.append(sync_step)
- my_status = STEP_STATUS_KO
- except Exception as e:
- logger.error('%r' % e)
- logger.log_exc(
- "sync step failed: %r. Deletion: %r" %
- (sync_step, deletion))
- self.failed_steps.append(sync_step)
- my_status = STEP_STATUS_KO
-
- if (should_run):
- try:
- duration = time.time() - start_time
-
- logger.debug(
- 'Executing step %s, deletion=%s' %
- (sync_step.__name__, deletion))
-
- self.consolePrint(
- bcolors.OKBLUE + "Executing step %s" %
- sync_step.__name__ + bcolors.ENDC)
- failed_objects = sync_step(
- failed=list(
- self.failed_step_objects),
- deletion=deletion)
-
- self.check_duration(sync_step, duration)
-
- if failed_objects:
- self.failed_step_objects.update(failed_objects)
-
- logger.debug(
- "Step %r succeeded, deletion=%s" %
- (sync_step.__name__, deletion))
- self.consolePrint(
- bcolors.OKGREEN + "Step %r succeeded" %
- sync_step.__name__ + bcolors.ENDC)
- my_status = STEP_STATUS_OK
- self.update_run_time(sync_step, deletion)
- except Exception as e:
- self.consolePrint(
- bcolors.FAIL + "Model step %r failed" %
- (sync_step.__name__) + bcolors.ENDC)
- logger.error(
- 'Model step %r failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' %
- (sync_step.__name__, e))
- logger.log_exc("Exception in sync step")
- self.failed_steps.append(S)
- my_status = STEP_STATUS_KO
- else:
- logger.info("Step %r succeeded due to non-run" % step)
- my_status = STEP_STATUS_OK
-
- try:
- my_cond = step_conditions[S]
- my_cond.acquire()
- step_status[S] = my_status
- my_cond.notify_all()
- my_cond.release()
- except KeyError as e:
- logger.debug('Step %r is a leaf' % step)
- pass
- finally:
- try:
- reset_queries()
- except:
- # this shouldn't happen, but in case it does, catch it...
- logger.log_exc("exception in reset_queries")
-
- connection.close()
-
- def run(self):
- if not self.driver.enabled:
- return
-
- while True:
- logger.debug('Waiting for event')
- self.wait_for_event(timeout=5)
- logger.debug('Observer woke up')
-
- self.run_once()
-
- def check_db_connection_okay(self):
- # django implodes if the database connection is closed by
- # docker-compose
- try:
- diag = Diag.objects.filter(name="foo").first()
- except Exception as e:
- from django import db
- if "connection already closed" in traceback.format_exc():
- logger.error("XXX connection already closed")
- try:
- # if db.connection:
- # db.connection.close()
- db.close_old_connections()
- except:
- logger.log_exc("XXX we failed to fix the failure")
- else:
- logger.log_exc("XXX some other error")
-
- def run_once(self):
- try:
- self.check_db_connection_okay()
-
- loop_start = time.time()
- error_map_file = getattr(
- Config(),
- "error_map_path",
- XOS_DIR +
- "/error_map.txt")
- self.error_mapper = ErrorMapper(error_map_file)
-
- # Two passes. One for sync, the other for deletion.
- for deletion in [False, True]:
- # Set of individual objects within steps that failed
- self.failed_step_objects = set()
-
- # Set up conditions and step status
- # This is needed for steps to run in parallel
- # while obeying dependencies.
-
- providers = set()
- dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
-
- for v in dependency_graph.values():
- if (v):
- providers.update(v)
-
- self.step_conditions = {}
- self.step_status = {}
-
- for p in list(providers):
- self.step_conditions[p] = threading.Condition()
-
- self.step_status[p] = STEP_STATUS_WORKING
-
- self.failed_steps = []
-
- threads = []
- logger.debug('Deletion=%r...' % deletion)
- schedule = self.ordered_steps if not deletion else reversed(
- self.ordered_steps)
-
- for S in schedule:
- thread = threading.Thread(
- target=self.sync, name='synchronizer', args=(
- S, deletion))
-
- logger.debug('Deletion=%r...' % deletion)
- threads.append(thread)
-
- # Start threads
- for t in threads:
- t.start()
-
- # another spot to clean up debug state
- try:
- reset_queries()
- except:
- # this shouldn't happen, but in case it does, catch it...
- logger.log_exc("exception in reset_queries")
-
- # Wait for all threads to finish before continuing with the run
- # loop
- for t in threads:
- t.join()
-
- self.save_run_times()
-
- loop_end = time.time()
-
- update_diag(
- loop_end=loop_end,
- loop_start=loop_start,
- backend_status="1 - Bottom Of Loop")
-
- except Exception as e:
- logger.error(
- 'Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' %
- e)
- logger.log_exc("Exception in observer run loop")
- traceback.print_exc()
- update_diag(backend_status="2 - Exception in Event Loop")
diff --git a/xos/synchronizers/base/event_manager.py b/xos/synchronizers/base/event_manager.py
deleted file mode 100644
index fce2b68..0000000
--- a/xos/synchronizers/base/event_manager.py
+++ /dev/null
@@ -1,120 +0,0 @@
-import threading
-import requests, json
-
-from xos.config import Config, XOS_DIR
-
-import uuid
-import os
-import imp
-import inspect
-import base64
-import json
-import traceback
-
-if getattr(Config(),"observer_fofum_disabled", False) != True:
- from fofum import Fofum
- fofum_enabled = True
-else:
- fofum_enabled = False
-
-random_client_id=None
-def get_random_client_id():
- global random_client_id
-
- if (random_client_id is None) and os.path.exists(XOS_DIR + "/random_client_id"):
- # try to use the last one we used, if we saved it
- try:
- random_client_id = open(XOS_DIR+"/random_client_id","r").readline().strip()
- print "get_random_client_id: loaded %s" % random_client_id
- except:
- print "get_random_client_id: failed to read " + XOS_DIR + "/random_client_id"
-
- if random_client_id is None:
- random_client_id = base64.urlsafe_b64encode(os.urandom(12))
- print "get_random_client_id: generated new id %s" % random_client_id
-
- # try to save it for later (XXX: could race with another client here)
- try:
- open(XOS_DIR + "/random_client_id","w").write("%s\n" % random_client_id)
- except:
- print "get_random_client_id: failed to write " + XOS_DIR + "/random_client_id"
-
- return random_client_id
-
-# decorator that marks dispatachable event methods
-def event(func):
- setattr(func, 'event', func.__name__)
- return func
-
-class EventHandler:
- # This code is currently not in use.
- def __init__(self):
- pass
-
- @staticmethod
- def get_events():
- events = []
- for name in dir(EventHandler):
- attribute = getattr(EventHandler, name)
- if hasattr(attribute, 'event'):
- events.append(getattr(attribute, 'event'))
- return events
-
- def dispatch(self, event, *args, **kwds):
- if hasattr(self, event):
- return getattr(self, event)(*args, **kwds)
-
-
-class EventSender:
- def __init__(self,user=None,clientid=None):
- try:
- user = Config().feefie_client_user
- except:
- user = 'pl'
-
- try:
- clid = Config().feefie_client_id
- except:
- clid = get_random_client_id()
- print "EventSender: no feefie_client_id configured. Using random id %s" % clid
-
- if fofum_enabled:
- self.fofum = Fofum(user=user)
- self.fofum.make(clid)
-
- def fire(self,**kwargs):
- kwargs["uuid"] = str(uuid.uuid1())
- if fofum_enabled:
- self.fofum.fire(json.dumps(kwargs))
-
-class EventListener:
- def __init__(self,wake_up=None):
- self.handler = EventHandler()
- self.wake_up = wake_up
-
- def handle_event(self, payload):
- payload_dict = json.loads(payload)
-
- if (self.wake_up):
- self.wake_up()
-
- def run(self):
- # This is our unique client id, to be used when firing and receiving events
- # It needs to be generated once and placed in the config file
-
- try:
- user = Config().feefie_client_user
- except:
- user = 'pl'
-
- try:
- clid = Config().feefie_client_id
- except:
- clid = get_random_client_id()
- print "EventListener: no feefie_client_id configured. Using random id %s" % clid
-
- if fofum_enabled:
- f = Fofum(user=user)
-
- listener_thread = threading.Thread(target=f.listen_for_event,args=(clid,self.handle_event))
- listener_thread.start()
diff --git a/xos/synchronizers/base/run_ansible b/xos/synchronizers/base/run_ansible
deleted file mode 100755
index 662f798..0000000
--- a/xos/synchronizers/base/run_ansible
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/bin/bash
-
-export REQUESTS_CA_BUNDLE=/usr/local/share/ca-certificates/local_certs.crt
-ansible-playbook -v "$@"
diff --git a/xos/synchronizers/base/run_ansible_verbose b/xos/synchronizers/base/run_ansible_verbose
deleted file mode 100755
index d72b12d..0000000
--- a/xos/synchronizers/base/run_ansible_verbose
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/bin/bash
-
-export REQUESTS_CA_BUNDLE=/usr/local/share/ca-certificates/local_certs.crt
-ansible-playbook -vvv "$@"
diff --git a/xos/synchronizers/base/steps/__init__.py b/xos/synchronizers/base/steps/__init__.py
deleted file mode 100644
index c70b0c0..0000000
--- a/xos/synchronizers/base/steps/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-#from .sync_controller_sites import SyncControllerSites
-#from .sync_controller_slices import SyncControllerSlices
-#from .sync_controller_users import SyncControllerUsers
-#from .sync_controller_site_privileges import SyncControllerSitePrivileges
-#from .sync_controller_slice_privileges import SyncControllerSlicePrivileges
-#from .sync_controller_networks import SyncControllerNetworks
diff --git a/xos/synchronizers/base/steps/sync_object.py b/xos/synchronizers/base/steps/sync_object.py
deleted file mode 100644
index 5049325..0000000
--- a/xos/synchronizers/base/steps/sync_object.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import os
-import base64
-from collections import defaultdict
-from django.db.models import F, Q
-from xos.config import Config
-from synchronizers.base.syncstep import *
-from core.models import *
-from synchronizers.base.ansible_helper import *
-from xos.logger import observer_logger as logger
-import json
-
-class SyncObject(SyncStep):
- provides=[] # Caller fills this in
- requested_interval=0
- observes=[] # Caller fills this in
-
- def sync_record(self, r):
- raise DeferredException('Waiting for Service dependency: %r'%r)
diff --git a/xos/synchronizers/base/syncstep-portal.py b/xos/synchronizers/base/syncstep-portal.py
deleted file mode 100644
index 07abb0b..0000000
--- a/xos/synchronizers/base/syncstep-portal.py
+++ /dev/null
@@ -1,222 +0,0 @@
-import os
-import base64
-from datetime import datetime
-from xos.config import Config
-from xos.logger import Logger, logging
-from synchronizers.base.steps import *
-from django.db.models import F, Q
-from django.utils import timezone
-from core.models import *
-from django.db import reset_queries
-import json
-import time
-import pdb
-import traceback
-
-logger = Logger(level=logging.INFO)
-
-def f7(seq):
- seen = set()
- seen_add = seen.add
- return [ x for x in seq if not (x in seen or seen_add(x))]
-
-def elim_dups(backend_str):
- strs = backend_str.split('/')
- strs = map(lambda x:x.split('(')[0],strs)
- strs2 = f7(strs)
- return '/'.join(strs2)
-
-def deepgetattr(obj, attr):
- return reduce(getattr, attr.split('.'), obj)
-
-
-class InnocuousException(Exception):
- pass
-
-class FailedDependency(Exception):
- pass
-
-class SyncStep(object):
- """ An XOS Sync step.
-
- Attributes:
- psmodel Model name the step synchronizes
- dependencies list of names of models that must be synchronized first if the current model depends on them
- """
- slow=False
- def get_prop(prop):
- try:
- sync_config_dir = Config().sync_config_dir
- except:
- sync_config_dir = '/etc/xos/sync'
- prop_config_path = '/'.join(sync_config_dir,self.name,prop)
- return open(prop_config_path).read().rstrip()
-
- def __init__(self, **args):
- """Initialize a sync step
- Keyword arguments:
- name -- Name of the step
- provides -- XOS models sync'd by this step
- """
- dependencies = []
- self.driver = args.get('driver')
- self.error_map = args.get('error_map')
-
- try:
- self.soft_deadline = int(self.get_prop('soft_deadline_seconds'))
- except:
- self.soft_deadline = 5 # 5 seconds
-
- return
-
- def fetch_pending(self, deletion=False):
- # This is the most common implementation of fetch_pending
- # Steps should override it if they have their own logic
- # for figuring out what objects are outstanding.
- main_obj = self.observes
- if (not deletion):
- objs = main_obj.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None),Q(lazy_blocked=False))
- else:
- objs = main_obj.deleted_objects.all()
-
- return objs
- #return Sliver.objects.filter(ip=None)
-
- def check_dependencies(self, obj, failed):
- for dep in self.dependencies:
- peer_name = dep[0].lower() + dep[1:] # django names are camelCased with the first letter lower
-
- try:
- peer_object = deepgetattr(obj, peer_name)
- try:
- peer_objects = peer_object.all()
- except AttributeError:
- peer_objects = [peer_object]
- except:
- peer_objects = []
-
- if (hasattr(obj,'controller')):
- try:
- peer_objects = filter(lambda o:o.controller==obj.controller, peer_objects)
- except AttributeError:
- pass
-
- if (failed in peer_objects):
- if (obj.backend_status!=failed.backend_status):
- obj.backend_status = failed.backend_status
- obj.save(update_fields=['backend_status'])
- raise FailedDependency("Failed dependency for %s:%s peer %s:%s failed %s:%s" % (obj.__class__.__name__, str(getattr(obj,"pk","no_pk")), peer_object.__class__.__name__, str(getattr(peer_object,"pk","no_pk")), failed.__class__.__name__, str(getattr(failed,"pk","no_pk"))))
-
- def call(self, failed=[], deletion=False):
- pending = self.fetch_pending(deletion)
- for o in pending:
- # another spot to clean up debug state
- try:
- reset_queries()
- except:
- # this shouldn't happen, but in case it does, catch it...
- logger.log_exc("exception in reset_queries",extra=o.tologdict())
-
- sync_failed = False
- try:
- backoff_disabled = Config().observer_backoff_disabled
- except:
- backoff_disabled = 0
-
- try:
- scratchpad = json.loads(o.backend_register)
- if (scratchpad):
- next_run = scratchpad['next_run']
- if (not backoff_disabled and next_run>time.time()):
- sync_failed = True
- except:
- logger.log_exc("Exception while loading scratchpad",extra=o.tologdict())
- pass
-
- if (not sync_failed):
- try:
- for f in failed:
- self.check_dependencies(o,f) # Raises exception if failed
- if (deletion):
- self.delete_record(o)
- o.delete(purge=True)
- else:
- self.sync_record(o)
- o.enacted = timezone.now() # Is this the same timezone? XXX
- scratchpad = {'next_run':0, 'exponent':0}
- o.backend_register = json.dumps(scratchpad)
- o.backend_status = "1 - OK"
- o.save(update_fields=['enacted','backend_status','backend_register'])
- except (InnocuousException,Exception) as e:
- logger.log_exc("Syncstep caught exception",extra=o.tologdict())
-
- force_error = False
- try:
- if (o.backend_status.startswith('2 - ')):
- force_error = False # Already in error state
- str_e = '%s/%s'%(o.backend_status[4:],str(e))
- str_e = elim_dups(str_e)
- else:
- str_e = str(e)
- except:
- str_e = str(e)
-
- if (not str_e):
- str_e = 'Unknown'
-
- try:
- error = self.error_map.map(str_e)
- except:
- error = str_e
-
- if isinstance(e, InnocuousException) and not force_error:
- o.backend_status = '1 - %s'%error
- else:
- o.backend_status = '2 - %s'%error
-
- cmd = 'wget -O /dev/null -q "http://xoslnprof.appspot.com/command?action=pushlog&node=1&log_path=/%s/%s"'%(self.__class__.__name__,error)
- os.system(cmd)
-
- try:
- scratchpad = json.loads(o.backend_register)
- scratchpad['exponent']
- except:
- logger.log_exc("Exception while updating scratchpad",extra=o.tologdict())
- scratchpad = {'next_run':0, 'exponent':0}
-
- # Second failure
- if (scratchpad['exponent']):
- delay = scratchpad['exponent'] * 600 # 10 minutes
- if (delay<1440):
- delay = 1440
- scratchpad['next_run'] = time.time() + delay
-
- scratchpad['exponent']+=1
-
- o.backend_register = json.dumps(scratchpad)
-
- # TOFIX:
- # DatabaseError: value too long for type character varying(140)
- if (o.pk):
- try:
- o.backend_status = o.backend_status[:1024]
- o.save(update_fields=['backend_status','backend_register','updated'])
- except:
- print "Could not update backend status field!"
- pass
- sync_failed = True
-
-
- if (sync_failed):
- failed.append(o)
-
- return failed
-
- def sync_record(self, o):
- return
-
- def delete_record(self, o):
- return
-
- def __call__(self, **args):
- return self.call(**args)
diff --git a/xos/synchronizers/base/syncstep.py b/xos/synchronizers/base/syncstep.py
deleted file mode 100644
index 2c22db4..0000000
--- a/xos/synchronizers/base/syncstep.py
+++ /dev/null
@@ -1,335 +0,0 @@
-import os
-import base64
-from datetime import datetime
-from django.utils import timezone
-from xos.config import Config
-from xos.logger import Logger, logging
-from synchronizers.base.steps import *
-from django.db.models import F, Q
-from core.models import *
-from django.db import reset_queries
-from synchronizers.base.ansible_helper import *
-from generate.dependency_walker import *
-from diag import update_diag
-
-import json
-import time
-import pdb
-
-logger = Logger(level=logging.DEBUG)
-
-def f7(seq):
- seen = set()
- seen_add = seen.add
- return [ x for x in seq if not (x in seen or seen_add(x))]
-
-def elim_dups(backend_str):
- strs = backend_str.split(' // ')
- strs2 = f7(strs)
- return ' // '.join(strs2)
-
-def deepgetattr(obj, attr):
- return reduce(getattr, attr.split('.'), obj)
-
-
-class InnocuousException(Exception):
- pass
-
-class DeferredException(Exception):
- pass
-
-class FailedDependency(Exception):
- pass
-
-class SyncStep(object):
- """ An XOS Sync step.
-
- Attributes:
- psmodel Model name the step synchronizes
- dependencies list of names of models that must be synchronized first if the current model depends on them
- """
-
- # map_sync_outputs can return this value to cause a step to be marked
- # successful without running ansible. Used for sync_network_controllers
- # on nat networks.
- SYNC_WITHOUT_RUNNING = "sync_without_running"
-
- slow=False
- def get_prop(self, prop):
- try:
- sync_config_dir = Config().sync_config_dir
- except:
- sync_config_dir = '/etc/xos/sync'
- prop_config_path = '/'.join(sync_config_dir,self.name,prop)
- return open(prop_config_path).read().rstrip()
-
- def __init__(self, **args):
- """Initialize a sync step
- Keyword arguments:
- name -- Name of the step
- provides -- XOS models sync'd by this step
- """
- dependencies = []
- self.driver = args.get('driver')
- self.error_map = args.get('error_map')
-
- try:
- self.soft_deadline = int(self.get_prop('soft_deadline_seconds'))
- except:
- self.soft_deadline = 5 # 5 seconds
-
- return
-
- def fetch_pending(self, deletion=False):
- # This is the most common implementation of fetch_pending
- # Steps should override it if they have their own logic
- # for figuring out what objects are outstanding.
-
- main_objs = self.observes
- if (type(main_objs) is not list):
- main_objs=[main_objs]
-
- objs = []
- for main_obj in main_objs:
- if (not deletion):
- lobjs = main_obj.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None),Q(lazy_blocked=False),Q(no_sync=False))
- else:
- lobjs = main_obj.deleted_objects.all()
- objs.extend(lobjs)
-
- return objs
- #return Instance.objects.filter(ip=None)
-
- def check_dependencies(self, obj, failed):
- for dep in self.dependencies:
- peer_name = dep[0].lower() + dep[1:] # django names are camelCased with the first letter lower
-
- peer_objects=[]
- try:
- peer_names = plural(peer_name)
- peer_object_list=[]
-
- try:
- peer_object_list.append(deepgetattr(obj, peer_name))
- except:
- pass
-
- try:
- peer_object_list.append(deepgetattr(obj, peer_names))
- except:
- pass
-
- for peer_object in peer_object_list:
- try:
- peer_objects.extend(peer_object.all())
- except AttributeError:
- peer_objects.append(peer_object)
- except:
- peer_objects = []
-
- if (hasattr(obj,'controller')):
- try:
- peer_objects = filter(lambda o:o.controller==obj.controller, peer_objects)
- except AttributeError:
- pass
-
- if (failed in peer_objects):
- if (obj.backend_status!=failed.backend_status):
- obj.backend_status = failed.backend_status
- obj.save(update_fields=['backend_status'])
- raise FailedDependency("Failed dependency for %s:%s peer %s:%s failed %s:%s" % (obj.__class__.__name__, str(getattr(obj,"pk","no_pk")), peer_object.__class__.__name__, str(getattr(peer_object,"pk","no_pk")), failed.__class__.__name__, str(getattr(failed,"pk","no_pk"))))
-
-
- def sync_record(self, o):
- logger.debug("Sync_record called for %s %s" % (o.__class__.__name__, str(o)))
-
- try:
- controller = o.get_controller()
- controller_register = json.loads(controller.backend_register)
-
- if (controller_register.get('disabled',False)):
- raise InnocuousException('Controller %s is disabled'%controller.name)
- except AttributeError:
- pass
-
- tenant_fields = self.map_sync_inputs(o)
- if tenant_fields == SyncStep.SYNC_WITHOUT_RUNNING:
- return
- main_objs=self.observes
- if (type(main_objs) is list):
- main_objs=main_objs[0]
-
- path = ''.join(main_objs.__name__).lower()
- res = run_template(self.playbook,tenant_fields,path=path, object=o)
-
- if hasattr(self, "map_sync_outputs"):
- self.map_sync_outputs(o,res)
-
- def delete_record(self, o):
- try:
- controller = o.get_controller()
- controller_register = json.loads(o.node.site_deployment.controller.backend_register)
-
- if (controller_register.get('disabled',False)):
- raise InnocuousException('Controller %s is disabled'%sliver.node.site_deployment.controller.name)
- except AttributeError:
- pass
-
- tenant_fields = self.map_delete_inputs(o)
-
- main_objs=self.observes
- if (type(main_objs) is list):
- main_objs=main_objs[0]
-
- path = ''.join(main_objs.__name__).lower()
-
- tenant_fields['delete']=True
- res = run_template(self.playbook,tenant_fields,path=path)
- try:
- self.map_delete_outputs(o,res)
- except AttributeError:
- pass
-
- def call(self, failed=[], deletion=False):
- #if ('Instance' in self.__class__.__name__):
- # pdb.set_trace()
-
- pending = self.fetch_pending(deletion)
-
- for o in pending:
- # another spot to clean up debug state
- try:
- reset_queries()
- except:
- # this shouldn't happen, but in case it does, catch it...
- logger.log_exc("exception in reset_queries",extra=o.tologdict())
-
- sync_failed = False
- try:
- backoff_disabled = Config().observer_backoff_disabled
- except:
- backoff_disabled = 0
-
- try:
- scratchpad = json.loads(o.backend_register)
- if (scratchpad):
- next_run = scratchpad['next_run']
- if (not backoff_disabled and next_run>time.time()):
- sync_failed = True
- except:
- logger.log_exc("Exception while loading scratchpad",extra=o.tologdict())
- pass
-
- if (not sync_failed):
- try:
- for f in failed:
- self.check_dependencies(o,f) # Raises exception if failed
- if (deletion):
- if getattr(o, "backend_need_reap", False):
- # the object has already been deleted and marked for reaping
- journal_object(o,"syncstep.call.already_marked_reap")
- else:
- journal_object(o,"syncstep.call.delete_record")
- self.delete_record(o)
- journal_object(o,"syncstep.call.delete_set_reap")
- o.backend_need_reap = True
- o.save(update_fields=['backend_need_reap'])
- #o.delete(purge=True)
- else:
- new_enacted = timezone.now()
- try:
- run_always = self.run_always
- except AttributeError:
- run_always = False
-
- # Mark this as an object that will require delete. Do
- # this now rather than after the syncstep,
- if not (o.backend_need_delete):
- o.backend_need_delete = True
- o.save(update_fields=['backend_need_delete'])
-
- journal_object(o,"syncstep.call.sync_record")
- self.sync_record(o)
-
-# if (not run_always):
-# o.enacted = new_enacted
-
- update_diag(syncrecord_start = time.time(), backend_status="1 - Synced Record")
- o.enacted = new_enacted
- scratchpad = {'next_run':0, 'exponent':0, 'last_success':time.time()}
- o.backend_register = json.dumps(scratchpad)
- o.backend_status = "1 - OK"
- journal_object(o,"syncstep.call.save_update")
- o.save(update_fields=['enacted','backend_status','backend_register'])
- except (InnocuousException,Exception,DeferredException) as e:
- logger.log_exc("sync step failed!",extra=o.tologdict())
- try:
- if (o.backend_status.startswith('2 - ')):
- str_e = '%s // %r'%(o.backend_status[4:],e)
- str_e = elim_dups(str_e)
- else:
- str_e = '%r'%e
- except:
- str_e = '%r'%e
-
- try:
- error = self.error_map.map(str_e)
- except:
- error = '%s'%str_e
-
- if isinstance(e, InnocuousException):
- o.backend_status = '1 - %s'%error
- else:
- o.backend_status = '2 - %s'%error
-
- try:
- scratchpad = json.loads(o.backend_register)
- scratchpad['exponent']
- except:
- logger.log_exc("Exception while updating scratchpad",extra=o.tologdict())
- scratchpad = {'next_run':0, 'exponent':0, 'last_success':time.time(),'failures':0}
-
- # Second failure
- if (scratchpad['exponent']):
- if isinstance(e,DeferredException):
- delay = scratchpad['exponent'] * 60 # 1 minute
- else:
- delay = scratchpad['exponent'] * 600 # 10 minutes
- # cap delays at 8 hours
- if (delay>8*60*60):
- delay=8*60*60
- scratchpad['next_run'] = time.time() + delay
-
- try:
- scratchpad['exponent']+=1
- except:
- scratchpad['exponent']=1
-
- try:
- scratchpad['failures']+=1
- except KeyError:
- scratchpad['failures']=1
-
- scratchpad['last_failure']=time.time()
-
- o.backend_register = json.dumps(scratchpad)
-
- # TOFIX:
- # DatabaseError: value too long for type character varying(140)
- if (o.pk):
- try:
- o.backend_status = o.backend_status[:1024]
- o.save(update_fields=['backend_status','backend_register','updated'])
- except:
- print "Could not update backend status field!"
- pass
- sync_failed = True
-
-
- if (sync_failed):
- failed.append(o)
-
- return failed
-
- def __call__(self, **args):
- return self.call(**args)
diff --git a/xos/synchronizers/base/templates/container.conf.j2 b/xos/synchronizers/base/templates/container.conf.j2
deleted file mode 100644
index 7cbb880..0000000
--- a/xos/synchronizers/base/templates/container.conf.j2
+++ /dev/null
@@ -1,14 +0,0 @@
-# Upstart script for container
-description "container"
-author "smbaker@gmail.com"
-start on filesystem and started docker
-stop on runlevel [!2345]
-respawn
-
-script
- /usr/local/sbin/start-container-{{ container_name }}.sh ATTACH
-end script
-
-post-stop script
- /usr/local/sbin/stop-container-{{ container_name }}.sh
-end script
\ No newline at end of file
diff --git a/xos/synchronizers/base/templates/container.service.j2 b/xos/synchronizers/base/templates/container.service.j2
deleted file mode 100644
index 817d6d7..0000000
--- a/xos/synchronizers/base/templates/container.service.j2
+++ /dev/null
@@ -1,11 +0,0 @@
-[Unit]
-Description={{ container_name }}
-After=docker.service
-
-[Service]
-ExecStart=/bin/bash -c "/usr/local/sbin/start-container-{{ container_name }}.sh ATTACH"
-ExecStop=/bin/bash -c "/usr/local/sbin/stop-container-{{ container_name }}.sh"
-SuccessExitStatus=0 137
-
-[Install]
-WantedBy=multi-user.target
diff --git a/xos/synchronizers/base/templates/start-container.sh.j2 b/xos/synchronizers/base/templates/start-container.sh.j2
deleted file mode 100644
index 2fbf478..0000000
--- a/xos/synchronizers/base/templates/start-container.sh.j2
+++ /dev/null
@@ -1,136 +0,0 @@
-#!/bin/bash
-
-iptables -L > /dev/null
-ip6tables -L > /dev/null
-
-CONTAINER={{ container_name }}
-IMAGE={{ docker_image }}
-
-function mac_to_iface {
- PARENT_MAC=$1
- ifconfig|grep $PARENT_MAC| awk '{print $1}'|grep -v '\.'
-}
-
-function encapsulate_stag {
- LAN_IFACE=$1
- STAG=$2
- ifconfig $LAN_IFACE >> /dev/null
- if [ "$?" == 0 ]; then
- STAG_IFACE=$LAN_IFACE.$STAG
- ifconfig $LAN_IFACE up
- ifconfig $STAG_IFACE
- if [ "$?" == 0 ]; then
- echo $STAG_IFACE is already created
- else
- ifconfig $STAG_IFACE >> /dev/null || ip link add link $LAN_IFACE name $STAG_IFACE type vlan id $STAG
- fi
- ifconfig $STAG_IFACE up
- else
- echo There is no $LAN_IFACE. Aborting.
- exit -1
- fi
-}
-
-
-{% if volumes %}
-{% for volume in volumes %}
-DEST_DIR=/var/container_volumes/$CONTAINER/{{ volume }}
-mkdir -p $DEST_DIR
-VOLUME_ARGS="$VOLUME_ARGS -v $DEST_DIR:{{ volume }}"
-{% endfor %}
-{% endif %}
-
-docker inspect $CONTAINER > /dev/null 2>&1
-if [ "$?" == 1 ]
-then
- docker pull $IMAGE
-{% if network_method=="host" %}
- docker run -d --name=$CONTAINER --privileged=true --net=host $VOLUME_ARGS $IMAGE
-{% elif network_method=="bridged" %}
- docker run -d --name=$CONTAINER --privileged=true --net=bridge $VOLUME_ARGS $IMAGE
-{% else %}
- docker run -d --name=$CONTAINER --privileged=true --net=none $VOLUME_ARGS $IMAGE
-{% endif %}
-else
- docker start $CONTAINER
-fi
-
-{% if ports %}
-{% for port in ports %}
-
-{% if port.next_hop %}
-NEXTHOP_ARG="@{{ port.next_hop }}"
-{% else %}
-NEXTHOP_ARG=""
-{% endif %}
-
-{% if port.c_tag %}
-CTAG_ARG="@{{ port.c_tag }}"
-{% else %}
-CTAG_ARG=""
-{% endif %}
-
-{% if port.parent_mac %}
-# container-in-VM
-SRC_DEV=$( mac_to_iface "{{ port.parent_mac }}" )
-CMD="docker exec $CONTAINER ifconfig $SRC_DEV >> /dev/null || pipework $SRC_DEV -i {{ port.device }} $CONTAINER {{ port.ip }}/24$NEXTHOP_ARG {{ port.mac }} $CTAG_ARG"
-echo $CMD
-eval $CMD
-
-{% else %}
-# container-on-metal
-IP="{{ port.ip }}"
-{% if port.mac %}
-MAC="{{ port.mac }}"
-{% else %}
-MAC=""
-{% endif %}
-
-DEVICE="{{ port.device }}"
-BRIDGE="{{ port.bridge }}"
-{% if port.s_tag %}
-# This is intended for lan_network. Assume that BRIDGE is set to br_lan. We
-# create a device that strips off the S-TAG.
-STAG="{{ port.s_tag }}"
-encapsulate_stag $BRIDGE $STAG
-SRC_DEV=$STAG_IFACE
-{% else %}
-# This is for a standard neutron private network. We use a donor VM to setup
-# openvswitch for us, and we snoop at its devices and create a tap using the
-# same settings.
-XOS_NETWORK_ID="{{ port.xos_network_id }}"
-INSTANCE_MAC="{{ port.snoop_instance_mac }}"
-INSTANCE_ID="{{ port.snoop_instance_id }}"
-INSTANCE_TAP=`virsh domiflist $INSTANCE_ID | grep -i $INSTANCE_MAC | awk '{print $1}'`
-INSTANCE_TAP=${INSTANCE_TAP:3}
-VLAN_ID=`ovs-vsctl show | grep -i -A 1 port.*$INSTANCE_TAP | grep -i tag | awk '{print $2}'`
-# One tap for all containers per XOS/neutron network. Included the VLAN_ID in the
-# hash, to cover the case where XOS is reinstalled and the XOS network ids
-# get reused.
-TAP="con`echo ${XOS_NETWORK_ID}_$VLAN_ID|md5sum|awk '{print $1}'`"
-TAP=${TAP:0:10}
-echo im=$INSTANCE_MAC ii=$INSTANCE_ID it=$INSTANCE_TAP vlan=$VLAN_ID tap=$TAP con=$CONTAINER dev=$DEVICE mac=$MAC
-ovs-vsctl show | grep -i $TAP
-if [[ $? == 1 ]]; then
- echo creating tap
- ovs-vsctl add-port $BRIDGE $TAP tag=$VLAN_ID -- set interface $TAP type=internal
-else
- echo tap exists
-fi
-SRC_DEV=$TAP
-{% endif %}
-
-CMD="docker exec $CONTAINER ifconfig $DEVICE >> /dev/null || pipework $SRC_DEV -i $DEVICE $CONTAINER $IP/24$NEXTHOP_ARG $MAC $CTAG_ARG"
-echo $CMD
-eval $CMD
-{% endif %}
-{% endfor %}
-{% endif %}
-
-# Attach to container
-# (this is only done when using upstart, since upstart expects to be attached
-# to a running service)
-if [[ "$1" == "ATTACH" ]]; then
- docker start -a $CONTAINER
-fi
-
diff --git a/xos/synchronizers/base/templates/stop-container.sh.j2 b/xos/synchronizers/base/templates/stop-container.sh.j2
deleted file mode 100644
index 9cabb00..0000000
--- a/xos/synchronizers/base/templates/stop-container.sh.j2
+++ /dev/null
@@ -1,4 +0,0 @@
-CONTAINER={{ container_name }}
-
-docker stop $CONTAINER
-docker rm $CONTAINER
diff --git a/xos/synchronizers/base/toposort.py b/xos/synchronizers/base/toposort.py
deleted file mode 100644
index 6839861..0000000
--- a/xos/synchronizers/base/toposort.py
+++ /dev/null
@@ -1,72 +0,0 @@
-#!/usr/bin/env python
-
-import time
-import traceback
-import commands
-import threading
-import json
-import pdb
-
-from datetime import datetime
-from collections import defaultdict
-
-# Topological sort
-# Notes:
-# - Uses a stack instead of recursion
-# - Forfeits optimization involving tracking currently visited nodes
-def toposort(g, steps=None):
- # Get set of all nodes, including those without outgoing edges
- keys = set(g.keys())
- values = set({})
- for v in g.values():
- values=values | set(v)
-
- all_nodes=list(keys|values)
- if (not steps):
- steps = all_nodes
-
- # Final order
- order = []
-
- # DFS stack, not using recursion
- stack = []
-
- # Unmarked set
- unmarked = all_nodes
-
- # visiting = [] - skip, don't expect 1000s of nodes, |E|/|V| is small
-
- while unmarked:
- stack.insert(0,unmarked[0]) # push first unmarked
-
- while (stack):
- n = stack[0]
- add = True
- try:
- for m in g[n]:
- if (m in unmarked):
- add = False
- stack.insert(0,m)
- except KeyError:
- pass
- if (add):
- if (n in steps and n not in order):
- order.append(n)
- item = stack.pop(0)
- try:
- unmarked.remove(item)
- except ValueError:
- pass
-
- noorder = list(set(steps) - set(order))
- return order + noorder
-
-def main():
- graph_file=open('xos.deps').read()
- g = json.loads(graph_file)
- print toposort(g)
-
-if (__name__=='__main__'):
- main()
-
-#print toposort({'a':'b','b':'c','c':'d','d':'c'},['d','c','b','a'])
diff --git a/xos/synchronizers/base/watchers.py b/xos/synchronizers/base/watchers.py
deleted file mode 100644
index 753cebe..0000000
--- a/xos/synchronizers/base/watchers.py
+++ /dev/null
@@ -1,93 +0,0 @@
-import os
-import inspect
-import imp
-import time
-import sys
-import traceback
-import commands
-import threading
-import json
-import pprint
-import traceback
-
-
-from datetime import datetime
-from collections import defaultdict
-from core.models import *
-from django.db.models import F, Q
-from django.db import connection
-from django.db import reset_queries
-from xos.logger import Logger, logging, logger
-from xos.config import Config, XOS_DIR
-from synchronizers.base.steps import *
-from syncstep import SyncStep
-from synchronizers.base.error_mapper import *
-from synchronizers.base.steps.sync_object import SyncObject
-from django.utils import timezone
-from diag import update_diag
-import redis
-
-logger = Logger(level=logging.INFO)
-
-class XOSWatcher:
- def load_sync_step_modules(self, step_dir=None):
- if step_dir is None:
- try:
- step_dir = Config().observer_steps_dir
- except:
- step_dir = '/opt/xos/synchronizers/openstack/steps'
-
-
- for fn in os.listdir(step_dir):
- pathname = os.path.join(step_dir,fn)
- if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
- module = imp.load_source(fn[:-3],pathname)
- for classname in dir(module):
- c = getattr(module, classname, None)
-
- # make sure 'c' is a descendent of SyncStep and has a
- # provides field (this eliminates the abstract base classes
- # since they don't have a provides)
-
- if inspect.isclass(c) and issubclass(c, SyncStep) and hasattr(c,"provides") and (c not in self.sync_steps):
- self.sync_steps.append(c)
-
- def load_sync_steps(self):
- for s in self.sync_steps:
- if hasattr(s,'watches'):
- for w in s.watches:
- w.source = s
- try:
- self.watch_map[w.dest.__name__].append(w)
- except:
- self.watch_map[w.dest.__name__]=[w]
-
- def __init__(self,sync_steps):
- self.watch_map = {}
- self.sync_steps = sync_steps
- #self.load_sync_step_modules()
- self.load_sync_steps()
- r = redis.Redis("redis")
- channels = self.watch_map.keys()
- self.redis = r
- self.pubsub = self.redis.pubsub()
- self.pubsub.subscribe(channels)
- logger.info("XOS watcher initialized")
-
- def run(self):
- for item in self.pubsub.listen():
- channel = item['channel']
- try:
- entry = self.watch_map[channel]
- data = json.loads(item['data'])
- pk = data['pk']
- changed_fields = data['changed_fields']
- for w in entry:
- if w.into in changed_fields or not w.into:
- if (hasattr(w.source, 'handle_watched_object')):
- o = w.dest.objects.get(pk=data['pk'])
- step = w.source()
- step.handle_watched_object(o)
- except Exception as e:
- logger.warn("XOS watcher: exception %s while processing object: %s" % (type(e),e))
- pass
diff --git a/xos/synchronizers/base/xos-synchronizer.py b/xos/synchronizers/base/xos-synchronizer.py
deleted file mode 100644
index 493b94a..0000000
--- a/xos/synchronizers/base/xos-synchronizer.py
+++ /dev/null
@@ -1,85 +0,0 @@
-#!/usr/bin/env python
-import os
-import argparse
-import sys
-
-sys.path.append('/opt/xos')
-
-os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
-from synchronizers.base.backend import Backend
-from xos.config import Config, DEFAULT_CONFIG_FN
-from core.models import Instance,NetworkTemplate
-from xos.logger import Logger, logging, logger
-from django.db import ProgrammingError
-import time
-
-try:
- from django import setup as django_setup # django 1.7
-except:
- django_setup = False
-
-config = Config()
-
-# after http://www.erlenstar.demon.co.uk/unix/faq_2.html
-def daemon():
- """Daemonize the current process."""
- if os.fork() != 0: os._exit(0)
- os.setsid()
- if os.fork() != 0: os._exit(0)
- os.umask(0)
- devnull = os.open(os.devnull, os.O_RDWR)
- os.dup2(devnull, 0)
- # xxx fixme - this is just to make sure that nothing gets stupidly lost - should use devnull
- logdir=os.path.dirname(config.observer_logfile)
- # when installed in standalone we might not have httpd installed
- if not os.path.isdir(logdir): os.mkdir(logdir)
- crashlog = os.open('%s'%config.observer_logfile, os.O_RDWR | os.O_APPEND | os.O_CREAT, 0644)
- os.dup2(crashlog, 1)
- os.dup2(crashlog, 2)
-
- if hasattr(config, "observer_pidfile"):
- pidfile = config.get("observer_pidfile")
- else:
- pidfile = "/var/run/xosobserver.pid"
- try:
- file(pidfile,"w").write(str(os.getpid()))
- except:
- print "failed to create pidfile %s" % pidfile
-
-def main():
- # Generate command line parser
- parser = argparse.ArgumentParser(usage='%(prog)s [options]')
- parser.add_argument('-d', '--daemon', dest='daemon', action='store_true', default=False,
- help='Run as daemon.')
- # smbaker: util/config.py parses sys.argv[] directly to get config file name; include the option here to avoid
- # throwing unrecognized argument exceptions
- parser.add_argument('-C', '--config', dest='config_file', action='store', default=DEFAULT_CONFIG_FN,
- help='Name of config file.')
- args = parser.parse_args()
-
- if args.daemon: daemon()
-
- if django_setup: # 1.7
- django_setup()
-
- models_active = False
- wait = False
- while not models_active:
- try:
- _ = Instance.objects.first()
- _ = NetworkTemplate.objects.first()
- models_active = True
- except Exception,e:
- logger.info(str(e))
- logger.info('Waiting for data model to come up before starting...')
- time.sleep(10)
- wait = True
-
- if (wait):
- time.sleep(60) # Safety factor, seeing that we stumbled waiting for the data model to come up.
- backend = Backend()
- backend.run()
-
-if __name__ == '__main__':
-
- main()