+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from .synchronizer import Synchronizer
+__all__ = ["Synchronizer"]
+#!/usr/bin/env python
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import print_function
+import jinja2
+import tempfile
+import os
+import json
+import pickle
+import pdb
+import string
+import random
+import re
+import traceback
+import subprocess
+import threading
+from multiprocessing import Process, Queue
+from xosconfig import Config
+from multistructlog import create_logger
# Module-level setup: structured logger, configured directories, and the
# Jinja2 environment used to load playbook templates.
log = create_logger(Config().get("logging"))

step_dir = Config.get("steps_dir")
sys_dir = Config.get("sys_dir")

# Templates are searched in the configured step directory first, then in the
# shared template directory.
os_template_loader = jinja2.FileSystemLoader(
    searchpath=[step_dir, "/opt/xos/synchronizers/shared_templates"]
)
os_template_env = jinja2.Environment(loader=os_template_loader)
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    """Return a random string of ``size`` characters drawn from ``chars``.

    The characters are picked with ``random.choice``, so the result is not
    suitable for security-sensitive tokens.
    """
    return "".join(random.choice(chars) for _ in range(size))
def shellquote(s):
    """Return ``s`` wrapped in single quotes for use in a POSIX shell command.

    Each embedded single quote is replaced by ``'\\''`` (close quote, escaped
    quote, reopen quote).
    """
    return "'" + s.replace("'", "'\\''") + "'"
def get_playbook_fn(opts, path):
    """Return ``(opts, playbook_path)`` for a playbook about to be written.

    If ``opts`` has no (truthy) ``ansible_tag``, a copy of ``opts`` is made and
    a generated tag is stored in it; the caller's dict is left untouched. The
    directory ``sys_dir/path`` is created if missing, and a ``roles`` symlink
    pointing at ``step_dir/roles`` is placed in it when that directory exists.

    Args:
        opts: dict of playbook options.
        path: sub-directory of ``sys_dir`` in which the playbook will live.

    Returns:
        Tuple of the (possibly copied) options and the full playbook path,
        which is ``sys_dir/path/<ansible_tag>``.
    """
    if not opts.get("ansible_tag", None):
        # if no ansible_tag is in the options, then generate a unique one
        opts = opts.copy()
        opts["ansible_tag"] = id_generator()
    objname = opts["ansible_tag"]

    pathed_sys_dir = os.path.join(sys_dir, path)
    if not os.path.isdir(pathed_sys_dir):
        os.makedirs(pathed_sys_dir)

    # symlink steps/roles into sys/roles so that playbooks can access roles
    roledir = os.path.join(step_dir, "roles")
    rolelink = os.path.join(pathed_sys_dir, "roles")
    # lexists() is also true for a dangling link or a real file/directory at
    # the link path; os.symlink() would raise OSError in those cases.
    if os.path.isdir(roledir) and not os.path.lexists(rolelink):
        os.symlink(roledir, rolelink)

    return (opts, os.path.join(pathed_sys_dir, objname))
def run_playbook(ansible_hosts, ansible_config, fqp, opts):
    """Run a playbook in a separate Python process and collect its results.

    The arguments are pickled into a file in a fresh temporary directory, the
    ``ansible_main.py`` script located next to this module is executed with
    the argument-file and result-file names, and the pickled result dict is
    read back.

    Args:
        ansible_hosts: path of the Ansible hosts file, or None.
        ansible_config: path of the Ansible config file, or None.
        fqp: full path of the playbook to run.
        opts: dict of playbook options.

    Returns:
        ``(stats, aresults)`` taken from the result dict; both are None when
        the child process failed or its result could not be read.
    """
    args = {
        "ansible_hosts": ansible_hosts,
        "ansible_config": ansible_config,
        "fqp": fqp,
        "opts": opts,
        "config_file": Config.get_config_file(),
    }

    keep_temp_files = Config.get("keep_temp_files")

    tmp_dir = tempfile.mkdtemp()
    args_fn = os.path.join(tmp_dir, "args")
    result_fn = os.path.join(tmp_dir, "result")
    stats = None
    aresults = None
    try:
        log.info("creating args file", dir=tmp_dir)

        with open(args_fn, "wb") as args_file:
            args_file.write(pickle.dumps(args))

        ansible_main_fn = os.path.join(os.path.dirname(__file__), "ansible_main.py")

        # An argument list avoids shell interpretation of the file names.
        subprocess.call(["python", ansible_main_fn, args_fn, result_fn])

        # NOTE: unpickling is only acceptable here because the file was just
        # written by our own child process into a private temp directory.
        with open(result_fn, "rb") as result_file:
            result = pickle.loads(result_file.read())

        # The result is a dict, so the key must be tested with "in".
        if "exception" in result:
            log.error("Exception in playbook", exception=result["exception"])

        stats = result.get("stats", None)
        aresults = result.get("aresults", None)
    except Exception:
        log.exception("Exception running ansible_main")
        stats = None
        aresults = None
    finally:
        if not keep_temp_files:
            for temp_fn in (args_fn, result_fn):
                if os.path.exists(temp_fn):
                    os.remove(temp_fn)
            os.rmdir(tmp_dir)

    return (stats, aresults)
def run_template(
    name,
    opts,
    path="",
    expected_num=None,
    ansible_config=None,
    ansible_hosts=None,
    run_ansible_script=None,
    object=None,
):
    """Render a playbook template, run it, and return the successful results.

    The template ``name`` is rendered with ``opts``, written to the path chosen
    by ``get_playbook_fn`` and executed through ``run_playbook``. One line per
    task result is written to ``<playbook>.out``.

    Args:
        name: template name looked up in ``os_template_env``.
        opts: dict of template/playbook options.
        path: sub-directory passed to ``get_playbook_fn``.
        expected_num: if not None, the required number of successful results.
        ansible_config: Ansible config file path handed to ``run_playbook``.
        ansible_hosts: Ansible hosts file path handed to ``run_playbook``.
        run_ansible_script: accepted for interface compatibility; not used.
        object: optional object providing ``tologdict()``; when given, every
            task result is also logged with that object's properties.

    Returns:
        List of the ``_result`` dicts of the successful tasks, without the
        first entry (the setup task).

    Raises:
        ValueError: the playbook could not be executed, the number of
            successful results differs from ``expected_num``, or a task or
            host failed or was unreachable, and no task error message was
            collected.
        Exception: same conditions, but task error messages were collected;
            they are joined with " // " into the exception text.
    """
    template = os_template_env.get_template(name)
    buffer = template.render(opts)

    (opts, fqp) = get_playbook_fn(opts, path)

    with open(fqp, "w") as f:
        f.write(buffer)

    # The playbook is run through run_playbook directly; an earlier variant
    # based on multiprocessing.Process/Queue is no longer used.
    stats, aresults = run_playbook(ansible_hosts, ansible_config, fqp, opts)

    error_msg = []

    output_file = fqp + ".out"
    try:
        if aresults is None:
            raise ValueError("Error executing playbook %s" % fqp)

        ok_results = []
        failed = 0

        with open(output_file, "w") as ofile:
            for x in aresults:
                if not x.is_failed() and not x.is_unreachable() and not x.is_skipped():
                    ok_results.append(x)
                elif x.is_unreachable() or x.is_failed():
                    failed += 1
                    try:
                        error_msg.append(x._result["msg"])
                    except Exception:
                        # Best effort: not every result carries a "msg".
                        pass

                # FIXME (zdw, 2017-02-19) - may not be needed with new callback logging

                ofile.write("%s: %s\n" % (x._task, str(x._result)))

                if object:
                    oprops = object.tologdict()
                    ansible = x._result
                    oprops["xos_type"] = "ansible"
                    oprops["ansible_result"] = json.dumps(ansible)

                    # "failed" is cumulative, so every task after the first
                    # failure is reported as FAILED as well.
                    if failed == 0:
                        oprops["ansible_status"] = "OK"
                    else:
                        oprops["ansible_status"] = "FAILED"

                    log.info("Ran Ansible task", task=x._task, **oprops)

        if (expected_num is not None) and (len(ok_results) != expected_num):
            raise ValueError(
                "Unexpected num %s!=%d" % (str(expected_num), len(ok_results))
            )

        if failed:
            raise ValueError("Ansible playbook failed.")

        # NOTE(smbaker): Playbook errors are slipping through where `aresults` does not show any failed tasks, but
        # `stats` does show them. See CORD-3169.
        hosts = sorted(stats.processed.keys())
        for h in hosts:
            t = stats.summarize(h)
            if t["unreachable"] > 0:
                raise ValueError(
                    "Ansible playbook reported unreachable for host %s" % h
                )
            if t["failures"] > 0:
                raise ValueError("Ansible playbook reported failures for host %s" % h)

    except ValueError:
        if error_msg:
            try:
                error = " // ".join(error_msg)
            except Exception:
                error = "failed to join error_msg"
            raise Exception(error)
        else:
            raise

    # A list (rather than map()) keeps the slice valid on Python 3 as well.
    processed_results = [x._result for x in ok_results]
    return processed_results[1:]  # 0 is setup
def run_template_ssh(name, opts, path="", expected_num=None, object=None):
    """Run a playbook template over SSH against a single host.

    Writes three files next to the playbook path returned by
    ``get_playbook_fn``: the private key (``.key``), an Ansible configuration
    (``.cfg``) and a hosts file (``.hosts``); then delegates to
    ``run_template``.

    Args:
        name: template name passed to ``run_template``.
        opts: dict that must contain ``instance_name``, ``hostname`` and
            ``private_key``; unless ``baremetal_ssh`` is truthy it must also
            contain ``instance_id`` and ``ssh_ip``.
        path: sub-directory passed to ``get_playbook_fn``.
        expected_num: expected number of successful results, forwarded to
            ``run_template``.
        object: optional object forwarded to ``run_template`` for logging.

    Returns:
        Whatever ``run_template`` returns.

    Raises:
        Exception: ``ssh_ip`` is empty for a non-baremetal target.
    """
    instance_name = opts["instance_name"]
    hostname = opts["hostname"]
    private_key = opts["private_key"]
    baremetal_ssh = opts.get("baremetal_ssh", False)
    if baremetal_ssh:
        # no instance_id or ssh_ip for baremetal
        # we never proxy to baremetal
        proxy_ssh = False
        instance_id = None
        # NOTE(review): the hosts file below needs an address; without this
        # assignment the baremetal path failed with a NameError. Using the
        # hostname is an assumption - confirm against the callers.
        ssh_ip = hostname
    else:
        instance_id = opts["instance_id"]
        ssh_ip = opts["ssh_ip"]
        proxy_ssh = Config.get("proxy_ssh.enabled")

        if not ssh_ip:
            raise Exception("IP of ssh proxy not available. Synchronization deferred")

    (opts, fqp) = get_playbook_fn(opts, path)
    private_key_pathname = fqp + ".key"
    config_pathname = fqp + ".cfg"
    hosts_pathname = fqp + ".hosts"

    # Create the key file owner-only from the start so the key is never
    # readable by others, not even briefly.
    key_fd = os.open(
        private_key_pathname, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600
    )
    with os.fdopen(key_fd, "w") as f:
        f.write(private_key)
    # SSH will complain if private key is world or group readable; the mode
    # given to os.open() only applies when the file is newly created.
    os.chmod(private_key_pathname, 0o600)

    with open(config_pathname, "w") as f:
        f.write("[ssh_connection]\n")
        if proxy_ssh:
            proxy_ssh_key = Config.get("proxy_ssh.key")
            proxy_ssh_user = Config.get("proxy_ssh.user")
            if proxy_ssh_key:
                # If proxy_ssh_key is known, then we can proxy into the compute
                # node without needing to have the OpenCloud sshd machinery in
                # place.
                proxy_command = (
                    "ProxyCommand ssh -q -i %s -o StrictHostKeyChecking=no %s@%s nc %s 22"
                    % (proxy_ssh_key, proxy_ssh_user, hostname, ssh_ip)
                )
            else:
                proxy_command = (
                    "ProxyCommand ssh -q -i %s -o StrictHostKeyChecking=no %s@%s"
                    % (private_key_pathname, instance_id, hostname)
                )
            f.write('ssh_args = -o "%s"\n' % proxy_command)
        f.write("scp_if_ssh = True\n")
        f.write("pipelining = True\n")
        f.write("\n[defaults]\n")
        f.write("host_key_checking = False\n")
        f.write("timeout = 30\n")

    with open(hosts_pathname, "w") as f:
        f.write("[%s]\n" % instance_name)
        f.write("%s ansible_ssh_private_key_file=%s\n" % (ssh_ip, private_key_pathname))

    print("ANSIBLE_CONFIG=%s" % config_pathname)
    print("ANSIBLE_HOSTS=%s" % hosts_pathname)

    return run_template(
        name,
        opts,
        path,
        # Previously accepted by this function but silently dropped.
        expected_num=expected_num,
        ansible_config=config_pathname,
        ansible_hosts=hosts_pathname,
        run_ansible_script="/opt/xos/synchronizers/base/run_ansible_verbose",
        object=object,
    )
def main():
    """Run the ``sync_user_deployments`` playbook template with fixed options.

    The option values are hard-coded sample data.
    NOTE(review): ``endpoint`` and ``email`` are empty strings - confirm that
    this is intended.
    """
    run_template(
        "ansible/sync_user_deployments.yaml",
        {
            "endpoint": "",
            "name": "Sapan Bhatia",
            "email": "",
            "password": "foobar",
            "admin_user": "admin",
            "admin_password": "6a789bf69dd647e2",
            "admin_tenant": "admin",
            "tenant": "demo",
            "roles": ["user", "admin"],
        },
    )
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import pickle
+import sys
+# import json
+import traceback
+from xosconfig import Config
def run_playbook(ansible_hosts, ansible_config, fqp, opts):
    """Run the playbook ``fqp`` in this process through ``ansible_runner``.

    ``ANSIBLE_CONFIG`` and ``ANSIBLE_HOSTS`` are set in the process
    environment from the corresponding arguments, or removed from it when the
    argument is empty, before ``ansible_runner`` is imported and reloaded.

    Args:
        ansible_hosts: path of the hosts file, or a falsy value.
        ansible_config: path of the Ansible config file, or a falsy value.
        fqp: full path of the playbook.
        opts: options handed to the runner as ``run_data``.

    Returns:
        ``{"stats": ..., "aresults": ...}`` on success. If anything raises,
        both values are None and an ``"exception"`` key holds the formatted
        traceback.
    """
    try:
        for env_name, env_value in (
            ("ANSIBLE_CONFIG", ansible_config),
            ("ANSIBLE_HOSTS", ansible_hosts),
        ):
            if env_value:
                os.environ[env_name] = env_value
            else:
                os.environ.pop(env_name, None)

        # Imported (and reloaded) only after the environment is prepared.
        # "reload" is the Python 2 builtin.
        import ansible_runner

        reload(ansible_runner)

        # Dropped support for observer_pretend - to be redone
        runner = ansible_runner.Runner(
            playbook=fqp, run_data=opts, host_file=ansible_hosts
        )

        stats, aresults = runner.run()
    except Exception:
        return {"stats": None, "aresults": None, "exception": traceback.format_exc()}

    return {"stats": stats, "aresults": aresults}
def main():
    """Entry point: run one playbook described by a pickled argument file.

    ``sys.argv[1]`` names the file holding the pickled argument dict (keys
    ``config_file``, ``ansible_hosts``, ``ansible_config``, ``fqp``, ``opts``)
    and ``sys.argv[2]`` names the file that receives the pickled result of
    ``run_playbook``.
    """
    input_fn = sys.argv[1]
    result_fn = sys.argv[2]

    # NOTE: pickle must only be used on trusted input; this file is expected
    # to come from the process that launched this script.
    with open(input_fn, "rb") as input_file:
        args = pickle.loads(input_file.read())

    Config.init(args["config_file"], "synchronizer-config-schema.yaml")

    ansible_hosts = args["ansible_hosts"]
    ansible_config = args["ansible_config"]
    fqp = args["fqp"]
    opts = args["opts"]

    result = run_playbook(ansible_hosts, ansible_config, fqp, opts)

    with open(result_fn, "wb") as result_file:
        result_file.write(pickle.dumps(result))


if __name__ == "__main__":
    main()
+#!/usr/bin/env python
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from multistructlog import create_logger
+from xosconfig import Config
+from ansible.plugins.callback import CallbackBase
+from ansible.utils.display import Display
+from ansible.executor import playbook_executor
+from ansible.parsing.dataloader import DataLoader
+from ansible.vars.manager import VariableManager
+from ansible.inventory.manager import InventoryManager
+from tempfile import NamedTemporaryFile
+import os
+import sys
+import pdb
+import json
+import uuid
+from ansible import constants
# Reload ansible.constants at import time ("reload" is the Python 2 builtin).
# NOTE(review): presumably so that settings derived from the environment
# (e.g. ANSIBLE_CONFIG) are re-read for each run - confirm.
constants = reload(constants)

log = create_logger(Config().get("logging"))
class ResultCallback(CallbackBase):
    """Ansible callback that logs playbook events and collects task results.

    Every event is logged with a common set of extra fields (``xos_type``,
    ``ansible_uuid``, ``ansible_type``, ``ansible_status``,
    ``ansible_playbook``). Results are accumulated in ``self.results``, and
    ``self.playbook_status`` switches from "OK" to "FAILED" once a task fails.
    """

    CALLBACK_NAME = "resultcallback"
    CALLBACK_TYPE = "programmatic"

    def __init__(self):
        super(ResultCallback, self).__init__()
        self.results = []
        self.uuid = str(uuid.uuid1())
        self.playbook_status = "OK"

    def _xos_log_extra(self, ansible_type, status):
        """Return the log fields shared by every event."""
        return {
            "xos_type": "ansible",
            "ansible_uuid": self.uuid,
            "ansible_type": ansible_type,
            "ansible_status": status,
            "ansible_playbook": self.playbook,
        }

    def _xos_result_extra(self, result, ansible_type, status, include_task=True):
        """Return the shared log fields plus those describing ``result``."""
        log_extra = self._xos_log_extra(ansible_type, status)
        log_extra["ansible_result"] = json.dumps(result._result)
        if include_task:
            log_extra["ansible_task"] = result._task
        log_extra["ansible_host"] = result._host.get_name()
        return log_extra

    def v2_playbook_on_start(self, playbook):
        # Remembered here because all later events report the playbook name.
        self.playbook = playbook._file_name
        log_extra = self._xos_log_extra("playbook start", "OK")
        log.info("PLAYBOOK START", playbook=self.playbook, **log_extra)

    def v2_playbook_on_stats(self, stats):
        host_stats = {host: stats.summarize(host) for host in stats.processed.keys()}
        log_extra = self._xos_log_extra("playbook stats", self.playbook_status)
        log_extra["ansible_result"] = json.dumps(host_stats)

        if self.playbook_status == "OK":
            log.info("PLAYBOOK END", playbook=self.playbook, **log_extra)
        else:
            log.error("PLAYBOOK END", playbook=self.playbook, **log_extra)

    def v2_playbook_on_play_start(self, play):
        log_extra = self._xos_log_extra("play start", self.playbook_status)
        # NOTE(review): the keyword argument naming the play was lost in this
        # copy of the file; "play_name=play.name" is a reconstruction - confirm.
        log.debug("PLAY START", play_name=play.name, **log_extra)

    def v2_runner_on_ok(self, result, **kwargs):
        log_extra = self._xos_result_extra(result, "task", "OK")
        log.debug("OK", task=str(result._task), **log_extra)
        self.results.append(result)

    def v2_runner_on_failed(self, result, **kwargs):
        self.playbook_status = "FAILED"
        log_extra = self._xos_result_extra(result, "task", "FAILED")
        log.error("FAILED", task=str(result._task), **log_extra)
        self.results.append(result)

    def v2_runner_on_async_failed(self, result, **kwargs):
        # Unlike the other task events, this result is only logged and is not
        # added to self.results.
        self.playbook_status = "FAILED"
        log_extra = self._xos_result_extra(result, "task", "ASYNC FAILED")
        log.error("ASYNC FAILED", task=str(result._task), **log_extra)

    def v2_runner_on_skipped(self, result, **kwargs):
        log_extra = self._xos_result_extra(result, "task", "SKIPPED")
        log.debug("SKIPPED", task=str(result._task), **log_extra)
        self.results.append(result)

    def v2_runner_on_unreachable(self, result, **kwargs):
        log_extra = self._xos_result_extra(result, "task", "UNREACHABLE")
        log.error("UNREACHABLE", task=str(result._task), **log_extra)
        self.results.append(result)

    def v2_runner_retry(self, result, **kwargs):
        log_extra = self._xos_result_extra(result, "task", "RETRY")
        log.warning(
            "RETRYING - attempt",
            task=str(result._task),
            attempt=result._result["attempts"],
            **log_extra
        )
        self.results.append(result)

    def v2_playbook_on_handler_task_start(self, task, **kwargs):
        task_name = task.get_name().strip()
        log_extra = self._xos_log_extra("task", "HANDLER")
        log_extra["ansible_task"] = task_name
        log.debug("HANDLER", task=task_name, **log_extra)

    def v2_playbook_on_import_for_host(self, result, imported_file):
        log_extra = self._xos_result_extra(
            result, "import", "IMPORT", include_task=False
        )
        log.debug("IMPORT", imported_file=imported_file, **log_extra)
        self.results.append(result)

    def v2_playbook_on_not_import_for_host(self, result, missing_file):
        log_extra = self._xos_result_extra(
            result, "import", "MISSING IMPORT", include_task=False
        )
        log.debug("MISSING IMPORT", missing=missing_file, **log_extra)
        self.results.append(result)
class Options(object):
    """
    Options class to replace Ansible OptParser

    Plain attribute container: every constructor argument is stored under the
    attribute of the same name. The exceptions are ``tags`` and ``skip_tags``,
    which are only set when a truthy value is given; otherwise the instance
    has no such attribute at all.
    NOTE(review): presumably consumers probe these two with hasattr() -
    confirm before turning them into always-present attributes.
    """

    def __init__(
        self,
        ask_pass=None,
        ask_su_pass=None,
        ask_sudo_pass=None,
        become=None,
        become_ask_pass=None,
        become_method=None,
        become_user=None,
        check=None,
        connection=None,
        diff=None,
        flush_cache=None,
        force_handlers=None,
        forks=1,
        listtags=None,
        listtasks=None,
        module_path=None,
        new_vault_password_file=None,
        one_line=None,
        output_file=None,
        poll_interval=None,
        private_key_file=None,
        remote_user=None,
        scp_extra_args=None,
        seconds=None,
        sftp_extra_args=None,
        skip_tags=None,
        ssh_common_args=None,
        ssh_extra_args=None,
        sudo=None,
        sudo_user=None,
        syntax=None,
        tags=None,
        timeout=None,
        tree=None,
        vault_password_files=None,
        ask_vault_pass=None,
        extra_vars=None,
        inventory=None,
        listhosts=None,
        module_paths=None,
        subset=None,
        verbosity=None,
    ):
        # Deliberately left undefined when empty (see class docstring).
        if tags:
            self.tags = tags

        if skip_tags:
            self.skip_tags = skip_tags

        self.ask_pass = ask_pass
        self.ask_su_pass = ask_su_pass
        self.ask_sudo_pass = ask_sudo_pass
        self.ask_vault_pass = ask_vault_pass
        self.become = become
        self.become_ask_pass = become_ask_pass
        self.become_method = become_method
        self.become_user = become_user
        self.check = check
        self.connection = connection
        self.diff = diff
        self.extra_vars = extra_vars
        self.flush_cache = flush_cache
        self.force_handlers = force_handlers
        self.forks = forks
        self.inventory = inventory
        self.listhosts = listhosts
        self.listtags = listtags
        self.listtasks = listtasks
        self.module_path = module_path
        self.module_paths = module_paths
        self.new_vault_password_file = new_vault_password_file
        self.one_line = one_line
        self.output_file = output_file
        self.poll_interval = poll_interval
        self.private_key_file = private_key_file
        self.remote_user = remote_user
        self.scp_extra_args = scp_extra_args
        self.seconds = seconds
        self.sftp_extra_args = sftp_extra_args
        self.ssh_common_args = ssh_common_args
        self.ssh_extra_args = ssh_extra_args
        self.subset = subset
        self.sudo = sudo
        self.sudo_user = sudo_user
        self.syntax = syntax
        self.timeout = timeout
        self.tree = tree
        self.vault_password_files = vault_password_files
        self.verbosity = verbosity
class Runner(object):
    """Prepare and run a single Ansible playbook through ``PlaybookExecutor``.

    The constructor builds the options, data loader, inventory and variable
    manager and creates the executor; ``run()`` executes the playbook.
    """

    def __init__(
        self, playbook, run_data, private_key_file=None, verbosity=0, host_file=None
    ):
        """Set up the executor for ``playbook``.

        Args:
            playbook: path of the playbook file.
            run_data: stored on the instance; not passed to Ansible (the
                variable manager's extra_vars are set to an empty dict).
            private_key_file: stored in the options as ``private_key_file``.
            verbosity: verbosity applied to the options, the display and the
                ``playbook_executor`` module.
            host_file: inventory source; if falsy, the inventory is created
                without sources.
        """
        self.playbook = playbook
        self.run_data = run_data

        self.options = Options()
        self.options.output_file = playbook + ".result"
        self.options.private_key_file = private_key_file
        self.options.verbosity = verbosity
        self.options.connection = "ssh"  # Need a connection type "smart" or "ssh"
        # self.options.become = True
        self.options.become_method = "sudo"
        self.options.become_user = "root"

        # Set global verbosity
        self.display = Display()
        self.display.verbosity = self.options.verbosity
        # Executor appears to have it's own
        # verbosity object/setting as well
        playbook_executor.verbosity = self.options.verbosity

        # Become Pass Needed if not logging in as user root
        # passwords = {'become_pass': become_pass}

        # Gets data from YAML/JSON files
        self.loader = DataLoader()
        try:
            self.loader.set_vault_password(os.environ["VAULT_PASS"])
        except (AttributeError, KeyError):
            # AttributeError: this loader has no set_vault_password();
            # KeyError: VAULT_PASS is not set. Either way, run without it.
            pass

        # Set inventory, using most of above objects
        if host_file:
            self.inventory = InventoryManager(loader=self.loader, sources=host_file)
        else:
            self.inventory = InventoryManager(loader=self.loader)

        # All the variables from all the various places
        self.variable_manager = VariableManager(
            loader=self.loader, inventory=self.inventory
        )
        self.variable_manager.extra_vars = {}  # self.run_data

        # Setup playbook executor, but don't run until run() called
        self.pbex = playbook_executor.PlaybookExecutor(
            playbooks=[playbook],
            inventory=self.inventory,
            variable_manager=self.variable_manager,
            loader=self.loader,
            options=self.options,
            passwords={},
        )

    def run(self):
        """Execute the playbook and return ``(stats, results)``.

        ``stats`` is the executor's statistics object and ``results`` is the
        list collected by a fresh ``ResultCallback`` installed as the stdout
        callback.
        """
        os.environ[
            "REQUESTS_CA_BUNDLE"
        ] = "/usr/local/share/ca-certificates/local_certs.crt"
        callback = ResultCallback()
        self.pbex._tqm._stdout_callback = callback

        # Without this call nothing is executed and the callback stays empty.
        self.pbex.run()
        stats = self.pbex._tqm._stats

        return stats, callback.results
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from modelaccessor import ModelAccessor
+import datetime
+import time
class CoreApiModelAccessor(ModelAccessor):
    """Model accessor backed by the ORM object passed to the constructor."""

    def __init__(self, orm):
        # Set before the base initialiser runs, as in the original ordering.
        self.orm = orm
        super(CoreApiModelAccessor, self).__init__()

    def get_all_model_classes(self):
        """Return a dict mapping each name in ``orm.all_model_names`` to the
        attribute of that name on the ORM."""
        return {name: getattr(self.orm, name) for name in self.orm.all_model_names}

    @staticmethod
    def _filter_special(main_objs, kind_name):
        """Collect ``objects.filter_special(objects.<kind_name>)`` for each model.

        ``main_objs`` may be a single model or a list of models.
        """
        if not isinstance(main_objs, list):
            main_objs = [main_objs]

        objs = []
        for main_obj in main_objs:
            manager = main_obj.objects
            objs.extend(manager.filter_special(getattr(manager, kind_name)))
        return objs

    def fetch_pending(self, main_objs, deletion=False):
        """Return the objects selected by SYNCHRONIZER_DIRTY_OBJECTS, or by
        SYNCHRONIZER_DELETED_OBJECTS when ``deletion`` is true."""
        if not deletion:
            kind_name = "SYNCHRONIZER_DIRTY_OBJECTS"
        else:
            kind_name = "SYNCHRONIZER_DELETED_OBJECTS"
        return self._filter_special(main_objs, kind_name)

    def fetch_policies(self, main_objs, deletion=False):
        """Return the objects selected by SYNCHRONIZER_DIRTY_POLICIES, or by
        SYNCHRONIZER_DELETED_POLICIES when ``deletion`` is true."""
        if not deletion:
            kind_name = "SYNCHRONIZER_DIRTY_POLICIES"
        else:
            kind_name = "SYNCHRONIZER_DELETED_POLICIES"
        return self._filter_special(main_objs, kind_name)

    def obj_exists(self, o):
        """Return True if ``o`` has an id that is neither None nor 0."""
        # gRPC will default id to '0' for uninitialized objects
        return (o.id is not None) and (o.id != 0)

    def obj_in_list(self, o, olist):
        """Return True if some element of ``olist`` has the same id as ``o``."""
        ids = [x.id for x in olist]
        return o.id in ids

    def now(self):
        """ Return the current time for timestamping purposes """
        # Seconds since the epoch. The previous form subtracted a local-time
        # epoch from a UTC "now", which is off by the UTC offset on hosts
        # that do not run in UTC.
        return time.time()

    def is_type(self, obj, name):
        """Return True if the class name of ``obj._wrapped_class`` is ``name``."""
        return obj._wrapped_class.__class__.__name__ == name

    def is_instance(self, obj, name):
        """Return True if ``name`` is one of the comma-separated entries of
        ``obj.class_names``."""
        return name in obj.class_names.split(",")

    def get_content_type_id(self, obj):
        """Return ``obj.self_content_type_id``."""
        return obj.self_content_type_id

    def create_obj(self, cls, **kwargs):
        """Create a new object of model ``cls`` from ``kwargs``."""
        # NOTE(review): the callee was lost in this copy of the file;
        # "cls.objects.new" is a reconstruction - confirm.
        return cls.objects.new(**kwargs)
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import print_function
+import os
+import inspect
+import imp
+import sys
+import threading
+import time
+from xossynchronizer.steps.syncstep import SyncStep
+from xossynchronizer.event_loop import XOSObserver
+from xossynchronizer.model_policy_loop import XOSPolicyEngine
+from xossynchronizer.event_engine import XOSEventEngine
+from xossynchronizer.pull_step_engine import XOSPullStepEngine
+from xossynchronizer.modelaccessor import *
+from xosconfig import Config
+from multistructlog import create_logger
# Module-level structured logger, configured from the "logging" section of
# the synchronizer configuration.
log = create_logger(Config().get("logging"))
+class Backend:
+    def __init__(self, log=log):
+        self.log = log
+    def load_sync_step_modules(self, step_dir):
+        sync_steps = []
+"Loading sync steps", step_dir=step_dir)
+        for fn in os.listdir(step_dir):
+            pathname = os.path.join(step_dir, fn)
+            if (
+                os.path.isfile(pathname)
+                and fn.endswith(".py")
+                and (fn != "")
+                and (not fn.startswith("test"))
+            ):
+                # we need to extend the path to load modules in the step_dir
+                sys_path_save = sys.path
+                sys.path.append(step_dir)
+                module = imp.load_source(fn[:-3], pathname)
+                self.log.debug("Loaded file: %s", pathname)
+                # reset the original path
+                sys.path = sys_path_save
+                for classname in dir(module):
+                    c = getattr(module, classname, None)
+                    # if classname.startswith("Sync"):
+                    #    print classname, c, inspect.isclass(c), issubclass(c, SyncStep), hasattr(c,"provides")
+                    # make sure 'c' is a descendent of SyncStep and has a
+                    # provides field (this eliminates the abstract base classes
+                    # since they don't have a provides)
+                    if inspect.isclass(c):
+                        bases = inspect.getmro(c)
+                        base_names = [b.__name__ for b in bases]
+                        if (
+                            ("SyncStep" in base_names)
+                            and (hasattr(c, "provides") or hasattr(c, "observes"))
+                            and (c not in sync_steps)
+                        ):
+                            sync_steps.append(c)
+"Loaded sync steps", steps=sync_steps)
+        return sync_steps
+    def run(self):
+        """Start the configured synchronizer components and block until interrupted.
+        Reads ``steps_dir``, ``pull_steps_dir``, ``event_steps_dir`` and
+        ``model_policies_dir`` from ``Config``. An observer thread is started
+        only when ``steps_dir`` is set and at least one sync step was loaded;
+        the pull step engine, event engine and policy thread are each started
+        when their directory is configured. Returns early when neither the
+        observer thread, the policy thread nor the event engine exists;
+        otherwise sleeps in a loop and, on ``KeyboardInterrupt``, stops the
+        observer and policy threads and exits the process with status 1.
+        NOTE(review): several statements below are syntactically incomplete
+        (the text in front of some string arguments and ``name=`` keywords is
+        missing) -- presumably logging calls and ``target=`` arguments;
+        restore them from the upstream file before relying on this method.
+        """
+        observer_thread = None
+        model_policy_thread = None
+        event_engine = None
+        steps_dir = Config.get("steps_dir")
+        if steps_dir:
+            sync_steps = []
+            # load sync_steps
+            if steps_dir:
+                sync_steps = self.load_sync_step_modules(steps_dir)
+            # if we have at least one sync_step
+            if len(sync_steps) > 0:
+                # start the observer
+      "Starting XOSObserver", sync_steps=sync_steps)
+                observer = XOSObserver(sync_steps, self.log)
+                observer_thread = threading.Thread(
+          , name="synchronizer"
+                )
+                observer_thread.start()
+        else:
+  "Skipping observer thread due to no steps dir.")
+        # pull steps run in their own thread, independent of the observer
+        pull_steps_dir = Config.get("pull_steps_dir")
+        if pull_steps_dir:
+  "Starting XOSPullStepEngine", pull_steps_dir=pull_steps_dir)
+            pull_steps_engine = XOSPullStepEngine()
+            pull_steps_engine.load_pull_step_modules(pull_steps_dir)
+            pull_steps_thread = threading.Thread(
+                target=pull_steps_engine.start, name="pull_step_engine"
+            )
+            pull_steps_thread.start()
+        else:
+  "Skipping pull step engine due to no pull_steps_dir dir.")
+        # the event engine is started directly (start() is called on this thread)
+        event_steps_dir = Config.get("event_steps_dir")
+        if event_steps_dir:
+  "Starting XOSEventEngine", event_steps_dir=event_steps_dir)
+            event_engine = XOSEventEngine(self.log)
+            event_engine.load_event_step_modules(event_steps_dir)
+            event_engine.start()
+        else:
+  "Skipping event engine due to no event_steps dir.")
+        # start model policies thread
+        policies_dir = Config.get("model_policies_dir")
+        if policies_dir:
+            policy_engine = XOSPolicyEngine(policies_dir=policies_dir, log=self.log)
+            model_policy_thread = threading.Thread(
+      , name="policy_engine"
+            )
+            # marker attribute set on the thread object itself
+            model_policy_thread.is_policy_thread = True
+            model_policy_thread.start()
+        else:
+                "Skipping model policies thread due to no model_policies dir."
+            )
+        # NOTE: the pull step engine is not part of this check; only the
+        # observer, the policy thread and the event engine keep run() alive.
+        if (not observer_thread) and (not model_policy_thread) and (not event_engine):
+                "No sync steps, no policies, and no event steps. Synchronizer exiting."
+            )
+            # the caller will exit with status 0
+            return
+        # park the calling thread; the work happens in the started threads
+        while True:
+            try:
+                time.sleep(1000)
+            except KeyboardInterrupt:
+                print("exiting due to keyboard interrupt")
+                # TODO: See about setting the threads as daemons
+                # NOTE(review): _Thread__stop is the name-mangled private
+                # Thread.__stop -- presumably only available on Python 2; confirm.
+                if observer_thread:
+                    observer_thread._Thread__stop()
+                if model_policy_thread:
+                    model_policy_thread._Thread__stop()
+                sys.exit(1)
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import print_function
+import os
+import inspect
+import imp
+import sys
+import threading
+import time
+from syncstep import SyncStep
+from synchronizers.new_base.event_loop import XOSObserver
+from xosconfig import Config
+from multistructlog import create_logger
+# Module-level logger, built from the "logging" entry of the XOS configuration.
+log = create_logger(Config().get("logging"))
+class Backend:
+    def run(self):
+        # start model policies thread
+        policies_dir = Config("model_policies_dir")
+        if policies_dir:
+            from synchronizers.model_policy import run_policy
+            model_policy_thread = threading.Thread(target=run_policy)
+            model_policy_thread.start()
+        else:
+            model_policy_thread = None
+  "Skipping model policies thread due to no model_policies dir.")
+        while True:
+            try:
+                time.sleep(1000)
+            except KeyboardInterrupt:
+                print("exiting due to keyboard interrupt")
+                if model_policy_thread:
+                    model_policy_thread._Thread__stop()
+                sys.exit(1)
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#!/usr/bin/env python
+# TODO: Moved this into the synchronizer, as it appeared to require model
+#       access. Verify whether or not that's true and reconcile with
+#       generate/
+from __future__ import print_function
+import os
+import imp
+import inspect
+import time
+import traceback
+import commands
+import threading
+from xosconfig import Config
+import json
+from xosconfig import Config
+from multistructlog import create_logger
+log = create_logger(Config().get("logging"))
+missing_links = {}
+if Config.get("dependency_graph"):
+    dep_data = open(Config.get("dependency_graph")).read()
+    dep_data = "{}"
+dependencies = json.loads(dep_data)
+dependencies = {k: [item[0] for item in items] for k, items in dependencies.items()}
+inv_dependencies = {}
+for k, lst in dependencies.items():
+    for v in lst:
+        try:
+            inv_dependencies[v].append(k)
+        except KeyError:
+            inv_dependencies[v] = [k]
+def plural(name):
+    if name.endswith("s"):
+        return name + "es"
+    else:
+        return name + "s"
+def walk_deps(fn, object):
+    model = object.__class__.__name__
+    try:
+        deps = dependencies[model]
+    except BaseException:
+        deps = []
+    return __walk_deps(fn, object, deps)
+def walk_inv_deps(fn, object):
+    model = object.__class__.__name__
+    try:
+        deps = inv_dependencies[model]
+    except BaseException:
+        deps = []
+    return __walk_deps(fn, object, deps)
+def __walk_deps(fn, object, deps):
+    model = object.__class__.__name__
+    ret = []
+    for dep in deps:
+        # print "Checking dep %s"%dep
+        peer = None
+        link = dep.lower()
+        try:
+            peer = getattr(object, link)
+        except AttributeError:
+            link = plural(link)
+            try:
+                peer = getattr(object, link)
+            except AttributeError:
+                if model + "." + link not in missing_links:
+                    print("Model %s missing link for dependency %s" % (model, link))
+                    log.exception(
+                        "WARNING: Model missing link for dependency.",
+                        model=model,
+                        link=link,
+                    )
+                    missing_links[model + "." + link] = True
+        if peer:
+            try:
+                peer_objects = peer.all()
+            except AttributeError:
+                peer_objects = [peer]
+            except BaseException:
+                peer_objects = []
+            for o in peer_objects:
+                if hasattr(o, "updated"):
+                    fn(o, object)
+                    ret.append(o)
+                # Uncomment the following line to enable recursion
+                # walk_inv_deps(fn, o)
+    return ret
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import confluent_kafka
+import imp
+import inspect
+import os
+import threading
+import time
+from xosconfig import Config
+class XOSKafkaMessage:
+    def __init__(self, consumer_msg):
+        self.topic = consumer_msg.topic()
+        self.key = consumer_msg.key()
+        self.value = consumer_msg.value()
+        self.timestamp = None
+        (ts_type, ts_val) = consumer_msg.timestamp()
+        if ts_type is not confluent_kafka.TIMESTAMP_NOT_AVAILABLE:
+            self.timestamp = ts_val
+class XOSKafkaThread(threading.Thread):
+    """ XOSKafkaThread
+        A Thread for servicing Kafka events. There is one event_step associated with one XOSKafkaThread. A
+        Consumer is launched to listen on the topics specified by the thread. The thread's process_event()
+        function is called for each event.
+        The thread is created as a daemon thread (``self.daemon = True``).
+    """
+    def __init__(self, step, bootstrap_servers, log, *args, **kwargs):
+        """Store the event step class, the list of bootstrap servers and the logger.
+        Extra positional/keyword arguments are passed to ``threading.Thread``.
+        The consumer itself is created lazily in ``run()``.
+        """
+        super(XOSKafkaThread, self).__init__(*args, **kwargs)
+        self.consumer = None
+        self.step = step
+        self.bootstrap_servers = bootstrap_servers
+        self.log = log
+        self.daemon = True
+    def create_kafka_consumer(self):
+        """Build and return a ``confluent_kafka.Consumer`` for the bootstrap servers."""
+        # use the service name as the group id
+        # NOTE(review): the first key below is an empty string; given the
+        # comment above it was presumably the consumer group id key -- confirm.
+        consumer_config = {
+            "": Config().get("name"),
+            "bootstrap.servers": ",".join(self.bootstrap_servers),
+            "default.topic.config": {"auto.offset.reset": "smallest"},
+        }
+        return confluent_kafka.Consumer(**consumer_config)
+    def run(self):
+        """Consume messages forever and hand each one to the event step.
+        Raises ``Exception`` up front when the step defines neither or both
+        of ``topics`` and ``pattern``. Each loop iteration creates and
+        subscribes the consumer if needed, polls for up to one second, and
+        for every non-error message calls ``process_event`` on a fresh step
+        instance. Exceptions raised by the step are logged and swallowed.
+        """
+        if (not self.step.topics) and (not self.step.pattern):
+            raise Exception(
+                "Neither topics nor pattern is defined for step %s" % self.step.__name__
+            )
+        if self.step.topics and self.step.pattern:
+            raise Exception(
+                "Both topics and pattern are defined for step %s. Choose one."
+                % self.step.__name__
+            )
+        # NOTE(review): the statement below looks truncated (the call in front
+        # of the arguments is missing) -- presumably a logging call; restore it.
+            "Waiting for events",
+            topic=self.step.topics,
+            pattern=self.step.pattern,
+            step=self.step.__name__,
+        )
+        while True:
+            try:
+                # setup consumer or loop on failure
+                if self.consumer is None:
+                    self.consumer = self.create_kafka_consumer()
+                    if self.step.topics:
+                        self.consumer.subscribe(self.step.topics)
+                    elif self.step.pattern:
+                        self.consumer.subscribe(self.step.pattern)
+            # NOTE(review): _ALL_BROKERS_DOWN is read off KafkaError like
+            # _PARTITION_EOF below, which is compared against an error code --
+            # presumably a code constant, not an exception class; confirm this
+            # except clause can actually match.
+            except confluent_kafka.KafkaError._ALL_BROKERS_DOWN as e:
+                self.log.warning(
+                    "No brokers available on %s, %s" % (self.bootstrap_servers, e)
+                )
+                time.sleep(20)
+                continue
+            except confluent_kafka.KafkaError as e:
+                # Maybe Kafka has not started yet. Log the exception and try again in a second.
+                self.log.exception("Exception in kafka loop: %s" % e)
+                time.sleep(1)
+                continue
+            # wait until we get a message, if no message, loop again
+            msg = self.consumer.poll(timeout=1.0)
+            if msg is None:
+                continue
+            if msg.error():
+                # reaching the end of a partition is only logged at debug level
+                if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
+                    self.log.debug(
+                        "Reached end of kafka topic %s, partition: %s, offset: %d"
+                        % (msg.topic(), msg.partition(), msg.offset())
+                    )
+                else:
+                    self.log.exception("Error in kafka message: %s" % msg.error())
+            else:
+                # wrap parsing the event in a class
+                event_msg = XOSKafkaMessage(msg)
+                # NOTE(review): the statement below looks truncated --
+                # presumably a logging call; restore it.
+                    "Processing event", event_msg=event_msg, step=self.step.__name__
+                )
+                try:
+                    # a new step instance is created for every event
+                    self.step(log=self.log).process_event(event_msg)
+                except BaseException:
+                    self.log.exception(
+                        "Exception in event step",
+                        event_msg=event_msg,
+                        step=self.step.__name__,
+                    )
+class XOSEventEngine(object):
+    """ XOSEventEngine
+        Subscribe to and handle processing of events. Two methods are defined:
+            load_event_step_modules(dir) ... look for step modules in the given directory, and load classes that
+                                             directly inherit from EventStep.
+            start() ... Launch threads to handle processing of the EventSteps. It's expected that
+                        load_event_step_modules() will be called before start().
+    """
+    def __init__(self, log):
+        """Create an engine with no event steps and no threads, logging to ``log``."""
+        self.event_steps = []
+        self.threads = []
+        self.log = log
+    def load_event_step_modules(self, event_step_dir):
+        """Load every event step class found in the ``.py`` files of ``event_step_dir``.
+        Replaces ``self.event_steps``. Files whose name contains "test" are
+        skipped. A class is collected when one of its direct bases is named
+        "EventStep".
+        NOTE(review): two statements in this method look truncated
+        (presumably logging calls) and the filename compared with ``!=`` is
+        an empty string; restore them from the upstream file.
+        """
+        self.event_steps = []
+"Loading event steps", event_step_dir=event_step_dir)
+        # NOTE we'll load all the classes that inherit from EventStep
+        for fn in os.listdir(event_step_dir):
+            pathname = os.path.join(event_step_dir, fn)
+            if (
+                os.path.isfile(pathname)
+                and fn.endswith(".py")
+                and (fn != "")
+                and ("test" not in fn)
+            ):
+                # module name is the file name without its ".py" suffix
+                event_module = imp.load_source(fn[:-3], pathname)
+                for classname in dir(event_module):
+                    c = getattr(event_module, classname, None)
+                    if inspect.isclass(c):
+                        # only direct base classes are inspected (c.__bases__)
+                        base_names = [b.__name__ for b in c.__bases__]
+                        if "EventStep" in base_names:
+                            self.event_steps.append(c)
+"Loaded event steps", steps=self.event_steps)
+    def start(self):
+        """Start one ``XOSKafkaThread`` per loaded event step.
+        Logs an error and returns without starting anything when
+        ``event_bus.kind`` is unset or not "kafka", or when
+        ``event_bus.endpoint`` is unset. Started threads are appended to
+        ``self.threads``.
+        NOTE(review): the ``if`` inside the loop and one keyword argument of
+        the error call are missing their left-hand text; restore them from
+        the upstream file.
+        """
+        eventbus_kind = Config.get("event_bus.kind")
+        eventbus_endpoint = Config.get("event_bus.endpoint")
+        if not eventbus_kind:
+            self.log.error(
+                "Eventbus kind is not configured in synchronizer config file."
+            )
+            return
+        if eventbus_kind not in ["kafka"]:
+            self.log.error(
+                "Eventbus kind is set to a technology we do not implement.",
+                eventbus_kind=eventbus_kind,
+            )
+            return
+        if not eventbus_endpoint:
+            self.log.error(
+                "Eventbus endpoint is not configured in synchronizer config file."
+            )
+            return
+        for step in self.event_steps:
+            if == "kafka":
+                # the single configured endpoint is passed as a one-element server list
+                thread = XOSKafkaThread(step, [eventbus_endpoint], self.log)
+                thread.start()
+                self.threads.append(thread)
+            else:
+                self.log.error(
+                    "Unknown technology. Skipping step",
+          ,
+                    step=step.__name__,
+                )
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# TODO:
+# Add unit tests:
+# - 2 sets of Instance, ControllerSlice, ControllerNetworks - delete and create case
+import time
+import threading
+import json
+from collections import defaultdict
+from networkx import (
+    DiGraph,
+    weakly_connected_component_subgraphs,
+    all_shortest_paths,
+    NetworkXNoPath,
+from networkx.algorithms.dag import topological_sort
+from xossynchronizer.steps.syncstep import InnocuousException, DeferredException, SyncStep
+from xossynchronizer.modelaccessor import *
+from xosconfig import Config
+from multistructlog import create_logger
+# Module-level logger; also the default value of XOSObserver.__init__'s log parameter.
+log = create_logger(Config().get("logging"))
+class StepNotReady(Exception):
+    pass
+class ExternalDependencyFailed(Exception):
+    pass
+# FIXME: Move drivers into a context shared across sync steps.
+class NoOpDriver:
+    def __init__(self):
+        self.enabled = True
+        self.dependency_graph = None
+# Module-wide driver instance. Everyone gets NoOpDriver by default; to use a
+# different driver, call set_driver() below before constructing an
+# XOSObserver, which copies DRIVER into self.driver in its __init__.
+DRIVER = NoOpDriver()
+def set_driver(x):
+    global DRIVER
+    DRIVER = x
+class XOSObserver(object):
+    sync_steps = []
+    def __init__(self, sync_steps, log=log):
+        # The Condition object via which events are received
+        self.log = log
+        self.step_lookup = {}
+        self.sync_steps = sync_steps
+        self.load_sync_steps()
+        self.load_dependency_graph()
+        self.event_cond = threading.Condition()
+        self.driver = DRIVER
+        self.observer_name = Config.get("name")
+    def wait_for_event(self, timeout):
+        self.event_cond.acquire()
+        self.event_cond.wait(timeout)
+        self.event_cond.release()
+    def wake_up(self):
+        self.log.debug("Wake up routine called")
+        self.event_cond.acquire()
+        self.event_cond.notify()
+        self.event_cond.release()
+    def load_dependency_graph(self):
+        try:
+            if Config.get("dependency_graph"):
+                self.log.trace(
+                    "Loading model dependency graph",
+                    path=Config.get("dependency_graph"),
+                )
+                dep_graph_str = open(Config.get("dependency_graph")).read()
+            else:
+                self.log.trace("Using default model dependency graph", graph={})
+                dep_graph_str = "{}"
+            # joint_dependencies is of the form { Model1 -> [(Model2, src_port, dst_port), ...] }
+            # src_port is the field that accesses Model2 from Model1
+            # dst_port is the field that accesses Model1 from Model2
+            static_dependencies = json.loads(dep_graph_str)
+            dynamic_dependencies = self.compute_service_dependencies()
+            joint_dependencies = dict(
+                static_dependencies.items() + dynamic_dependencies
+            )
+            model_dependency_graph = DiGraph()
+            for src_model, deps in joint_dependencies.items():
+                for dep in deps:
+                    dst_model, src_accessor, dst_accessor = dep
+                    if src_model != dst_model:
+                        edge_label = {
+                            "src_accessor": src_accessor,
+                            "dst_accessor": dst_accessor,
+                        }
+                        model_dependency_graph.add_edge(
+                            src_model, dst_model, edge_label
+                        )
+            model_dependency_graph_rev = model_dependency_graph.reverse(copy=True)
+            self.model_dependency_graph = {
+                # deletion
+                True: model_dependency_graph_rev,
+                False: model_dependency_graph,
+            }
+            self.log.trace("Loaded dependencies", edges=model_dependency_graph.edges())
+        except Exception as e:
+            self.log.exception("Error loading dependency graph", e=e)
+            raise e
+    def load_sync_steps(self):
+        """Index the configured sync step classes by name and by observed model.
+        Populates ``self.step_lookup`` (step class name -> class),
+        ``self.model_to_step`` (model class name -> list of step class names)
+        and ``self.external_dependencies`` (de-duplicated list gathered from
+        each step's optional ``external_dependencies`` attribute).
+        NOTE(review): the last two statements look truncated (the call in
+        front of the arguments is missing) -- presumably logging calls;
+        restore them from the upstream file.
+        """
+        model_to_step = defaultdict(list)
+        external_dependencies = []
+        for s in self.sync_steps:
+            # ``observes`` may be a single model or a list of models
+            if not isinstance(s.observes, list):
+                observes = [s.observes]
+            else:
+                observes = s.observes
+            for m in observes:
+                model_to_step[m.__name__].append(s.__name__)
+            # steps without an external_dependencies attribute contribute nothing
+            try:
+                external_dependencies.extend(s.external_dependencies)
+            except AttributeError:
+                pass
+            self.step_lookup[s.__name__] = s
+        self.model_to_step = model_to_step
+        self.external_dependencies = list(set(external_dependencies))
+            "Loaded external dependencies", external_dependencies=external_dependencies
+        )
+"Loaded model_map", **model_to_step)
+    def reset_model_accessor(self, o=None):
+        try:
+            model_accessor.reset_queries()
+        except BaseException:
+            # this shouldn't happen, but in case it does, catch it...
+            if o:
+                logdict = o.tologdict()
+            else:
+                logdict = {}
+            self.log.error("exception in reset_queries", **logdict)
+    def delete_record(self, o, dr_log=None):
+        """Delete ``o`` through its synchronizer step and mark it for reaping.
+        ``dr_log`` defaults to ``self.log``. Objects already flagged
+        ``backend_need_reap`` are only journaled. Raises
+        ``ExternalDependencyFailed`` when ``o`` has no ``synchronizer_step``.
+        NOTE(review): the first ``step.log =`` assignment has no right-hand
+        side, the final statement has lost its leading text (presumably a
+        save of the listed field) and the journal messages are empty
+        strings; restore them from the upstream file.
+        """
+        if dr_log is None:
+            dr_log = self.log
+        if getattr(o, "backend_need_reap", False):
+            # the object has already been deleted and marked for reaping
+            model_accessor.journal_object(o, "")
+        else:
+            step = getattr(o, "synchronizer_step", None)
+            if not step:
+                raise ExternalDependencyFailed
+            model_accessor.journal_object(o, "")
+            dr_log.debug("Deleting object", **o.tologdict())
+            step.log =
+            step.delete_record(o)
+            step.log = dr_log
+            dr_log.debug("Deleted object", **o.tologdict())
+            model_accessor.journal_object(o, "")
+            o.backend_need_reap = True
+  ["backend_need_reap"])
+    def sync_record(self, o, sr_log=None):
+        """Sync ``o`` through its synchronizer step and record the success on the object.
+        ``sr_log`` defaults to ``self.log``. Raises ``ExternalDependencyFailed``
+        when ``o`` has no ``synchronizer_step`` (missing or ``None``). After
+        the step runs, ``enacted``, ``backend_register``, ``backend_status``
+        ("OK") and ``backend_code`` (1) are set; the step's optional
+        ``after_sync_save`` hook is called last.
+        NOTE(review): the ``step.log =`` assignments without a right-hand
+        side, the statements that have lost their leading text (presumably
+        saves and a logging call) and the empty journal messages are
+        truncated; restore them from the upstream file.
+        """
+        try:
+            step = o.synchronizer_step
+        except AttributeError:
+            step = None
+        if step is None:
+            raise ExternalDependencyFailed
+        if sr_log is None:
+            sr_log = self.log
+        # Mark this as an object that will require delete. Do
+        # this now rather than after the syncstep,
+        if not (o.backend_need_delete):
+            o.backend_need_delete = True
+  ["backend_need_delete"])
+        model_accessor.journal_object(o, "")
+        sr_log.debug("Syncing object", **o.tologdict())
+        step.log =
+        step.sync_record(o)
+        step.log = sr_log
+        sr_log.debug("Synced object", **o.tologdict())
+        # enacted is the later of the two change timestamps
+        o.enacted = max(o.updated, o.changed_by_policy)
+        # a successful sync resets the retry bookkeeping kept in backend_register
+        scratchpad = {"next_run": 0, "exponent": 0, "last_success": time.time()}
+        o.backend_register = json.dumps(scratchpad)
+        o.backend_status = "OK"
+        o.backend_code = 1
+        model_accessor.journal_object(o, "")
+            update_fields=[
+                "enacted",
+                "backend_status",
+                "backend_register",
+                "backend_code",
+            ]
+        )
+        # optional per-step hook, run after the fields above were set
+        if hasattr(step, "after_sync_save"):
+            step.log =
+            step.after_sync_save(o)
+            step.log = sr_log
+"Saved sync object", o=o)
+    """ This function needs a cleanup. FIXME: Rethink backend_status, backend_register """
+    def handle_sync_exception(self, o, e):
+        """Record a failed sync on ``o`` and return ``(dependency_error, code)``.
+        The status text is ``e.message`` when present, else ``str(e)``. The
+        code is 1 for ``InnocuousException``, 0 for ``DeferredException`` and
+        2 for anything else. The status and code are stored through
+        ``set_object_error``; the returned message is what ``sync_cohort``
+        applies to the remaining objects of the cohort.
+        NOTE(review): one element of the format tuple below is missing
+        (only a comma remains where the ``%d`` argument should be); restore
+        it from the upstream file.
+        """
+        self.log.exception("sync step failed!", e=e, **o.tologdict())
+        # NOTE: current_code is assigned but not used in this method
+        current_code = o.backend_code
+        if hasattr(e, "message"):
+            status = str(e.message)
+        else:
+            status = str(e)
+        if isinstance(e, InnocuousException):
+            code = 1
+        elif isinstance(e, DeferredException):
+            # NOTE if the synchronization is Deferred it means that synchronization is still in progress
+            code = 0
+        else:
+            code = 2
+        self.set_object_error(o, status, code)
+        dependency_error = "Failed due to error in model %s id %d: %s" % (
+            o.leaf_model_name,
+  ,
+            status,
+        )
+        return dependency_error, code
+    def set_object_error(self, o, status, code):
+        """Store an error status/code on ``o`` and update its retry bookkeeping.
+        ``backend_status`` keeps at most the last two distinct messages,
+        joined by " // ". ``backend_register`` holds a JSON scratchpad with
+        ``next_run``, ``exponent``, ``failures`` and the last success/failure
+        timestamps. From the second consecutive failure on, ``next_run`` is
+        pushed out by ``exponent`` minutes (code 1) or ``exponent`` * 10
+        minutes (other codes), capped at 8 hours.
+        NOTE(review): inside the final ``try`` the call that the keyword
+        arguments belong to is missing (presumably a save of the object);
+        restore it from the upstream file.
+        """
+        if o.backend_status:
+            error_list = o.backend_status.split(" // ")
+        else:
+            error_list = []
+        if status not in error_list:
+            error_list.append(status)
+        # Keep last two errors
+        error_list = error_list[-2:]
+        o.backend_code = code
+        o.backend_status = " // ".join(error_list)
+        try:
+            scratchpad = json.loads(o.backend_register)
+            # probe only: a missing "exponent" key raises and triggers the reset below
+            scratchpad["exponent"]
+        except BaseException:
+            scratchpad = {
+                "next_run": 0,
+                "exponent": 0,
+                "last_success": time.time(),
+                "failures": 0,
+            }
+        # Second failure
+        if scratchpad["exponent"]:
+            if code == 1:
+                delay = scratchpad["exponent"] * 60  # 1 minute
+            else:
+                delay = scratchpad["exponent"] * 600  # 10 minutes
+            # cap delays at 8 hours
+            if delay > 8 * 60 * 60:
+                delay = 8 * 60 * 60
+            scratchpad["next_run"] = time.time() + delay
+        scratchpad["exponent"] += 1
+        # scratchpads loaded from backend_register may lack a "failures" counter
+        try:
+            scratchpad["failures"] += 1
+        except KeyError:
+            scratchpad["failures"] = 1
+        scratchpad["last_failure"] = time.time()
+        o.backend_register = json.dumps(scratchpad)
+        # TOFIX:
+        # DatabaseError: value too long for type character varying(140)
+        # NOTE(review): the status is truncated to 1024 characters below,
+        # while the error quoted above mentions a 140-character column -- confirm.
+        if model_accessor.obj_exists(o):
+            try:
+                o.backend_status = o.backend_status[:1024]
+                    update_fields=["backend_status", "backend_register"],
+                    always_update_timestamp=True,
+                )
+            except BaseException as e:
+                self.log.exception("Could not update backend status field!", e=e)
+                pass
+    def sync_cohort(self, cohort, deletion):
+        """Sync (or, when ``deletion`` is true, delete) the objects of ``cohort`` in order.
+        After the first failure, every remaining object is not processed but
+        marked with the resulting dependency error through
+        ``set_object_error``. The model accessor is reset before each object
+        and again, followed by ``connection_close()``, when the cohort is done.
+        NOTE(review): the ``sc_log =`` assignment has no right-hand side and
+        the format tuple of the external-dependency message is missing its
+        second element; restore them from the upstream file.
+        """
+        # marker attribute set on the current thread object
+        threading.current_thread().is_sync_thread = True
+        sc_log =
+        try:
+            # NOTE: start_time is assigned but not used in this method
+            start_time = time.time()
+            sc_log.debug("Starting to work on cohort", cohort=cohort, deletion=deletion)
+            cohort_emptied = False
+            dependency_error = None
+            dependency_error_code = None
+            itty = iter(cohort)
+            while not cohort_emptied:
+                try:
+                    self.reset_model_accessor()
+                    o = next(itty)
+                    # an earlier object failed: propagate its error instead of syncing
+                    if dependency_error:
+                        self.set_object_error(
+                            o, dependency_error, dependency_error_code
+                        )
+                        continue
+                    try:
+                        if deletion:
+                            self.delete_record(o, sc_log)
+                        else:
+                            self.sync_record(o, sc_log)
+                    except ExternalDependencyFailed:
+                        dependency_error = (
+                            "External dependency on object %s id %d not met"
+                            % (o.leaf_model_name,
+                        )
+                        dependency_error_code = 1
+                    # NOTE(review): Exception is listed last; presumably it already
+                    # covers the two project exception types -- confirm.
+                    except (DeferredException, InnocuousException, Exception) as e:
+                        dependency_error, dependency_error_code = self.handle_sync_exception(
+                            o, e
+                        )
+                except StopIteration:
+                    # raised by next(itty) once the cohort is exhausted
+                    sc_log.debug("Cohort completed", cohort=cohort, deletion=deletion)
+                    cohort_emptied = True
+        finally:
+            self.reset_model_accessor()
+            model_accessor.connection_close()
+    def tenant_class_name_from_service(self, service_name):
+        """ This code supports legacy functionality. To be cleaned up. """
+        name1 = service_name + "Instance"
+        if hasattr(Slice().stub, name1):
+            return name1
+        else:
+            name2 = service_name.replace("Service", "Tenant")
+            if hasattr(Slice().stub, name2):
+                return name2
+            else:
+                return None
+    def compute_service_dependencies(self):
+        """ FIXME: Implement more cleanly via xproto
+            Derives dependencies between service-instance/tenant models from
+            ServiceDependency objects. Returns a list of
+            (model name, [(dependency name, "", ""), ...]) tuples.
+            NOTE(review): the ServiceDependency filter call below has lost
+            its arguments and closing parenthesis; restore it from the
+            upstream file.
+        """
+        model_names = self.model_to_step.keys()
+        # pairs of (instance/tenant model name, derived service model name)
+        ugly_tuples = [
+            (m, m.replace("Instance", "").replace("Tenant", "Service"))
+            for m in model_names
+            if m.endswith("ServiceInstance") or m.endswith("Tenant")
+        ]
+        ugly_rtuples = [(v, k) for k, v in ugly_tuples]
+        # NOTE: ugly_map is built but not used in this method
+        ugly_map = dict(ugly_tuples)
+        # reverse lookup: service model name -> instance/tenant model name
+        ugly_rmap = dict(ugly_rtuples)
+        s_model_names = [v for k, v in ugly_tuples]
+        s_models0 = [
+            getattr(Slice().stub, model_name, None) for model_name in s_model_names
+        ]
+        # NOTE(review): getattr above can yield None, which has no .objects -- confirm
+        s_models1 = [model.objects.first() for model in s_models0]
+        s_models = [m for m in s_models1 if m is not None]
+        dependencies = []
+        for model in s_models:
+            deps = ServiceDependency.objects.filter(
+            if deps:
+                services = [
+                    self.tenant_class_name_from_service(
+                        d.provider_service.leaf_model_name
+                    )
+                    for d in deps
+                ]
+                # accessor names are left empty for these computed dependencies
+                dependencies.append(
+                    (ugly_rmap[model.leaf_model_name], [(s, "", "") for s in services])
+                )
+        return dependencies
+    def compute_service_instance_dependencies(self, objects):
+        """Return provider service instances that are not yet enacted for ``objects``.
+        For every ServiceInstanceLink found, the provider is returned when it
+        has no ``enacted`` value or was updated after it was enacted; its
+        ``dependent`` attribute is set to the link's subscriber.
+        NOTE(review): the ServiceInstanceLink filter call below has lost its
+        arguments and closing parenthesis; restore it from the upstream file.
+        """
+        link_set = [
+            ServiceInstanceLink.objects.filter(
+            for o in objects
+        ]
+        # flatten to (provider, subscriber) pairs
+        dependencies = [
+            (l.provider_service_instance, l.subscriber_service_instance)
+            for links in link_set
+            for l in links
+        ]
+        providers = []
+        for p, s in dependencies:
+            if not p.enacted or p.enacted < p.updated:
+                p.dependent = s
+                providers.append(p)
+        return providers
+    def run(self):
+        # Cleanup: Move self.driver into a synchronizer context
+        # made available to every sync step.
+        if not self.driver.enabled:
+            self.log.warning("Driver is not enabled. Not running sync steps.")
+            return
+        while True:
+            self.log.trace("Waiting for event or timeout")
+            self.wait_for_event(timeout=5)
+            self.log.trace("Synchronizer awake")
+            self.run_once()
+    def fetch_pending(self, deletion=False):
+        """Collect the objects waiting to be synced (or deleted when ``deletion`` is true).
+        Returns ``(pending_objects, pending_steps)``. Every pending object
+        gets a fresh step instance as ``synchronizer_step``; provider service
+        instances pulled in through
+        ``compute_service_instance_dependencies`` get ``None`` instead. Steps
+        exposing a ``call`` attribute are treated as legacy and returned in
+        ``pending_steps``.
+        NOTE(review): both ``step.log =`` assignments have no right-hand
+        side; restore them from the upstream file.
+        """
+        # NOTE: unique_model_list is computed but not used in this method
+        unique_model_list = list(set(self.model_to_step.keys()))
+        pending_objects = []
+        pending_steps = []
+        # NOTE(review): appending to .values() requires it to be a list
+        # (presumably Python 2 behaviour) -- confirm.
+        step_list = self.step_lookup.values()
+        for e in self.external_dependencies:
+            # NOTE(review): s is the SyncStep class itself, so every iteration
+            # overwrites the same class attribute -- confirm this is intended.
+            s = SyncStep
+            s.observes = e
+            step_list.append(s)
+        for step_class in step_list:
+            step = step_class(driver=self.driver)
+            step.log =
+            if not hasattr(step, "call"):
+                pending = step.fetch_pending(deletion)
+                # one step instance per object
+                for obj in pending:
+                    step = step_class(driver=self.driver)
+                    step.log =
+                    obj.synchronizer_step = step
+                pending_service_dependencies = self.compute_service_instance_dependencies(
+                    pending
+                )
+                # dependencies carry no step; sync_record/delete_record raise
+                # ExternalDependencyFailed for them
+                for obj in pending_service_dependencies:
+                    obj.synchronizer_step = None
+                pending_objects.extend(pending)
+                pending_objects.extend(pending_service_dependencies)
+            else:
+                # Support old and broken legacy synchronizers
+                # This needs to be dropped soon.
+                pending_steps.append(step)
+        self.log.trace(
+            "Fetched pending data",
+            pending_objects=pending_objects,
+            legacy_steps=pending_steps,
+        )
+        return pending_objects, pending_steps
+    def linked_objects(self, o):
+        if o is None:
+            return [], None
+        try:
+            o_lst = [o for o in o.all()]
+            edge_type = PROXY_EDGE
+        except (AttributeError, TypeError):
+            o_lst = [o]
+            edge_type = DIRECT_EDGE
+        return o_lst, edge_type
+    """ Automatically test if a real dependency path exists between two objects. e.g.
+        given an Instance, and a ControllerSite, the test amounts to:
+   ==
+        Then the two objects are related, and should be put in the same cohort.
+        If the models of the two objects are not dependent, then the check trivially
+        returns False.
+    """
+    def same_object(self, o1, o2):
+        """Test whether ``o2`` is among the objects reachable through ``o1``.
+        Returns ``(found, edge_type)``, where ``edge_type`` comes from
+        ``linked_objects(o1)``; ``(False, None)`` when either argument is
+        falsy. On a match ``found`` is the matching object itself; with no
+        match it is ``o2``'s ``deleted`` attribute (``False`` when absent).
+        ``AttributeError`` raised while comparing is logged and re-raised.
+        NOTE(review): the second half of the match condition below is
+        missing both operands of ``==``; restore it from the upstream file.
+        """
+        if not o1 or not o2:
+            return False, None
+        o1_lst, edge_type = self.linked_objects(o1)
+        try:
+            # first linked object that matches, or StopIteration when none does
+            found = next(
+                obj
+                for obj in o1_lst
+                if obj.leaf_model_name == o2.leaf_model_name and ==
+            )
+        except AttributeError as e:
+            self.log.exception("Compared objects could not be identified", e=e)
+            raise e
+        except StopIteration:
+            # This is a temporary workaround to establish dependencies between
+            # deleted proxy objects. A better solution would be for the ORM to
+            # return the set of deleted objects via foreign keys. At that point,
+            # the following line would change back to found = False
+            # - Sapan
+            found = getattr(o2, "deleted", False)
+        return found, edge_type
+    def concrete_path_exists(self, o1, o2):
+        """Decide whether a concrete dependency links ``o1`` to ``o2``.
+        Returns ``(verdict, edge_type)``:
+        * ``(False, None)`` when either object has no ``leaf_model_name`` or when no
+          path joins the two model names in ``self.model_dependency_graph[False]``;
+        * for two models whose names end in "ServiceInstance", whether
+          ``o2.dependent`` (``None`` if absent) equals ``o1``, as a ``DIRECT_EDGE``;
+        * otherwise each shortest model path is walked from ``o1`` through the
+          edges' ``src_accessor`` attributes and the object reached is compared
+          with ``o2`` via ``same_object()``.
+        An ``AttributeError`` during the walk, or an accessor that yields more than
+        one object, produces a conservative ``True``.
+        """
+        try:
+            m1 = o1.leaf_model_name
+            m2 = o2.leaf_model_name
+        except AttributeError:
+            # One of the nodes is not in the dependency graph
+            # No dependency
+            return False, None
+        if m1.endswith("ServiceInstance") and m2.endswith("ServiceInstance"):
+            return getattr(o2, "dependent", None) == o1, DIRECT_EDGE
+        # FIXME: Dynamic dependency check
+        G = self.model_dependency_graph[False]
+        paths = all_shortest_paths(G, m1, m2)
+        try:
+            # any() pulls the first path so that NetworkXNoPath surfaces inside this
+            # try (presumably all_shortest_paths is lazy - confirm); the iterator is
+            # then rebuilt because the probe has consumed part of it.
+            any(paths)
+            paths = all_shortest_paths(G, m1, m2)
+        except NetworkXNoPath:
+            # Easy. The two models are unrelated.
+            return False, None
+        for p in paths:
+            src_object = o1
+            edge_type = DIRECT_EDGE
+            # Follow the path one model-to-model edge at a time.
+            for i in range(len(p) - 1):
+                src = p[i]
+                dst = p[i + 1]
+                edge_label = G[src][dst]
+                sa = edge_label["src_accessor"]
+                try:
+                    dst_accessor = getattr(src_object, sa)
+                    dst_objects, link_edge_type = self.linked_objects(dst_accessor)
+                    # A single proxy hop marks the whole path as a proxy edge.
+                    if link_edge_type == PROXY_EDGE:
+                        edge_type = link_edge_type
+                    """
+                    True       			If no linked objects and deletion
+                    False      			If no linked objects
+                    True       			If multiple linked objects
+                    <continue traversal> 	If single linked object
+                    """
+                    if dst_objects == []:
+                        # Workaround for ORM not returning linked deleted
+                        # objects
+                        if o2.deleted:
+                            return True, edge_type
+                        else:
+                            dst_object = None
+                    elif len(dst_objects) > 1:
+                        # Multiple linked objects. Assume anything could be among those multiple objects.
+                        # Raising here jumps to the handler below, which returns True.
+                        raise AttributeError
+                    else:
+                        dst_object = dst_objects[0]
+                except AttributeError as e:
+                    # Accessors named "fake_accessor" are not logged.
+                    if sa != "fake_accessor":
+                        self.log.debug(
+                            "Could not check object dependencies, making conservative choice %s",
+                            e,
+                            src_object=src_object,
+                            sa=sa,
+                            o1=o1,
+                            o2=o2,
+                        )
+                    return True, edge_type
+                src_object = dst_object
+                # Dead end on this path; same_object() below then reports (False, None).
+                if not src_object:
+                    break
+            verdict, edge_type = self.same_object(src_object, o2)
+            if verdict:
+                return verdict, edge_type
+            # Otherwise try other paths
+        return False, None
+    """
+    This function implements the main scheduling logic
+    of the Synchronizer. It divides incoming work (dirty objects)
+    into cohorts of dependent objects, and runs each such cohort
+    in its own thread.
+    Future work:
+    * Run event thread in parallel to the scheduling thread, and
+      add incoming objects to existing cohorts. Doing so should
+      greatly improve synchronizer performance.
+    * A single object might need to be added to multiple cohorts.
+      In this case, the last cohort handles such an object.
+    * This algorithm is horizontal-scale-ready. Multiple synchronizers
+      could run off a shared runqueue of cohorts.
+    """
+    def compute_dependent_cohorts(self, objects, deletion):
+        model_map = defaultdict(list)
+        n = len(objects)
+        r = range(n)
+        indexed_objects = zip(r, objects)
+        oG = DiGraph()
+        for i in r:
+            oG.add_node(i)
+        try:
+            for i0 in range(n):
+                for i1 in range(n):
+                    if i0 != i1:
+                        if deletion:
+                            path_args = (objects[i1], objects[i0])
+                        else:
+                            path_args = (objects[i0], objects[i1])
+                        is_connected, edge_type = self.concrete_path_exists(*path_args)
+                        if is_connected:
+                            try:
+                                edge_type = oG[i1][i0]["type"]
+                                if edge_type == PROXY_EDGE:
+                                    oG.remove_edge(i1, i0)
+                                    oG.add_edge(i0, i1, {"type": edge_type})
+                            except KeyError:
+                                oG.add_edge(i0, i1, {"type": edge_type})
+        except KeyError:
+            pass
+        components = weakly_connected_component_subgraphs(oG)
+        cohort_indexes = [reversed(topological_sort(g)) for g in components]
+        cohorts = [
+            [objects[i] for i in cohort_index] for cohort_index in cohort_indexes
+        ]
+        return cohorts
+    def run_once(self):
+        """Run one synchronizer pass: a sync phase followed by a deletion phase.
+        For each phase the pending objects are fetched and grouped into dependent
+        cohorts, and every cohort is handed to ``sync_cohort`` in its own thread;
+        the method waits for all threads before turning to the legacy steps.
+        Any exception raised in the pass is logged here and not propagated.
+        """
+        self.load_dependency_graph()
+        try:
+            # Why are we checking the DB connection here?
+            model_accessor.check_db_connection_okay()
+            # loop_start / loop_end are recorded but not used in the code shown here.
+            loop_start = time.time()
+            # Two passes. One for sync, the other for deletion.
+            for deletion in (False, True):
+                objects_to_process = []
+                objects_to_process, steps_to_process = self.fetch_pending(deletion)
+                dependent_cohorts = self.compute_dependent_cohorts(
+                    objects_to_process, deletion
+                )
+                threads = []
+                self.log.trace("In run once inner loop", deletion=deletion)
+                for cohort in dependent_cohorts:
+                    thread = threading.Thread(
+                        target=self.sync_cohort,
+                        name="synchronizer",
+                        args=(cohort, deletion),
+                    )
+                    threads.append(thread)
+                # Start threads
+                for t in threads:
+                    t.start()
+                self.reset_model_accessor()
+                # Wait for all threads to finish before continuing with the run
+                # loop
+                for t in threads:
+                    t.join()
+                # Run legacy synchronizers, which do everything in call()
+                # NOTE(review): the body of the try below is missing (presumably the
+                # invocation of the legacy step), so this does not parse as written.
+                for step in steps_to_process:
+                    try:
+                    except Exception as e:
+                        self.log.exception("Legacy step failed", step=step, e=e)
+            loop_end = time.time()
+        except Exception as e:
+            self.log.exception(
+                "Core error. This seems like a misconfiguration or bug. This error will not be relayed to the user!",
+                e=e,
+            )
+            self.log.error("Exception in observer run loop")
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+class EventStep(object):
+    """
+    All the event steps defined in each synchronizer needs to inherit from this class in order to be loaded
+    Each step should define a technology, and either a `topics` or a `pattern`. The meaning of `topics` and `pattern`
+    depend on the technology that is chosen.
+    """
+    technology = "kafka"
+    topics = []
+    pattern = None
+    def __init__(self, log, **kwargs):
+        """
+        Initialize a pull step. Override this function to include any initialization. Make sure to call the original
+        __init__() from your method.
+        """
+        # self.log can be used to emit logging messages.
+        self.log = log
+    def process_event(self, event):
+        # This method must be overridden in your class. Do not call the original method.
+        self.log.warning(
+            "There is no default process_event, please provide a process_event method",
+            msg=event,
+        )
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+class SynchronizerException(Exception):
+    pass
+class SynchronizerProgrammingError(SynchronizerException):
+    pass
+class SynchronizerConfigurationError(SynchronizerException):
+    pass
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+from xosconfig import Config
+from multistructlog import create_logger
+log = create_logger(Config().get("logging"))
+class ModelLoadClient(object):
+    def __init__(self, api):
+        self.api = api
+    def upload_models(self, name, dir, version="unknown"):
+        request = self.api.dynamicload_pb2.LoadModelsRequest(name=name, version=version)
+        for fn in os.listdir(dir):
+            if fn.endswith(".xproto"):
+                item = request.xprotos.add()
+                item.filename = fn
+                item.contents = open(os.path.join(dir, fn)).read()
+        models_fn = os.path.join(dir, "")
+        if os.path.exists(models_fn):
+            item = request.decls.add()
+            item.filename = ""
+            item.contents = open(models_fn).read()
+        attic_dir = os.path.join(dir, "attic")
+        if os.path.exists(attic_dir):
+            log.warn(
+                "Attics are deprecated, please use the legacy=True option in xProto"
+            )
+            for fn in os.listdir(attic_dir):
+                if fn.endswith(".py"):
+                    item = request.attics.add()
+                    item.filename = fn
+                    item.contents = open(os.path.join(attic_dir, fn)).read()
+        api_convenience_dir = os.path.join(dir, "convenience")
+        if os.path.exists(api_convenience_dir):
+            for fn in os.listdir(api_convenience_dir):
+                if fn.endswith(".py") and "test" not in fn:
+                    item = request.convenience_methods.add()
+                    item.filename = fn
+                    item.contents = open(os.path.join(api_convenience_dir, fn)).read()
+        result = self.api.dynamicload.LoadModels(request)
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import cPickle
+import subprocess
+"""
+    Support for autogenerating mock_modelaccessor.
+    Each unit test might have its own requirements for the set of xprotos that make
+    up its model testing framework. These should always include the core, and
+    optionally include one or more services.
+"""
+def build_mock_modelaccessor(
+    dest_dir, xos_dir, services_dir, service_xprotos, target="mock_classes.xtarget"
+    dest_fn = os.path.join(dest_dir, "")
+    args = ["xosgenx", "--target", target]
+    args.append(os.path.join(xos_dir, "core/models/core.xproto"))
+    for xproto in service_xprotos:
+        args.append(os.path.join(services_dir, xproto))
+    # Check to see if we've already run xosgenx. If so, don't run it again.
+    context_fn = dest_fn + ".context"
+    this_context = (xos_dir, services_dir, service_xprotos, target)
+    need_xosgenx = True
+    if os.path.exists(context_fn):
+        try:
+            context = cPickle.loads(open(context_fn).read())
+            if context == this_context:
+                return
+        except (cPickle.UnpicklingError, EOFError):
+            # Something went wrong with the file read or depickling
+            pass
+    if os.path.exists(context_fn):
+        os.remove(context_fn)
+    if os.path.exists(dest_fn):
+        os.remove(dest_fn)
+    p = subprocess.Popen(
+        " ".join(args) + " > " + dest_fn,
+        shell=True,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.STDOUT,
+    )
+    (stdoutdata, stderrdata) = p.communicate()
+    if (p.returncode != 0) or (not os.path.exists(dest_fn)):
+        raise Exception(
+            "Failed to create mock model accessor, returncode=%d, stdout=%s"
+            % (p.returncode, stdoutdata)
+        )
+    # Save the context of this invocation of xosgenx
+    open(context_fn, "w").write(cPickle.dumps(this_context))
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from xossynchronizer.modelaccessor import *
+from xossynchronizer.model_policies.policy import Policy
+from xossynchronizer.exceptions import *
+class Scheduler(object):
+    # XOS Scheduler Abstract Base Class
+    # Used to implement schedulers that pick which node to put instances on
+    def __init__(self, slice, label=None, constrain_by_service_instance=False):
+        self.slice = slice
+        self.label = label  # Only pick nodes with this label
+        # Apply service-instance-based constraints
+        self.constrain_by_service_instance = constrain_by_service_instance
+    def pick(self):
+        # this method should return a tuple (node, parent)
+        #    node is the node to instantiate on
+        #    parent is for container_vm instances only, and is the VM that will
+        #      hold the container
+        raise Exception("Abstract Base")
+class LeastLoadedNodeScheduler(Scheduler):
+    # This scheduler always return the node with the fewest number of
+    # instances.
+    def pick(self):
+        set_label = False
+        nodes = []
+        if self.label:
+            nodes = Node.objects.filter(nodelabels__name=self.label)
+            if not nodes:
+                set_label = self.constrain_by_service_instance
+        if not nodes:
+            if self.slice.default_node:
+                # if slice.default_node is set, then filter by default_node
+                nodes = Node.objects.filter(name=self.slice.default_node)
+            else:
+                nodes = Node.objects.all()
+        # convert to list
+        nodes = list(nodes)
+        # sort so that we pick the least-loaded node
+        nodes = sorted(nodes, key=lambda node: node.instances.count())
+        if not nodes:
+            raise Exception("LeastLoadedNodeScheduler: No suitable nodes to pick from")
+        picked_node = nodes[0]
+        if set_label:
+            nl = NodeLabel(name=self.label)
+            nl.node.add(picked_node)
+        # TODO: logic to filter nodes by which nodes are up, and which
+        #   nodes the slice can instantiate on.
+        return [picked_node, None]
+class TenantWithContainerPolicy(Policy):
+    # This policy is abstract. Inherit this class into your own policy and override model_name
+    model_name = None
+    def handle_create(self, tenant):
+        return self.handle_update(tenant)
+    def handle_update(self, service_instance):
+        """React to an update of ``service_instance``.
+        If links have been deleted (``link_deleted_count > 0``) and no provided
+        links remain, the instance is torn down and the method returns; otherwise
+        ``manage_container()`` is called for it.
+        """
+        # NOTE(review): several statements in the self-destruct branch are truncated
+        # (the call that opens before the "self-destructing" message, the argument
+        # of model.objects.filter(, and the call around "already deleted"), so the
+        # method does not parse as written - restore them from upstream.
+        # NOTE(review): handle_delete() is not defined in this class as shown (only
+        # a commented-out version follows); presumably a subclass supplies it.
+        if (service_instance.link_deleted_count > 0) and (
+            not service_instance.provided_links.exists()
+        ):
+            # The model class is looked up by name among this module's globals.
+            model = globals()[self.model_name]
+                "The last provided link has been deleted -- self-destructing."
+            )
+            self.handle_delete(service_instance)
+            if model.objects.filter(
+                service_instance.delete()
+            else:
+      "Tenant %s is already deleted" % service_instance)
+            return
+        self.manage_container(service_instance)
+    #    def handle_delete(self, tenant):
+    #        if tenant.vcpe:
+    #            tenant.vcpe.delete()
+    def save_instance(self, instance):
+        # Override this function to do custom pre-save or post-save processing,
+        # such as creating ports for containers.
+    def ip_to_mac(self, ip):
+        (a, b, c, d) = ip.split(".")
+        return "02:42:%02x:%02x:%02x:%02x" % (int(a), int(b), int(c), int(d))
+    def allocate_public_service_instance(self, **kwargs):
+        """ Get a ServiceInstance that provides public connectivity. Currently this means to use AddressPool and
+            the AddressManager Service.
+            Expect this to be refactored when we break hard-coded service dependencies.
+            Keyword arguments: ``address_pool_name`` is required; ``subscriber_service`` and
+            ``subscriber_tenant`` / ``subscriber_service_instance`` are optional and are removed
+            from kwargs; whatever remains is passed to AddressManagerServiceInstance.
+            Returns the new AddressManagerServiceInstance. Raises Exception if there is no
+            addressing service, the pool is not found, or the pool has no address left.
+        """
+        address_pool_name = kwargs.pop("address_pool_name")
+        am_service = AddressManagerService.objects.all()  # TODO: Hardcoded dependency
+        if not am_service:
+            raise Exception("no addressing services")
+        am_service = am_service[0]
+        ap = AddressPool.objects.filter(
+            name=address_pool_name,
+        )
+        # NOTE(review): `name` is not defined in this method, so the raise below
+        # would fail with NameError; address_pool_name looks intended - confirm.
+        if not ap:
+            raise Exception("Addressing service unable to find addresspool %s" % name)
+        ap = ap[0]
+        ip = ap.get_address()
+        # NOTE(review): the format argument and closing parenthesis of the raise
+        # below are missing, and no statement follows the "save the AddressPool"
+        # comment - this does not parse as written.
+        if not ip:
+            raise Exception("AddressPool '%s' has run out of addresses." %
+  # save the AddressPool to account for address being removed from it
+        subscriber_service = None
+        if "subscriber_service" in kwargs:
+            subscriber_service = kwargs.pop("subscriber_service")
+        # "subscriber_tenant" takes precedence over "subscriber_service_instance".
+        subscriber_service_instance = None
+        if "subscriber_tenant" in kwargs:
+            subscriber_service_instance = kwargs.pop("subscriber_tenant")
+        elif "subscriber_service_instance" in kwargs:
+            subscriber_service_instance = kwargs.pop("subscriber_service_instance")
+        # TODO: potential partial failure -- AddressPool address is allocated and saved before addressing tenant
+        t = None
+        try:
+            t = AddressManagerServiceInstance(
+                owner=am_service, **kwargs
+            )  # TODO: Hardcoded dependency
+            t.public_ip = ip
+            t.public_mac = self.ip_to_mac(ip)
+            # NOTE(review): the right-hand side of the assignment below is missing.
+            t.address_pool_id =
+            # NOTE(review): the links below are constructed but nothing visible
+            # saves them (nor `t`) - presumably those calls were lost; confirm.
+            if subscriber_service:
+                link = ServiceInstanceLink(
+                    subscriber_service=subscriber_service, provider_service_instance=t
+                )
+            if subscriber_service_instance:
+                link = ServiceInstanceLink(
+                    subscriber_service_instance=subscriber_service_instance,
+                    provider_service_instance=t,
+                )
+        except BaseException:
+            # cleanup if anything went wrong
+            ap.put_address(ip)
+    # save the AddressPool to account for address being added to it
+            # NOTE(review): the condition below is truncated after "and".
+            if t and
+                t.delete()
+            raise
+        return t
+    def get_image(self, tenant):
+        """Return the default image of the first slice of ``tenant.owner``.
+        Raises ``SynchronizerProgrammingError`` when the owner has no slice or the
+        slice has no ``default_image``.
+        """
+        slice = tenant.owner.slices.all()
+        if not slice:
+            raise SynchronizerProgrammingError("provider service has no slice")
+        # Only the first slice is considered.
+        slice = slice[0]
+        # If slice has default_image set then use it
+        if slice.default_image:
+            return slice.default_image
+        # NOTE(review): the format argument of the message below is missing, so
+        # this does not parse as written; presumably it named the slice - confirm.
+        raise SynchronizerProgrammingError(
+            "Please set a default image for %s" %
+        )
+    """ get_legacy_tenant_attribute
+        pick_least_loaded_instance_in_slice
+        count_of_tenants_of_an_instance
+        These three methods seem to be used by A-CORD. Look for ways to consolidate with existing methods and eliminate
+        these legacy ones
+    """
+    def get_legacy_tenant_attribute(self, tenant, name, default=None):
+        if tenant.service_specific_attribute:
+            attributes = json.loads(tenant.service_specific_attribute)
+        else:
+            attributes = {}
+        return attributes.get(name, default)
+    def pick_least_loaded_instance_in_slice(self, tenant, slices, image):
+        for slice in slices:
+            if slice.instances.all().count() > 0:
+                for instance in slice.instances.all():
+                    if instance.image != image:
+                        continue
+                    # Pick the first instance that has lesser than 5 tenants
+                    if self.count_of_tenants_of_an_instance(tenant, instance) < 5:
+                        return instance
+        return None
+    # TODO: Ideally the tenant count for an instance should be maintained using a
+    # many-to-one relationship attribute, however this model being proxy, it does
+    # not permit any new attributes to be defined. Find if any better solutions
+    def count_of_tenants_of_an_instance(self, tenant, instance):
+        """Count the tenants whose legacy "instance_id" attribute matches ``instance``.
+        Iterates over ``self.__class__.objects.all()`` and returns the number of matches.
+        """
+        # NOTE(review): the right-hand operand of the comparison below is missing
+        # (presumably an identifier of `instance`), so this does not parse as written.
+        # NOTE(review): the loop variable reuses the name of the `tenant` parameter,
+        # so the argument passed in is never read.
+        tenant_count = 0
+        for tenant in self.__class__.objects.all():
+            if (
+                self.get_legacy_tenant_attribute(tenant, "instance_id", None)
+                ==
+            ):
+                tenant_count += 1
+        return tenant_count
+    def manage_container(self, tenant):
+        """Make sure ``tenant`` has an instance running the desired image.
+        Nothing is done for a deleted tenant.  When the tenant has no instance, an
+        existing one is reused if the legacy attribute
+        "use_same_instance_for_multiple_tenants" is set and a suitable instance is
+        found; otherwise a new Instance is built on a node chosen by the scheduler
+        and passed to ``save_instance()``.
+        Raises ``SynchronizerConfigurationError`` when the owner has no slices or
+        the "m1.small" fallback flavor is missing, and ``Exception`` for the
+        "container_vm" isolation.
+        """
+        if tenant.deleted:
+            return
+        desired_image = self.get_image(tenant)
+        # NOTE(review): both operands of the "!=" below are missing (presumably the
+        # current instance's image is compared with desired_image), so this does
+        # not parse as written.
+        if (tenant.instance is not None) and (
+   !=
+        ):
+            tenant.instance.delete()
+            tenant.instance = None
+        if tenant.instance is None:
+            if not tenant.owner.slices.count():
+                raise SynchronizerConfigurationError("The service has no slices")
+            new_instance_created = False
+            instance = None
+            if self.get_legacy_tenant_attribute(
+                tenant, "use_same_instance_for_multiple_tenants", default=False
+            ):
+                # Find if any existing instances can be used for this tenant
+                slices = tenant.owner.slices.all()
+                instance = self.pick_least_loaded_instance_in_slice(
+                    tenant, slices, desired_image
+                )
+            # NOTE(review): when an existing instance is reused, nothing visible
+            # here attaches it to the tenant - confirm against upstream.
+            if not instance:
+                slice = tenant.owner.slices.first()
+                flavor = slice.default_flavor
+                if not flavor:
+                    # Fall back to the flavor named "m1.small".
+                    flavors = Flavor.objects.filter(name="m1.small")
+                    if not flavors:
+                        raise SynchronizerConfigurationError("No m1.small flavor")
+                    flavor = flavors[0]
+                if slice.default_isolation == "container_vm":
+                    raise Exception("Not implemented")
+                else:
+                    # The scheduler class and its constraint flag may be overridden
+                    # by attributes on the policy; the label comes from the tenant.
+                    scheduler = getattr(self, "scheduler", LeastLoadedNodeScheduler)
+                    constrain_by_service_instance = getattr(
+                        self, "constrain_by_service_instance", False
+                    )
+                    tenant_node_label = getattr(tenant, "node_label", None)
+                    (node, parent) = scheduler(
+                        slice,
+                        label=tenant_node_label,
+                        constrain_by_service_instance=constrain_by_service_instance,
+                    ).pick()
+                assert slice is not None
+                assert node is not None
+                assert desired_image is not None
+                assert tenant.creator is not None
+                assert node.site_deployment.deployment is not None
+                assert flavor is not None
+                try:
+                    instance = Instance(
+                        slice=slice,
+                        node=node,
+                        image=desired_image,
+                        creator=tenant.creator,
+                        deployment=node.site_deployment.deployment,
+                        flavor=flavor,
+                        isolation=slice.default_isolation,
+                        parent=parent,
+                    )
+                    self.save_instance(instance)
+                    new_instance_created = True
+                    tenant.instance = instance
+                except BaseException:
+                    # NOTE: We don't have transactional support, so if the synchronizer crashes and exits after
+                    #       creating the instance, but before adding it to the tenant, then we will leave an
+                    #       orphaned instance.
+                    if new_instance_created:
+                        instance.delete()
+                    raise
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+    Base Classes for Model Policies
+"""
+from xosconfig import Config
+from multistructlog import create_logger
+log = create_logger(Config().get("logging"))
+class Policy(object):
+    """ An XOS Model Policy
+        Set the class member model_name to the name of the model that this policy will act on.
+        The following functions will be invoked if they are defined:
+            handle_create ... called when a model is created
+            handle_update ... called when a model is updated
+            handle_delete ... called when a model is deleted
+    """
+    def __init__(self):
+        self.logger = log
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+name: test-model-policies
+  username:
+  password: "sample"
+  kind: testframework
+  version: 1
+  handlers:
+    console:
+      class: logging.StreamHandler
+  loggers:
+    'multistructlog':
+      handlers:
+          - console
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import print_function
+from xossynchronizer.modelaccessor import *
+from xossynchronizer.dependency_walker_new import *
+from xossynchronizer.policy import Policy
+import imp
+import pdb
+import time
+import traceback
+class XOSPolicyEngine(object):
+    def __init__(self, policies_dir, log):
+        self.model_policies = self.load_model_policies(policies_dir)
+        self.policies_by_name = {}
+        self.policies_by_class = {}
+        self.log = log
+        for policy in self.model_policies:
+            if policy.model_name not in self.policies_by_name:
+                self.policies_by_name[policy.model_name] = []
+            self.policies_by_name[policy.model_name].append(policy)
+            if policy.model not in self.policies_by_class:
+                self.policies_by_class[policy.model] = []
+            self.policies_by_class[policy.model].append(policy)
+    def update_wp(self, d, o):
+        """Copy the ``write_protect`` flag of ``o`` onto ``d`` when the two differ.
+        An ``AttributeError`` is caught only to be re-raised unchanged.
+        """
+        # NOTE(review): the body of `if save_fields:` is missing (presumably the call
+        # that saves `d` with those fields), so this does not parse as written.
+        try:
+            save_fields = []
+            if d.write_protect != o.write_protect:
+                d.write_protect = o.write_protect
+                save_fields.append("write_protect")
+            if save_fields:
+        except AttributeError as e:
+            raise e
+    def update_dep(self, d, o):
+        """Mark ``d`` for an "updated" save when it is older than ``o``.
+        An ``AttributeError`` is logged and re-raised; any other exception is
+        logged and swallowed.
+        """
+        # NOTE(review): the body of `if save_fields:` is missing (presumably the call
+        # that saves `d` with those fields), so this does not parse as written.
+        # NOTE(review): this logs through a module-level `log`, not self.log, and no
+        # such name is among the imports shown for this module - confirm it exists.
+        try:
+            print("Trying to update %s" % d)
+            save_fields = []
+            if d.updated < o.updated:
+                save_fields = ["updated"]
+            if save_fields:
+        except AttributeError as e:
+            log.exception("AttributeError in update_dep", e=e)
+            raise e
+        except Exception as e:
+            log.exception("Exception in update_dep", e=e)
+    def delete_if_inactive(self, d, o):
+        try:
+            d.delete()
+            print("Deleted %s (%s)" % (d, d.__class__.__name__))
+        except BaseException:
+            pass
+        return
+    def load_model_policies(self, policies_dir):
+        """Import the policy modules in ``policies_dir`` and collect their Policy classes.
+        Files whose name starts with "test" are skipped.  Every other ``.py`` file is
+        loaded with ``imp.load_source`` and each class in it that derives from
+        ``Policy`` and has a ``model_name`` attribute is considered; its ``model``
+        attribute is set to the class returned by the model accessor.
+        Returns the list of policy classes.
+        """
+        # NOTE(review): `os` and `inspect` are used below but are not among the
+        # imports shown for this module - presumably they arrive through one of the
+        # star imports; confirm.
+        policies = []
+        for fn in os.listdir(policies_dir):
+            if fn.startswith("test"):
+                # don't try to import unit tests!
+                continue
+            pathname = os.path.join(policies_dir, fn)
+            # NOTE(review): `fn != ""` is always true for a directory entry;
+            # presumably a specific filename was excluded here - confirm.
+            if (
+                os.path.isfile(pathname)
+                and fn.endswith(".py")
+                and (fn != "")
+            ):
+                module = imp.load_source(fn[:-3], pathname)
+                for classname in dir(module):
+                    c = getattr(module, classname, None)
+                    # make sure 'c' is a descendent of Policy and has a
+                    # model_name attribute; classes that leave model_name unset
+                    # (the abstract base classes) are skipped below
+                    if (
+                        inspect.isclass(c)
+                        and issubclass(c, Policy)
+                        and hasattr(c, "model_name")
+                        and (c not in policies)
+                    ):
+                        # NOTE(review): the call that opens the statement below
+                        # (presumably a logging call) is missing, so this does not
+                        # parse as written.
+                        if not c.model_name:
+                                "load_model_policies: skipping model policy",
+                                classname=classname,
+                            )
+                            continue
+                        # NOTE(review): after logging the error there is no
+                        # `continue`, so get_model_class() is still called for a
+                        # model the accessor does not know - confirm this is intended.
+                        if not model_accessor.has_model_class(c.model_name):
+                            log.error(
+                                "load_model_policies: unable to find model policy",
+                                classname=classname,
+                                model=c.model_name,
+                            )
+                        c.model = model_accessor.get_model_class(c.model_name)
+                        policies.append(c)
+        # NOTE(review): the statement below has lost its callee and indentation.
+"Loaded model policies", policies=policies)
+        return policies
+    def execute_model_policy(self, instance, action):
+        # These are the models whose children get deleted when they are
+        delete_policy_models = ["Slice", "Instance", "Network"]
+        sender_name = getattr(instance, "model_name", instance.__class__.__name__)
+        # if (action != "deleted"):
+        #    walk_inv_deps(self.update_dep, instance)
+        #    walk_deps(self.update_wp, instance)
+        # elif (sender_name in delete_policy_models):
+        #    walk_inv_deps(self.delete_if_inactive, instance)
+        policies_failed = False
+        for policy in self.policies_by_name.get(sender_name, None):
+            method_name = "handle_%s" % action
+            if hasattr(policy, method_name):
+                try:
+                    log.debug(
+                        "MODEL POLICY: calling handler",
+                        sender_name=sender_name,
+                        instance=instance,
+                        policy=policy.__name__,
+                        method=method_name,
+                    )
+                    getattr(policy(), method_name)(instance)
+                    log.debug(
+                        "MODEL POLICY: completed handler",
+                        sender_name=sender_name,
+                        instance=instance,
+                        policy_name=policy.__name__,
+                        method=method_name,
+                    )
+                except Exception as e:
+                    log.exception("MODEL POLICY: Exception when running handler", e=e)
+                    policies_failed = True
+                    try:
+                        instance.policy_status = "%s" % traceback.format_exc(limit=1)
+                        instance.policy_code = 2
+              ["policy_status", "policy_code"])
+                    except Exception as e:
+                        log.exception(
+                            "MODEL_POLICY: Exception when storing policy_status", e=e
+                        )
+        if not policies_failed:
+            try:
+                instance.policed = max(instance.updated, instance.changed_by_step)
+                instance.policy_status = "done"
+                instance.policy_code = 1
+      ["policed", "policy_status", "policy_code"])
+                if hasattr(policy, "after_policy_save"):
+                    policy().after_policy_save(instance)
+      "MODEL_POLICY: Saved", o=instance)
+            except BaseException:
+                log.exception(
+                    "MODEL POLICY: Object failed to update policed timestamp",
+                    instance=instance,
+                )
+    def noop(self, o, p):
+        pass
+    def run(self):
+        while True:
+            start = time.time()
+            try:
+                self.run_policy_once()
+            except Exception as e:
+                log.exception("MODEL_POLICY: Exception in run()", e=e)
+            if time.time() - start < 5:
+                time.sleep(5)
+    # TODO: This loop is different from the synchronizer event_loop, but they both do mostly the same thing. Look for
+    # ways to combine them.
+    def run_policy_once(self):
+        models = self.policies_by_class.keys()
+        model_accessor.check_db_connection_okay()
+        objects = model_accessor.fetch_policies(models, False)
+        deleted_objects = model_accessor.fetch_policies(models, True)
+        for o in objects:
+            if o.deleted:
+                # This shouldn't happen, but previous code was examining o.deleted. Verify.
+                continue
+            if not o.policed:
+                self.execute_model_policy(o, "create")
+            else:
+                self.execute_model_policy(o, "update")
+        for o in deleted_objects:
+            self.execute_model_policy(o, "delete")
+        try:
+            model_accessor.reset_queries()
+        except Exception as e:
+            # this shouldn't happen, but in case it does, catch it...
+            log.exception("MODEL POLICY: exception in reset_queries", e)
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" ModelAccessor
+    A class for abstracting access to models. Used to get any djangoisms out
+    of the synchronizer code base.
+    This module will import all models into this module's global scope, so doing
+    a "from modelaccessor import *" from a calling module ought to import all
+    models into the calling module's scope.
+"""
+import functools
+import importlib
+import os
+import signal
+import sys
+import time
+from loadmodels import ModelLoadClient
+from xosconfig import Config
+from multistructlog import create_logger
+from xosutil.autodiscover_version import autodiscover_version_of_main
+log = create_logger(Config().get("logging"))
+orig_sigint = None
+model_accessor = None
+class ModelAccessor(object):
+    def __init__(self):
+        self.all_model_classes = self.get_all_model_classes()
+    def __getattr__(self, name):
+        """ Wrapper for getattr to facilitate retrieval of classes """
+        has_model_class = self.__getattribute__("has_model_class")
+        get_model_class = self.__getattribute__("get_model_class")
+        if has_model_class(name):
+            return get_model_class(name)
+        # Default behaviour
+        return self.__getattribute__(name)
+    def get_all_model_classes(self):
+        """ Build a dictionary of all model class names """
+        raise Exception("Not Implemented")
+    def get_model_class(self, name):
+        """ Given a class name, return that model class """
+        return self.all_model_classes[name]
+    def has_model_class(self, name):
+        """ Given a class name, return that model class """
+        return name in self.all_model_classes
+    def fetch_pending(self, main_objs, deletion=False):
+        """ Execute the default fetch_pending query """
+        raise Exception("Not Implemented")
+    def fetch_policies(self, main_objs, deletion=False):
+        """ Execute the default fetch_pending query """
+        raise Exception("Not Implemented")
+    def reset_queries(self):
+        """ Reset any state between passes of synchronizer. For django, to
+            limit memory consumption of cached queries.
+        """
+        pass
+    def connection_close(self):
+        """ Close any active database connection. For django, to limit memory
+            consumption.
+        """
+        pass
+    def check_db_connection_okay(self):
+        """ Checks to make sure the db connection is okay """
+        pass
+    def obj_exists(self, o):
+        """ Return True if the object exists in the data model """
+        raise Exception("Not Implemented")
+    def obj_in_list(self, o, olist):
+        """ Return True if o is the same as one of the objects in olist """
+        raise Exception("Not Implemented")
+    def now(self):
+        """ Return the current time for timestamping purposes """
+        raise Exception("Not Implemented")
+    def is_type(self, obj, name):
+        """ returns True is obj is of model type "name" """
+        raise Exception("Not Implemented")
+    def is_instance(self, obj, name):
+        """ returns True if obj is of model type "name" or is a descendant """
+        raise Exception("Not Implemented")
+    def get_content_type_id(self, obj):
+        raise Exception("Not Implemented")
+    def journal_object(self, o, operation, msg=None, timestamp=None):
+        pass
+    def create_obj(self, cls, **kwargs):
+        raise Exception("Not Implemented")
+def import_models_to_globals():
+    # add all models to globals
+    for (k, v) in model_accessor.all_model_classes.items():
+        globals()[k] = v
+    # xosbase doesn't exist from the synchronizer's perspective, so fake out
+    # ModelLink.
+    if "ModelLink" not in globals():
+        class ModelLink:
+            def __init__(self, dest, via, into=None):
+                self.dest = dest
+                self.via = via
+                self.into = into
+        globals()["ModelLink"] = ModelLink
+def keep_trying(client, reactor):
+    # Keep checking the connection to wait for it to become unavailable.
+    # Then reconnect. The strategy is to send NoOp operations, one per second, until eventually a NoOp throws an
+    # exception. This will indicate the server has reset. When that happens, we force the client to reconnect, and
+    # it will download a new API from the server.
+    from xosapi.xos_grpc_client import Empty
+    try:
+        client.utility.NoOp(Empty())
+    except Exception as e:
+        # If we caught an exception, then the API has become unavailable.
+        # So reconnect.
+        log.exception("exception in NoOp", e=e)
+"restarting synchronizer")
+        os.execv(sys.executable, ["python"] + sys.argv)
+        return
+    reactor.callLater(1, functools.partial(keep_trying, client, reactor))
+def grpcapi_reconnect(client, reactor):
+    global model_accessor
+    # Make sure to try to load models before trying to initialize the ORM. It might be the ORM is broken because it
+    # is waiting on our models.
+    if Config.get("models_dir"):
+        version = autodiscover_version_of_main(max_parent_depth=0) or "unknown"
+"Service version is %s" % version)
+        try:
+            ModelLoadClient(client).upload_models(
+                Config.get("name"), Config.get("models_dir"), version=version
+            )
+        except Exception as e:  # TODO: narrow exception scope
+            if (
+                hasattr(e, "code")
+                and callable(e.code)
+                and hasattr(e.code(), "name")
+                and (e.code().name) == "UNAVAILABLE"
+            ):
+                # We need to make sure we force a reconnection, as it's possible that we will end up downloading a
+                # new xos API.
+      "grpc unavailable during loadmodels. Force a reconnect")
+                client.connected = False
+                client.connect()
+                return
+            log.exception("failed to onboard models")
+            # If it's some other error, then we don't need to force a reconnect. Just try the LoadModels() again.
+            reactor.callLater(10, functools.partial(grpcapi_reconnect, client, reactor))
+            return
+    # If the ORM is broken, then wait for the orm to become available.
+    if not getattr(client, "xos_orm", None):
+        log.warning("No xos_orm. Will keep trying...")
+        reactor.callLater(1, functools.partial(keep_trying, client, reactor))
+        return
+    # this will prevent updated timestamps from being automatically updated
+    client.xos_orm.caller_kind = "synchronizer"
+    client.xos_orm.restart_on_disconnect = True
+    from apiaccessor import CoreApiModelAccessor
+    model_accessor = CoreApiModelAccessor(orm=client.xos_orm)
+    # If required_models is set, then check to make sure the required_models
+    # are present. If not, then the synchronizer needs to go to sleep until
+    # the models show up.
+    required_models = Config.get("required_models")
+    if required_models:
+        required_models = [x.strip() for x in required_models]
+        missing = []
+        found = []
+        for model in required_models:
+            if model_accessor.has_model_class(model):
+                found.append(model)
+            else:
+                missing.append(model)
+"required_models, found:", models=", ".join(found))
+        if missing:
+            log.warning("required_models: missing", models=", ".join(missing))
+            # We're missing a required model. Give up and wait for the connection
+            # to reconnect, and hope our missing model has shown up.
+            reactor.callLater(1, functools.partial(keep_trying, client, reactor))
+            return
+    # import all models to global space
+    import_models_to_globals()
+    # Synchronizer framework isn't ready to embrace reactor yet...
+    reactor.stop()
+    # Restore the sigint handler
+    signal.signal(signal.SIGINT, orig_sigint)
+def config_accessor_grpcapi():
+    global orig_sigint
+"Connecting to the gRPC API")
+    grpcapi_endpoint = Config.get("accessor.endpoint")
+    grpcapi_username = Config.get("accessor.username")
+    grpcapi_password = Config.get("accessor.password")
+    # if password starts with "@", then retreive the password from a file
+    if grpcapi_password.startswith("@"):
+        fn = grpcapi_password[1:]
+        if not os.path.exists(fn):
+            raise Exception("%s does not exist" % fn)
+        grpcapi_password = open(fn).readline().strip()
+    from xosapi.xos_grpc_client import SecureClient
+    from twisted.internet import reactor
+    grpcapi_client = SecureClient(
+        endpoint=grpcapi_endpoint, username=grpcapi_username, password=grpcapi_password
+    )
+    grpcapi_client.set_reconnect_callback(
+        functools.partial(grpcapi_reconnect, grpcapi_client, reactor)
+    )
+    grpcapi_client.start()
+    # Start reactor. This will cause the client to connect and then execute
+    # grpcapi_callback().
+    # Reactor will take over SIGINT during, but does not return it when reactor.stop() is called.
+    orig_sigint = signal.getsignal(signal.SIGINT)
+    # Start reactor. This will cause the client to connect and then execute
+    # grpcapi_callback().
+def config_accessor_mock():
+    global model_accessor
+    from mock_modelaccessor import model_accessor as mock_model_accessor
+    model_accessor = mock_model_accessor
+    # mock_model_accessor doesn't have an all_model_classes field, so make one.
+    import mock_modelaccessor as mma
+    all_model_classes = {}
+    for k in dir(mma):
+        v = getattr(mma, k)
+        if hasattr(v, "leaf_model_name"):
+            all_model_classes[k] = v
+    model_accessor.all_model_classes = all_model_classes
+    import_models_to_globals()
+def config_accessor():
+    accessor_kind = Config.get("accessor.kind")
+    if accessor_kind == "testframework":
+        config_accessor_mock()
+    elif accessor_kind == "grpcapi":
+        config_accessor_grpcapi()
+    else:
+        raise Exception("Unknown accessor kind %s" % accessor_kind)
+    # now import any wrappers that the synchronizer needs to add to the ORM
+    if Config.get("wrappers"):
+        for wrapper_name in Config.get("wrappers"):
+            importlib.import_module(wrapper_name)
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import imp
+import inspect
+import os
+import threading
+import time
+from xosconfig import Config
+from multistructlog import create_logger
+log = create_logger(Config().get("logging"))
+class XOSPullStepScheduler:
+    """ XOSPullStepThread
+        A Thread for servicing pull steps. There is one event_step associated with one XOSPullStepThread.
+        The thread's pull_records() function is called for every five seconds.
+    """
+    def __init__(self, steps, *args, **kwargs):
+        self.steps = steps
+    def run(self):
+        while True:
+            time.sleep(5)
+            self.run_once()
+    def run_once(self):
+        log.trace("Starting pull steps", steps=self.steps)
+        threads = []
+        for step in self.steps:
+            thread = threading.Thread(target=step().pull_records, name="pull_step")
+            threads.append(thread)
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+        log.trace("Done with pull steps", steps=self.steps)
+class XOSPullStepEngine:
+    """ XOSPullStepEngine
+        Load pull step modules. Two methods are defined:
+            load_pull_step_modules(dir) ... look for step modules in the given directory, and load objects that are
+                                       descendant from PullStep.
+            start() ... Launch threads to handle processing of the PullSteps. It's expected that load_step_modules()
+                        will be called before start().
+    """
+    def __init__(self):
+        self.pull_steps = []
+    def load_pull_step_modules(self, pull_step_dir):
+        self.pull_steps = []
+"Loading pull steps", pull_step_dir=pull_step_dir)
+        # NOTE we'll load all the classes that inherit from PullStep
+        for fn in os.listdir(pull_step_dir):
+            pathname = os.path.join(pull_step_dir, fn)
+            if (
+                os.path.isfile(pathname)
+                and fn.endswith(".py")
+                and (fn != "")
+                and ("test" not in fn)
+            ):
+                event_module = imp.load_source(fn[:-3], pathname)
+                for classname in dir(event_module):
+                    c = getattr(event_module, classname, None)
+                    if inspect.isclass(c):
+                        base_names = [b.__name__ for b in c.__bases__]
+                        if "PullStep" in base_names:
+                            self.pull_steps.append(c)
+"Loaded pull steps", steps=self.pull_steps)
+    def start(self):
+"Starting pull steps engine", steps=self.pull_steps)
+        for step in self.pull_steps:
+            sched = XOSPullStepScheduler(steps=self.pull_steps)
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+class PullStep(object):
+    """
+    All the pull steps defined in each synchronizer needs to inherit from this class in order to be loaded
+    """
+    def __init__(self, **kwargs):
+        """
+        Initialize a pull step
+        :param kwargs:
+        -- observed_model: name of the model that is being polled
+        """
+        self.observed_model = kwargs.get("observed_model")
+    def pull_records(self):
+        self.log.debug(
+            "There is no default pull_records, please provide a pull_records method for %s"
+            % self.observed_model
+        )
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import hashlib
+import os
+import socket
+import sys
+import base64
+import time
+from xosconfig import Config
+from xossynchronizer.steps.syncstep import SyncStep, DeferredException
+from xossynchronizer.ansible_helper import run_template_ssh
+from xossynchronizer.modelaccessor import *
+class SyncInstanceUsingAnsible(SyncStep):
+    # Sync step base class that builds a dictionary of fields describing how
+    # to reach an object's instance and passes it to run_template_ssh().
+    #
+    # All of the following should be defined for classes derived from this
+    # base class. Examples below use VSGTenant.
+    # provides=[VSGTenant]
+    # observes=VSGTenant
+    # requested_interval=0
+    # template_name = "sync_vcpetenant.yaml"
+    def __init__(self, **args):
+        SyncStep.__init__(self, **args)
+    def skip_ansible_fields(self, o):
+        # Return True if the instance processing and get_ansible_fields stuff
+        # should be skipped. This hook is primarily for the OnosApp
+        # sync step, so it can do its external REST API sync thing.
+        return False
+    def defer_sync(self, o, reason):
+        # Always raises DeferredException carrying the object and the reason.
+        # zdw, 2017-02-18 - is raising the exception here necessary? - seems like
+        # it's just logging the same thing twice
+        # NOTE(review): the call this argument list belongs to is missing
+        # (presumably a logger call) -- restore it.
+"defer object", object=str(o), reason=reason, **o.tologdict())
+        raise DeferredException("defer object %s due to %s" % (str(o), reason))
+    def get_extra_attributes(self, o):
+        # This is a place to include extra attributes that aren't part of the
+        # object itself.
+        return {}
+    def get_instance(self, o):
+        # We need to know what instance is associated with the object. Let's
+        # assume 'o' has a field called 'instance'. If the field is called
+        # something else, or if custom logic is needed, then override this
+        # method.
+        return o.instance
+    def get_external_sync(self, o):
+        # Return (external_hostname, external_container) when both attributes
+        # are set on 'o'; otherwise None.
+        hostname = getattr(o, "external_hostname", None)
+        container = getattr(o, "external_container", None)
+        if hostname and container:
+            return (hostname, container)
+        else:
+            return None
+    def run_playbook(self, o, fields, template_name=None):
+        # Run the template (self.template_name unless one is passed in) via
+        # run_template_ssh. Nothing is returned from this method.
+        if not template_name:
+            template_name = self.template_name
+        tStart = time.time()
+        run_template_ssh(template_name, fields, object=o)
+        # NOTE(review): the call this argument list belongs to is missing
+        # (presumably a logger call) -- restore it.
+            "playbook execution time", time=int(time.time() - tStart), **o.tologdict()
+        )
+    def pre_sync_hook(self, o, fields):
+        # Hook for subclasses; does nothing here.
+        pass
+    def post_sync_hook(self, o, fields):
+        # Hook for subclasses; does nothing here.
+        pass
+    def sync_fields(self, o, fields):
+        # Default sync action: run the playbook with the collected fields.
+        self.run_playbook(o, fields)
+    def prepare_record(self, o):
+        # Hook called at the start of sync_record(); does nothing here.
+        pass
+    def get_node(self, o):
+        return o.node
+    def get_node_key(self, node):
+        # The 'node' argument is not used; the key comes from configuration.
+        # NOTE `node_key` is never defined, does it differ from `proxy_ssh_key`? the value looks to be the same
+        return Config.get("node_key")
+    def get_key_name(self, instance):
+        # Pick the private key file name according to instance.isolation:
+        # "vm" uses the slice's service key, "container" uses the node key,
+        # anything else uses the key of the parent's slice's service.
+        if instance.isolation == "vm":
+            if (
+                instance.slice
+                and instance.slice.service
+                and instance.slice.service.private_key_fn
+            ):
+                key_name = instance.slice.service.private_key_fn
+            else:
+                raise Exception("Make sure to set private_key_fn in the service")
+        elif instance.isolation == "container":
+            node = self.get_node(instance)
+            key_name = self.get_node_key(node)
+        else:
+            # container in VM
+            key_name = instance.parent.slice.service.private_key_fn
+        return key_name
+    def get_ansible_fields(self, instance):
+        # return all of the fields that tell Ansible how to talk to the context
+        # that's setting up the container.
+        # NOTE(review): several expressions in this method are truncated
+        # (missing dictionary values, missing right-hand sides, unterminated
+        # calls); each is marked below and must be restored before use.
+        if instance.isolation == "vm":
+            # legacy where container was configured by
+            fields = {
+                # NOTE(review): values for the next two keys are missing.
+                "instance_name":,
+                "hostname":,
+                "instance_id": instance.instance_id,
+                "username": "ubuntu",
+                "ssh_ip": instance.get_ssh_ip(),
+            }
+        elif instance.isolation == "container":
+            # container on bare metal
+            node = self.get_node(instance)
+            # NOTE(review): right-hand side is missing.
+            hostname =
+            fields = {
+                "hostname": hostname,
+                "baremetal_ssh": True,
+                "instance_name": "rootcontext",
+                "username": "root",
+                # NOTE(review): format arguments are truncated.
+                "container_name": "%s-%s" % (, str(
+                # ssh_ip is not used for container-on-metal
+            }
+        else:
+            # container in a VM
+            if not instance.parent:
+                raise Exception("Container-in-VM has no parent")
+            if not instance.parent.instance_id:
+                raise Exception("Container-in-VM parent is not yet instantiated")
+            if not instance.parent.slice.service:
+                raise Exception("Container-in-VM parent has no service")
+            if not instance.parent.slice.service.private_key_fn:
+                raise Exception("Container-in-VM parent service has no private_key_fn")
+            fields = {
+                # NOTE(review): values for the next two keys are missing.
+                "hostname":,
+                "instance_name":,
+                "instance_id": instance.parent.instance_id,
+                "username": "ubuntu",
+                "ssh_ip": instance.parent.get_ssh_ip(),
+                # NOTE(review): format arguments are truncated.
+                "container_name": "%s-%s" % (, str(,
+            }
+        key_name = self.get_key_name(instance)
+        if not os.path.exists(key_name):
+            raise Exception("Node key %s does not exist" % key_name)
+        # NOTE(review): file() is a Python 2 builtin and the handle is never
+        # closed explicitly.
+        key = file(key_name).read()
+        fields["private_key"] = key
+        # Now the ceilometer stuff
+        # Only do this if the instance is not being deleted.
+        if not instance.deleted:
+            # NOTE(review): the lookup arguments are truncated.
+            cslice = ControllerSlice.objects.get(
+            if not cslice:
+                raise Exception(
+                    "Controller slice object for %s does not exist"
+                    # NOTE(review): the format operand is missing.
+                    %
+                )
+            # NOTE(review): the lookup arguments are truncated.
+            cuser = ControllerUser.objects.get(
+            if not cuser:
+                raise Exception(
+                    "Controller user object for %s does not exist" % instance.creator
+                )
+            fields.update(
+                {
+                    "keystone_tenant_id": cslice.tenant_id,
+                    "keystone_user_id": cuser.kuser_id,
+                    "rabbit_user": getattr(instance.controller, "rabbit_user", None),
+                    "rabbit_password": getattr(
+                        instance.controller, "rabbit_password", None
+                    ),
+                    "rabbit_host": getattr(instance.controller, "rabbit_host", None),
+                }
+            )
+        return fields
+    def sync_record(self, o):
+        # Collect the Ansible fields for 'o' (external host, XOS instance, or
+        # none when skip_ansible_fields() is true), add ansible_tag, the
+        # object's sync_attributes and get_extra_attributes(), then call
+        # sync_fields().
+        # NOTE(review): the call this argument list belongs to is missing
+        # (presumably a logger call) -- restore it.
+"sync'ing object", object=str(o), **o.tologdict())
+        self.prepare_record(o)
+        if self.skip_ansible_fields(o):
+            fields = {}
+        else:
+            if self.get_external_sync(o):
+                # sync to some external host
+                # UNTESTED
+                (hostname, container_name) = self.get_external_sync(o)
+                fields = {
+                    "hostname": hostname,
+                    "baremetal_ssh": True,
+                    "instance_name": "rootcontext",
+                    "username": "root",
+                    "container_name": container_name,
+                }
+                # NOTE(review): 'node' is not assigned anywhere in this method.
+                key_name = self.get_node_key(node)
+                if not os.path.exists(key_name):
+                    raise Exception("Node key %s does not exist" % key_name)
+                # NOTE(review): file() is a Python 2 builtin.
+                key = file(key_name).read()
+                fields["private_key"] = key
+                # TO DO: Ceilometer stuff
+            else:
+                instance = self.get_instance(o)
+                # sync to an XOS instance
+                if not instance:
+                    self.defer_sync(o, "waiting on instance")
+                    return
+                if not instance.instance_name:
+                    self.defer_sync(o, "waiting on instance.instance_name")
+                    return
+                fields = self.get_ansible_fields(instance)
+        # NOTE(review): the default-value expression below is truncated.
+        fields["ansible_tag"] = getattr(
+            o, "ansible_tag", o.__class__.__name__ + "_" + str(
+        )
+        # If 'o' defines a 'sync_attributes' list, then we'll copy those
+        # attributes into the Ansible recipe's field list automatically.
+        if hasattr(o, "sync_attributes"):
+            for attribute_name in o.sync_attributes:
+                fields[attribute_name] = getattr(o, attribute_name)
+        fields.update(self.get_extra_attributes(o))
+        self.sync_fields(o, fields)
+    def delete_record(self, o):
+        # Run the playbook with fields["delete"] = True for the instance
+        # associated with 'o', unless that instance is gone or being deleted.
+        try:
+            # TODO: This may be broken, as get_controller() does not exist in convenience wrapper
+            controller = o.get_controller()
+            # NOTE(review): 'json' and 'InnocuousException' are not among the
+            # imports visible for this module -- verify they are in scope.
+            controller_register = json.loads(
+                o.node.site_deployment.controller.backend_register
+            )
+            if controller_register.get("disabled", False):
+                raise InnocuousException(
+                    # NOTE(review): the format operand is missing.
+                    "Controller %s is disabled" %
+                )
+        except AttributeError:
+            pass
+        instance = self.get_instance(o)
+        if not instance:
+            # the instance is gone. There's nothing left for us to do.
+            return
+        if instance.deleted:
+            # the instance is being deleted. There's nothing left for us to do.
+            return
+        # NOTE(review): basestring exists only in Python 2.
+        if isinstance(instance, basestring):
+            # sync to some external host
+            # XXX - this probably needs more work...
+            fields = {
+                "hostname": instance,
+                "instance_id": "ubuntu",  # this is the username to log into
+                # NOTE(review): 'service' is not assigned anywhere in this method.
+                "private_key": service.key,
+            }
+        else:
+            # sync to an XOS instance
+            fields = self.get_ansible_fields(instance)
+            # NOTE(review): the default-value expression below is truncated.
+            fields["ansible_tag"] = getattr(
+                o, "ansible_tag", o.__class__.__name__ + "_" + str(
+            )
+        # If 'o' defines a 'sync_attributes' list, then we'll copy those
+        # attributes into the Ansible recipe's field list automatically.
+        if hasattr(o, "sync_attributes"):
+            for attribute_name in o.sync_attributes:
+                fields[attribute_name] = getattr(o, attribute_name)
+        if hasattr(self, "map_delete_inputs"):
+            fields.update(self.map_delete_inputs(o))
+        fields["delete"] = True
+        # NOTE(review): run_playbook() has no return statement, so 'res' is
+        # always None when passed to map_delete_outputs().
+        res = self.run_playbook(o, fields)
+        if hasattr(self, "map_delete_outputs"):
+            self.map_delete_outputs(o, res)
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synchronizers.new_base.syncstep import *
+class SyncObject(SyncStep):
+    provides = []  # Caller fills this in
+    requested_interval = 0
+    observes = []  # Caller fills this in
+    def sync_record(self, r):
+        raise DeferredException("Waiting for Service dependency: %r" % r)
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import base64
+from xosconfig import Config
+from xossynchronizer.modelaccessor import *
+from xossynchronizer.ansible_helper import run_template
+# from tests.steps.mock_modelaccessor import model_accessor
+import json
+import time
+import pdb
+from xosconfig import Config
+from functools import reduce
+def f7(seq):
+    seen = set()
+    seen_add = seen.add
+    return [x for x in seq if not (x in seen or seen_add(x))]
+def elim_dups(backend_str):
+    strs = backend_str.split(" // ")
+    strs2 = f7(strs)
+    return " // ".join(strs2)
+def deepgetattr(obj, attr):
+    return reduce(getattr, attr.split("."), obj)
+def obj_class_name(obj):
+    return getattr(obj, "model_name", obj.__class__.__name__)
+class InnocuousException(Exception):
+    pass
+class DeferredException(Exception):
+    pass
+class FailedDependency(Exception):
+    pass
+class SyncStep(object):
+    """ An XOS Sync step.
+    Attributes:
+        psmodel        Model name the step synchronizes
+        dependencies    list of names of models that must be synchronized first if the current model depends on them
+    """
+    # map_sync_outputs can return this value to cause a step to be marked
+    # successful without running ansible. Used for sync_network_controllers
+    # on nat networks.
+    SYNC_WITHOUT_RUNNING = "sync_without_running"
+    slow = False
+    def get_prop(self, prop):
+        # NOTE config_dir is never define, is this used?
+        sync_config_dir = Config.get("config_dir")
+        prop_config_path = "/".join(sync_config_dir,, prop)
+        return open(prop_config_path).read().rstrip()
+    def __init__(self, **args):
+        """Initialize a sync step
+           Keyword arguments:
+                   name -- Name of the step
+                provides -- XOS models sync'd by this step
+        """
+        dependencies = []
+        self.driver = args.get("driver")
+        self.error_map = args.get("error_map")
+        try:
+            self.soft_deadline = int(self.get_prop("soft_deadline_seconds"))
+        except BaseException:
+            self.soft_deadline = 5  # 5 seconds
+        if "log" in args:
+            self.log = args.get("log")
+        return
+    def fetch_pending(self, deletion=False):
+        # This is the most common implementation of fetch_pending
+        # Steps should override it if they have their own logic
+        # for figuring out what objects are outstanding.
+        return model_accessor.fetch_pending(self.observes, deletion)
+    def sync_record(self, o):
+        self.log.debug("In default sync record", **o.tologdict())
+        tenant_fields = self.map_sync_inputs(o)
+        if tenant_fields == SyncStep.SYNC_WITHOUT_RUNNING:
+            return
+        main_objs = self.observes
+        if isinstance(main_objs, list):
+            main_objs = main_objs[0]
+        path = "".join(main_objs.__name__).lower()
+        res = run_template(self.playbook, tenant_fields, path=path, object=o)
+        if hasattr(self, "map_sync_outputs"):
+            self.map_sync_outputs(o, res)
+        self.log.debug("Finished default sync record", **o.tologdict())
+    def delete_record(self, o):
+        self.log.debug("In default delete record", **o.tologdict())
+        # If there is no map_delete_inputs, then assume deleting a record is a no-op.
+        if not hasattr(self, "map_delete_inputs"):
+            return
+        tenant_fields = self.map_delete_inputs(o)
+        main_objs = self.observes
+        if isinstance(main_objs, list):
+            main_objs = main_objs[0]
+        path = "".join(main_objs.__name__).lower()
+        tenant_fields["delete"] = True
+        res = run_template(self.playbook, tenant_fields, path=path)
+        if hasattr(self, "map_delete_outputs"):
+            self.map_delete_outputs(o, res)
+        else:
+            # "rc" is often only returned when something bad happens, so assume that no "rc" implies a successful rc
+            # of 0.
+            if res[0].get("rc", 0) != 0:
+                raise Exception("Nonzero rc from Ansible during delete_record")
+        self.log.debug("Finished default delete record", **o.tologdict())
+#!/usr/bin/env python
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import sys
+import time
+from xosconfig import Config
+from multistructlog import create_logger
+class Synchronizer(object):
+    def __init__(self):
+        self.log = create_logger(Config().get("logging"))
+    def create_model_accessor(self):
+        from modelaccessor import model_accessor
+        self.model_accessor = model_accessor
+    def wait_for_ready(self):
+        models_active = False
+        wait = False
+        while not models_active:
+            try:
+                _i = self.model_accessor.Instance.objects.first()
+                _n = self.model_accessor.NetworkTemplate.objects.first()
+                models_active = True
+            except Exception as e:
+      "Exception", e=e)
+      "Waiting for data model to come up before starting...")
+                time.sleep(10)
+                wait = True
+        if wait:
+            time.sleep(
+                60
+            )  # Safety factor, seeing that we stumbled waiting for the data model to come up.
+    def run(self):
+        self.create_model_accessor()
+        self.wait_for_ready()
+        # Don't import backend until after the model accessor has been initialized. This is to support sync steps that
+        # use `from xossynchronizer.modelaccessor import ...` and require the model accessor to be initialized before
+        # their code can be imported.
+        from backend import Backend
+        log_closure = self.log.bind(synchronizer_name=Config().get("name"))
+        backend = Backend(log=log_closure)
+if __name__ == "__main__":
+    main()