SEBA-498 Remove old synchronizer framework
Change-Id: Ic663011ad658475d1e887abae52b6d862e686071
diff --git a/VERSION b/VERSION
index 8c57128..4a36342 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.2.18
+3.0.0
diff --git a/containers/chameleon/Dockerfile.chameleon b/containers/chameleon/Dockerfile.chameleon
index 2b218e4..ba53dc9 100644
--- a/containers/chameleon/Dockerfile.chameleon
+++ b/containers/chameleon/Dockerfile.chameleon
@@ -13,7 +13,8 @@
# limitations under the License.
# xosproject/chameleon
-FROM xosproject/xos-base:2.2.18
+
+FROM xosproject/xos-base:3.0.0
# xos-base already has protoc and dependencies installed
diff --git a/containers/xos/Dockerfile.client b/containers/xos/Dockerfile.client
index bc01347..47e5c54 100644
--- a/containers/xos/Dockerfile.client
+++ b/containers/xos/Dockerfile.client
@@ -13,7 +13,8 @@
# limitations under the License.
# xosproject/xos-client
-FROM xosproject/xos-libraries:2.2.18
+
+FROM xosproject/xos-libraries:3.0.0
# Install XOS client
COPY lib/xos-api /tmp/xos-api
diff --git a/containers/xos/Dockerfile.libraries b/containers/xos/Dockerfile.libraries
index 590a2e9..56a81b7 100644
--- a/containers/xos/Dockerfile.libraries
+++ b/containers/xos/Dockerfile.libraries
@@ -13,7 +13,8 @@
# limitations under the License.
# xosproject/xos-libraries
-FROM xosproject/xos-base:2.2.18
+
+FROM xosproject/xos-base:3.0.0
# Add libraries
COPY lib /opt/xos/lib
diff --git a/containers/xos/Dockerfile.synchronizer-base b/containers/xos/Dockerfile.synchronizer-base
index 175b378..8faa570 100644
--- a/containers/xos/Dockerfile.synchronizer-base
+++ b/containers/xos/Dockerfile.synchronizer-base
@@ -13,12 +13,12 @@
# limitations under the License.
# xosproject/xos-synchronizer-base
-FROM xosproject/xos-client:2.2.18
-COPY xos/synchronizers/new_base /opt/xos/synchronizers/new_base
+FROM xosproject/xos-client:3.0.0
+
COPY xos/xos/logger.py /opt/xos/xos/logger.py
COPY xos/xos/__init__.py /opt/xos/xos/__init__.py
-COPY xos/synchronizers/__init__.py /opt/xos/synchronizers/__init__.py
+RUN mkdir -p /opt/xos/synchronizers
# Copy over ansible hosts
COPY containers/xos/ansible-hosts /etc/ansible/hosts
diff --git a/containers/xos/Dockerfile.xos-core b/containers/xos/Dockerfile.xos-core
index 8ed98f9..2b9ef92 100644
--- a/containers/xos/Dockerfile.xos-core
+++ b/containers/xos/Dockerfile.xos-core
@@ -13,7 +13,8 @@
# limitations under the License.
# xosproject/xos-core
-FROM xosproject/xos-libraries:2.2.18
+
+FROM xosproject/xos-libraries:3.0.0
# Install XOS
ADD xos /opt/xos
diff --git a/lib/xos-synchronizer/requirements.txt b/lib/xos-synchronizer/requirements.txt
index dc7e5b9..ce153d5 100644
--- a/lib/xos-synchronizer/requirements.txt
+++ b/lib/xos-synchronizer/requirements.txt
@@ -6,7 +6,6 @@
astunparse~=1.5.0
xosconfig~=2.2.6
xosgenx~=2.2.6
-xosutil~=2.2.6
# remove once xosconfig/xosgenx are updated with correct requirements.txt
plyxproto~=4.0.0
diff --git a/lib/xos-synchronizer/xossynchronizer/modelaccessor.py b/lib/xos-synchronizer/xossynchronizer/modelaccessor.py
index b4b26f3..d408ace 100644
--- a/lib/xos-synchronizer/xossynchronizer/modelaccessor.py
+++ b/lib/xos-synchronizer/xossynchronizer/modelaccessor.py
@@ -32,7 +32,6 @@
from threading import Timer
from xosconfig import Config
-from xosutil.autodiscover_version import autodiscover_version_of_main
from .loadmodels import ModelLoadClient
@@ -204,6 +203,19 @@
after_reactor_exit_code = code
+def get_synchronizer_version():
+ import __main__ as synchronizer_main
+
+ # VERSION file should be in same directory as the synchronizer's __main__
+ if hasattr(synchronizer_main, "__file__"):
+ version_fn = os.path.join(os.path.dirname(synchronizer_main.__file__), "VERSION")
+ if os.path.exists(version_fn):
+ version = open(version_fn, "rt").readline().strip()
+ if version:
+ return version
+ return "unknown"
+
+
def grpcapi_reconnect(client, reactor):
global model_accessor
@@ -211,7 +223,7 @@
# is waiting on our models.
if Config.get("models_dir"):
- version = autodiscover_version_of_main(max_parent_depth=0) or "unknown"
+ version = get_synchronizer_version()
log.info("Service version is %s" % version, core_version=Config.get("core_version"))
try:
if Config.get("desired_state") == "load":
diff --git a/xos/synchronizers/.gitignore b/xos/synchronizers/.gitignore
deleted file mode 100644
index f0650b7..0000000
--- a/xos/synchronizers/.gitignore
+++ /dev/null
@@ -1,6 +0,0 @@
-new_base/event_loop
-new_base/mock_modelaccessor.py
-new_base/mock_modelaccessor.py.context
-new_base/synchronizers.new_base.ansible_helper
-new_base/xos.synchronizers.new_base.tests.test_payload
-new_base/synchronizers.new_base.diag
\ No newline at end of file
diff --git a/xos/synchronizers/__init__.py b/xos/synchronizers/__init__.py
deleted file mode 100644
index b0fb0b2..0000000
--- a/xos/synchronizers/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/xos/synchronizers/model_policy.py b/xos/synchronizers/model_policy.py
deleted file mode 100644
index 5374e01..0000000
--- a/xos/synchronizers/model_policy.py
+++ /dev/null
@@ -1,243 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import print_function
-from multistructlog import create_logger
-from xosconfig import Config
-from core.models import Privilege, imp
-from django.db import reset_queries
-from django.db.models import F, Q
-from django.utils import timezone
-from generate.dependency_walker import walk_deps, walk_inv_deps
-
-import os
-import time
-import traceback
-
-modelPolicyEnabled = True
-bad_instances = []
-
-model_policies = {}
-
-
-Config.init()
-log = create_logger(Config().get("logging"))
-
-
-def EnableModelPolicy(x):
- global modelPolicyEnabled
- modelPolicyEnabled = x
-
-
-def update_wp(d, o):
- try:
- save_fields = []
- if d.write_protect != o.write_protect:
- d.write_protect = o.write_protect
- save_fields.append("write_protect")
- if save_fields:
- d.save(update_fields=save_fields)
- except AttributeError as e:
- raise e
-
-
-def update_dep(d, o):
- try:
- print("Trying to update %s" % d)
- save_fields = []
- if d.updated < o.updated:
- save_fields = ["updated"]
-
- if save_fields:
- d.save(update_fields=save_fields)
- except AttributeError as e:
- log.exception("AttributeError in update_dep", e=e)
- raise e
- except Exception as e:
- log.exception("Exception in update_dep", e=e)
-
-
-def delete_if_inactive(d, o):
- try:
- d.delete()
- print("Deleted %s (%s)" % (d, d.__class__.__name__))
- except BaseException:
- pass
- return
-
-
-def load_model_policies(policies_dir=None):
- global model_policies
-
- if policies_dir is None:
- policies_dir = Config().observer_model_policies_dir
-
- for fn in os.listdir(policies_dir):
- pathname = os.path.join(policies_dir, fn)
- if (
- os.path.isfile(pathname)
- and fn.startswith("model_policy_")
- and fn.endswith(".py")
- and (fn != "__init__.py")
- ):
- model_policies[fn[:-3]] = imp.load_source(fn[:-3], pathname)
-
- log.debug(
- "Loaded model polices %s from %s"
- % (",".join(model_policies.keys()), policies_dir)
- )
-
-
-# @atomic
-def execute_model_policy(instance, deleted):
- # Automatic dirtying
- if instance in bad_instances:
- return
-
- # These are the models whose children get deleted when they are
- delete_policy_models = ["Slice", "Instance", "Network"]
- sender_name = instance.__class__.__name__
- policy_name = "model_policy_%s" % sender_name
-
- if not deleted:
- walk_inv_deps(update_dep, instance)
- walk_deps(update_wp, instance)
- elif sender_name in delete_policy_models:
- walk_inv_deps(delete_if_inactive, instance)
-
- try:
- policy_handler = model_policies.get(
- policy_name, None
- ) # getattr(model_policies, policy_name, None)
- log.debug(
- "MODEL POLICY: handler %s %s",
- policy_name=policy_name,
- policy_handler=policy_handler,
- )
- if policy_handler is not None:
- if deleted:
- try:
- policy_handler.handle_delete(instance)
- except AttributeError:
- pass
- else:
- policy_handler.handle(instance)
- log.debug(
- "MODEL POLICY: completed handler",
- policy_name=policy_name,
- policy_handler=policy_handler,
- )
- except Exception as e:
- log.exception("MODEL POLICY: Exception when running handler", e=e)
-
- try:
- instance.policed = timezone.now()
- instance.save(update_fields=["policed"])
- except BaseException:
- log.exception("MODEL POLICY: Object is defective", object=instance)
- bad_instances.append(instance)
-
-
-def noop(o, p):
- pass
-
-
-def check_db_connection_okay():
- # django implodes if the database connection is closed by docker-compose
- from django import db
-
- try:
- db.connection.cursor()
- except Exception as e:
- if "connection already closed" in traceback.format_exc():
- log.error("XXX connection already closed")
- try:
- # if db.connection:
- # db.connection.close()
- db.close_old_connections()
- except Exception as f:
- log.exception("XXX we failed to fix the failure", e=f)
- else:
- log.exception("XXX some other error", e=e)
-
-
-def run_policy():
- load_model_policies()
-
- while True:
- start = time.time()
- try:
- run_policy_once()
- except Exception as e:
- log.exception("MODEL_POLICY: Exception in run_policy()", e)
-
- if time.time() - start < 1:
- time.sleep(1)
-
-
-def run_policy_once():
- from core.models import (
- Instance,
- Slice,
- Controller,
- Network,
- User,
- SlicePrivilege,
- Site,
- SitePrivilege,
- Image,
- ControllerSlice,
- ControllerUser,
- ControllerSite,
- )
-
- models = [
- Controller,
- Site,
- SitePrivilege,
- Image,
- ControllerSlice,
- ControllerSite,
- ControllerUser,
- User,
- Slice,
- Network,
- Instance,
- SlicePrivilege,
- Privilege,
- ]
- objects = []
- deleted_objects = []
-
- check_db_connection_okay()
-
- for m in models:
- res = m.objects.filter(
- (Q(policed__lt=F("updated")) | Q(policed=None)) & Q(no_policy=False)
- )
- objects.extend(res)
- res = m.deleted_objects.filter(Q(policed__lt=F("updated")) | Q(policed=None))
- deleted_objects.extend(res)
-
- for o in objects:
- execute_model_policy(o, o.deleted)
-
- for o in deleted_objects:
- execute_model_policy(o, True)
-
- try:
- reset_queries()
- except Exception as e:
- # this shouldn't happen, but in case it does, catch it...
- log.exception("MODEL POLICY: exception in reset_queries", e=e)
diff --git a/xos/synchronizers/new_base/SyncInstanceUsingAnsible.py b/xos/synchronizers/new_base/SyncInstanceUsingAnsible.py
deleted file mode 100644
index 1ffe3e2..0000000
--- a/xos/synchronizers/new_base/SyncInstanceUsingAnsible.py
+++ /dev/null
@@ -1,320 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import hashlib
-import os
-import socket
-import sys
-import base64
-import time
-from xosconfig import Config
-
-from synchronizers.new_base.syncstep import SyncStep, DeferredException
-from synchronizers.new_base.ansible_helper import run_template_ssh
-from synchronizers.new_base.modelaccessor import *
-
-
-class SyncInstanceUsingAnsible(SyncStep):
- # All of the following should be defined for classes derived from this
- # base class. Examples below use VSGTenant.
-
- # provides=[VSGTenant]
- # observes=VSGTenant
- # requested_interval=0
- # template_name = "sync_vcpetenant.yaml"
-
- def __init__(self, **args):
- SyncStep.__init__(self, **args)
-
- def skip_ansible_fields(self, o):
- # Return True if the instance processing and get_ansible_fields stuff
- # should be skipped. This hook is primarily for the OnosApp
- # sync step, so it can do its external REST API sync thing.
- return False
-
- def defer_sync(self, o, reason):
- # zdw, 2017-02-18 - is raising the exception here necessary? - seems like
- # it's just logging the same thing twice
- self.log.info("defer object", object=str(o), reason=reason, **o.tologdict())
- raise DeferredException("defer object %s due to %s" % (str(o), reason))
-
- def get_extra_attributes(self, o):
- # This is a place to include extra attributes that aren't part of the
- # object itself.
-
- return {}
-
- def get_instance(self, o):
- # We need to know what instance is associated with the object. Let's
- # assume 'o' has a field called 'instance'. If the field is called
- # something else, or if custom logic is needed, then override this
- # method.
-
- return o.instance
-
- def get_external_sync(self, o):
- hostname = getattr(o, "external_hostname", None)
- container = getattr(o, "external_container", None)
- if hostname and container:
- return (hostname, container)
- else:
- return None
-
- def run_playbook(self, o, fields, template_name=None):
- if not template_name:
- template_name = self.template_name
- tStart = time.time()
- run_template_ssh(template_name, fields, object=o)
- self.log.info(
- "playbook execution time", time=int(time.time() - tStart), **o.tologdict()
- )
-
- def pre_sync_hook(self, o, fields):
- pass
-
- def post_sync_hook(self, o, fields):
- pass
-
- def sync_fields(self, o, fields):
- self.run_playbook(o, fields)
-
- def prepare_record(self, o):
- pass
-
- def get_node(self, o):
- return o.node
-
- def get_node_key(self, node):
- # NOTE `node_key` is never defined, does it differ from `proxy_ssh_key`? the value looks to be the same
- return Config.get("node_key")
-
- def get_key_name(self, instance):
- if instance.isolation == "vm":
- if (
- instance.slice
- and instance.slice.service
- and instance.slice.service.private_key_fn
- ):
- key_name = instance.slice.service.private_key_fn
- else:
- raise Exception("Make sure to set private_key_fn in the service")
- elif instance.isolation == "container":
- node = self.get_node(instance)
- key_name = self.get_node_key(node)
- else:
- # container in VM
- key_name = instance.parent.slice.service.private_key_fn
-
- return key_name
-
- def get_ansible_fields(self, instance):
- # return all of the fields that tell Ansible how to talk to the context
- # that's setting up the container.
-
- if instance.isolation == "vm":
- # legacy where container was configured by sync_vcpetenant.py
-
- fields = {
- "instance_name": instance.name,
- "hostname": instance.node.name,
- "instance_id": instance.instance_id,
- "username": "ubuntu",
- "ssh_ip": instance.get_ssh_ip(),
- }
-
- elif instance.isolation == "container":
- # container on bare metal
- node = self.get_node(instance)
- hostname = node.name
- fields = {
- "hostname": hostname,
- "baremetal_ssh": True,
- "instance_name": "rootcontext",
- "username": "root",
- "container_name": "%s-%s" % (instance.slice.name, str(instance.id))
- # ssh_ip is not used for container-on-metal
- }
- else:
- # container in a VM
- if not instance.parent:
- raise Exception("Container-in-VM has no parent")
- if not instance.parent.instance_id:
- raise Exception("Container-in-VM parent is not yet instantiated")
- if not instance.parent.slice.service:
- raise Exception("Container-in-VM parent has no service")
- if not instance.parent.slice.service.private_key_fn:
- raise Exception("Container-in-VM parent service has no private_key_fn")
- fields = {
- "hostname": instance.parent.node.name,
- "instance_name": instance.parent.name,
- "instance_id": instance.parent.instance_id,
- "username": "ubuntu",
- "ssh_ip": instance.parent.get_ssh_ip(),
- "container_name": "%s-%s" % (instance.slice.name, str(instance.id)),
- }
-
- key_name = self.get_key_name(instance)
- if not os.path.exists(key_name):
- raise Exception("Node key %s does not exist" % key_name)
-
- key = file(key_name).read()
-
- fields["private_key"] = key
-
- # Now the ceilometer stuff
- # Only do this if the instance is not being deleted.
- if not instance.deleted:
- cslice = ControllerSlice.objects.get(slice_id=instance.slice.id)
- if not cslice:
- raise Exception(
- "Controller slice object for %s does not exist"
- % instance.slice.name
- )
-
- cuser = ControllerUser.objects.get(user_id=instance.creator.id)
- if not cuser:
- raise Exception(
- "Controller user object for %s does not exist" % instance.creator
- )
-
- fields.update(
- {
- "keystone_tenant_id": cslice.tenant_id,
- "keystone_user_id": cuser.kuser_id,
- "rabbit_user": getattr(instance.controller, "rabbit_user", None),
- "rabbit_password": getattr(
- instance.controller, "rabbit_password", None
- ),
- "rabbit_host": getattr(instance.controller, "rabbit_host", None),
- }
- )
-
- return fields
-
- def sync_record(self, o):
- self.log.info("sync'ing object", object=str(o), **o.tologdict())
-
- self.prepare_record(o)
-
- if self.skip_ansible_fields(o):
- fields = {}
- else:
- if self.get_external_sync(o):
- # sync to some external host
-
- # UNTESTED
-
- (hostname, container_name) = self.get_external_sync(o)
- fields = {
- "hostname": hostname,
- "baremetal_ssh": True,
- "instance_name": "rootcontext",
- "username": "root",
- "container_name": container_name,
- }
- key_name = self.get_node_key(node)
- if not os.path.exists(key_name):
- raise Exception("Node key %s does not exist" % key_name)
-
- key = file(key_name).read()
-
- fields["private_key"] = key
- # TO DO: Ceilometer stuff
- else:
- instance = self.get_instance(o)
- # sync to an XOS instance
- if not instance:
- self.defer_sync(o, "waiting on instance")
- return
-
- if not instance.instance_name:
- self.defer_sync(o, "waiting on instance.instance_name")
- return
-
- fields = self.get_ansible_fields(instance)
-
- fields["ansible_tag"] = getattr(
- o, "ansible_tag", o.__class__.__name__ + "_" + str(o.id)
- )
-
- # If 'o' defines a 'sync_attributes' list, then we'll copy those
- # attributes into the Ansible recipe's field list automatically.
- if hasattr(o, "sync_attributes"):
- for attribute_name in o.sync_attributes:
- fields[attribute_name] = getattr(o, attribute_name)
-
- fields.update(self.get_extra_attributes(o))
-
- self.sync_fields(o, fields)
-
- o.save()
-
- def delete_record(self, o):
- try:
- # TODO: This may be broken, as get_controller() does not exist in convenience wrapper
- controller = o.get_controller()
- controller_register = json.loads(
- o.node.site_deployment.controller.backend_register
- )
-
- if controller_register.get("disabled", False):
- raise InnocuousException(
- "Controller %s is disabled" % o.node.site_deployment.controller.name
- )
- except AttributeError:
- pass
-
- instance = self.get_instance(o)
-
- if not instance:
- # the instance is gone. There's nothing left for us to do.
- return
-
- if instance.deleted:
- # the instance is being deleted. There's nothing left for us to do.
- return
-
- if isinstance(instance, basestring):
- # sync to some external host
-
- # XXX - this probably needs more work...
-
- fields = {
- "hostname": instance,
- "instance_id": "ubuntu", # this is the username to log into
- "private_key": service.key,
- }
- else:
- # sync to an XOS instance
- fields = self.get_ansible_fields(instance)
-
- fields["ansible_tag"] = getattr(
- o, "ansible_tag", o.__class__.__name__ + "_" + str(o.id)
- )
-
- # If 'o' defines a 'sync_attributes' list, then we'll copy those
- # attributes into the Ansible recipe's field list automatically.
- if hasattr(o, "sync_attributes"):
- for attribute_name in o.sync_attributes:
- fields[attribute_name] = getattr(o, attribute_name)
-
- if hasattr(self, "map_delete_inputs"):
- fields.update(self.map_delete_inputs(o))
-
- fields["delete"] = True
- res = self.run_playbook(o, fields)
-
- if hasattr(self, "map_delete_outputs"):
- self.map_delete_outputs(o, res)
diff --git a/xos/synchronizers/new_base/__init__.py b/xos/synchronizers/new_base/__init__.py
deleted file mode 100644
index b0fb0b2..0000000
--- a/xos/synchronizers/new_base/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/xos/synchronizers/new_base/ansible_helper.py b/xos/synchronizers/new_base/ansible_helper.py
deleted file mode 100644
index c607607..0000000
--- a/xos/synchronizers/new_base/ansible_helper.py
+++ /dev/null
@@ -1,325 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import print_function
-import jinja2
-import tempfile
-import os
-import json
-import pickle
-import pdb
-import string
-import random
-import re
-import traceback
-import subprocess
-import threading
-
-from multiprocessing import Process, Queue
-from xosconfig import Config
-
-from multistructlog import create_logger
-
-log = create_logger(Config().get("logging"))
-
-
-step_dir = Config.get("steps_dir")
-sys_dir = Config.get("sys_dir")
-
-os_template_loader = jinja2.FileSystemLoader(
- searchpath=[step_dir, "/opt/xos/synchronizers/shared_templates"]
-)
-os_template_env = jinja2.Environment(loader=os_template_loader)
-
-
-def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
- return "".join(random.choice(chars) for _ in range(size))
-
-
-def shellquote(s):
- return "'" + s.replace("'", "'\\''") + "'"
-
-
-def get_playbook_fn(opts, path):
- if not opts.get("ansible_tag", None):
- # if no ansible_tag is in the options, then generate a unique one
- objname = id_generator()
- opts = opts.copy()
- opts["ansible_tag"] = objname
-
- objname = opts["ansible_tag"]
-
- pathed_sys_dir = os.path.join(sys_dir, path)
- if not os.path.isdir(pathed_sys_dir):
- os.makedirs(pathed_sys_dir)
-
- # symlink steps/roles into sys/roles so that playbooks can access roles
- roledir = os.path.join(step_dir, "roles")
- rolelink = os.path.join(pathed_sys_dir, "roles")
- if os.path.isdir(roledir) and not os.path.islink(rolelink):
- os.symlink(roledir, rolelink)
-
- return (opts, os.path.join(pathed_sys_dir, objname))
-
-
-def run_playbook(ansible_hosts, ansible_config, fqp, opts):
- args = {
- "ansible_hosts": ansible_hosts,
- "ansible_config": ansible_config,
- "fqp": fqp,
- "opts": opts,
- "config_file": Config.get_config_file(),
- }
-
- keep_temp_files = Config.get("keep_temp_files")
-
- dir = tempfile.mkdtemp()
- args_fn = None
- result_fn = None
- try:
- log.info("creating args file", dir=dir)
-
- args_fn = os.path.join(dir, "args")
- result_fn = os.path.join(dir, "result")
-
- open(args_fn, "w").write(pickle.dumps(args))
-
- ansible_main_fn = os.path.join(os.path.dirname(__file__), "ansible_main.py")
-
- os.system("python %s %s %s" % (ansible_main_fn, args_fn, result_fn))
-
- result = pickle.loads(open(result_fn).read())
-
- if hasattr(result, "exception"):
- log.error("Exception in playbook", exception=result["exception"])
-
- stats = result.get("stats", None)
- aresults = result.get("aresults", None)
- except Exception as e:
- log.exception("Exception running ansible_main")
- stats = None
- aresults = None
- finally:
- if not keep_temp_files:
- if args_fn and os.path.exists(args_fn):
- os.remove(args_fn)
- if result_fn and os.path.exists(result_fn):
- os.remove(result_fn)
- os.rmdir(dir)
-
- return (stats, aresults)
-
-
-def run_template(
- name,
- opts,
- path="",
- expected_num=None,
- ansible_config=None,
- ansible_hosts=None,
- run_ansible_script=None,
- object=None,
-):
- template = os_template_env.get_template(name)
- buffer = template.render(opts)
-
- (opts, fqp) = get_playbook_fn(opts, path)
-
- f = open(fqp, "w")
- f.write(buffer)
- f.flush()
-
- """
- q = Queue()
- p = Process(target=run_playbook, args=(ansible_hosts, ansible_config, fqp, opts, q,))
- p.start()
- stats,aresults = q.get()
- p.join()
- """
- stats, aresults = run_playbook(ansible_hosts, ansible_config, fqp, opts)
-
- error_msg = []
-
- output_file = fqp + ".out"
- try:
- if aresults is None:
- raise ValueError("Error executing playbook %s" % fqp)
-
- ok_results = []
- total_unreachable = 0
- failed = 0
-
- ofile = open(output_file, "w")
-
- for x in aresults:
- if not x.is_failed() and not x.is_unreachable() and not x.is_skipped():
- ok_results.append(x)
- elif x.is_unreachable():
- failed += 1
- total_unreachable += 1
- try:
- error_msg.append(x._result["msg"])
- except BaseException:
- pass
- elif x.is_failed():
- failed += 1
- try:
- error_msg.append(x._result["msg"])
- except BaseException:
- pass
-
- # FIXME (zdw, 2017-02-19) - may not be needed with new callback logging
-
- ofile.write("%s: %s\n" % (x._task, str(x._result)))
-
- if object:
- oprops = object.tologdict()
- ansible = x._result
- oprops["xos_type"] = "ansible"
- oprops["ansible_result"] = json.dumps(ansible)
-
- if failed == 0:
- oprops["ansible_status"] = "OK"
- else:
- oprops["ansible_status"] = "FAILED"
-
- log.info("Ran Ansible task", task=x._task, **oprops)
-
- ofile.close()
-
- if (expected_num is not None) and (len(ok_results) != expected_num):
- raise ValueError(
- "Unexpected num %s!=%d" % (str(expected_num), len(ok_results))
- )
-
- if failed:
- raise ValueError("Ansible playbook failed.")
-
- # NOTE(smbaker): Playbook errors are slipping through where `aresults` does not show any failed tasks, but
- # `stats` does show them. See CORD-3169.
- hosts = sorted(stats.processed.keys())
- for h in hosts:
- t = stats.summarize(h)
- if t["unreachable"] > 0:
- raise ValueError(
- "Ansible playbook reported unreachable for host %s" % h
- )
- if t["failures"] > 0:
- raise ValueError("Ansible playbook reported failures for host %s" % h)
-
- except ValueError as e:
- if error_msg:
- try:
- error = " // ".join(error_msg)
- except BaseException:
- error = "failed to join error_msg"
- raise Exception(error)
- else:
- raise
-
- processed_results = map(lambda x: x._result, ok_results)
- return processed_results[1:] # 0 is setup
-
-
-def run_template_ssh(name, opts, path="", expected_num=None, object=None):
- instance_name = opts["instance_name"]
- hostname = opts["hostname"]
- private_key = opts["private_key"]
- baremetal_ssh = opts.get("baremetal_ssh", False)
- if baremetal_ssh:
- # no instance_id or ssh_ip for baremetal
- # we never proxy to baremetal
- proxy_ssh = False
- else:
- instance_id = opts["instance_id"]
- ssh_ip = opts["ssh_ip"]
- proxy_ssh = Config.get("proxy_ssh.enabled")
-
- if not ssh_ip:
- raise Exception("IP of ssh proxy not available. Synchronization deferred")
-
- (opts, fqp) = get_playbook_fn(opts, path)
- private_key_pathname = fqp + ".key"
- config_pathname = fqp + ".cfg"
- hosts_pathname = fqp + ".hosts"
-
- f = open(private_key_pathname, "w")
- f.write(private_key)
- f.close()
-
- f = open(config_pathname, "w")
- f.write("[ssh_connection]\n")
- if proxy_ssh:
- proxy_ssh_key = Config.get("proxy_ssh.key")
- proxy_ssh_user = Config.get("proxy_ssh.user")
- if proxy_ssh_key:
- # If proxy_ssh_key is known, then we can proxy into the compute
- # node without needing to have the OpenCloud sshd machinery in
- # place.
- proxy_command = (
- "ProxyCommand ssh -q -i %s -o StrictHostKeyChecking=no %s@%s nc %s 22"
- % (proxy_ssh_key, proxy_ssh_user, hostname, ssh_ip)
- )
- else:
- proxy_command = (
- "ProxyCommand ssh -q -i %s -o StrictHostKeyChecking=no %s@%s"
- % (private_key_pathname, instance_id, hostname)
- )
- f.write('ssh_args = -o "%s"\n' % proxy_command)
- f.write("scp_if_ssh = True\n")
- f.write("pipelining = True\n")
- f.write("\n[defaults]\n")
- f.write("host_key_checking = False\n")
- f.write("timeout = 30\n")
- f.close()
-
- f = open(hosts_pathname, "w")
- f.write("[%s]\n" % instance_name)
- f.write("%s ansible_ssh_private_key_file=%s\n" % (ssh_ip, private_key_pathname))
- f.close()
-
- # SSH will complain if private key is world or group readable
- os.chmod(private_key_pathname, 0o600)
-
- print("ANSIBLE_CONFIG=%s" % config_pathname)
- print("ANSIBLE_HOSTS=%s" % hosts_pathname)
-
- return run_template(
- name,
- opts,
- path,
- ansible_config=config_pathname,
- ansible_hosts=hosts_pathname,
- run_ansible_script="/opt/xos/synchronizers/base/run_ansible_verbose",
- object=object,
- )
-
-
-def main():
- run_template(
- "ansible/sync_user_deployments.yaml",
- {
- "endpoint": "http://172.31.38.128:5000/v2.0/",
- "name": "Sapan Bhatia",
- "email": "gwsapan@gmail.com",
- "password": "foobar",
- "admin_user": "admin",
- "admin_password": "6a789bf69dd647e2",
- "admin_tenant": "admin",
- "tenant": "demo",
- "roles": ["user", "admin"],
- },
- )
diff --git a/xos/synchronizers/new_base/ansible_main.py b/xos/synchronizers/new_base/ansible_main.py
deleted file mode 100644
index 08283a4..0000000
--- a/xos/synchronizers/new_base/ansible_main.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import pickle
-import sys
-
-# import json
-import traceback
-from xosconfig import Config
-
-sys.path.append("/opt/xos")
-
-
-def run_playbook(ansible_hosts, ansible_config, fqp, opts):
- try:
- if ansible_config:
- os.environ["ANSIBLE_CONFIG"] = ansible_config
- else:
- try:
- del os.environ["ANSIBLE_CONFIG"]
- except KeyError:
- pass
-
- if ansible_hosts:
- os.environ["ANSIBLE_HOSTS"] = ansible_hosts
- else:
- try:
- del os.environ["ANSIBLE_HOSTS"]
- except KeyError:
- pass
-
- import ansible_runner
-
- reload(ansible_runner)
-
- # Dropped support for observer_pretend - to be redone
- runner = ansible_runner.Runner(
- playbook=fqp, run_data=opts, host_file=ansible_hosts
- )
-
- stats, aresults = runner.run()
- except Exception as e:
- return {"stats": None, "aresults": None, "exception": traceback.format_exc()}
-
- return {"stats": stats, "aresults": aresults}
-
-
-def main():
- input_fn = sys.argv[1]
- result_fn = sys.argv[2]
-
- args = pickle.loads(open(input_fn).read())
-
- Config.init(args["config_file"], "synchronizer-config-schema.yaml")
-
- ansible_hosts = args["ansible_hosts"]
- ansible_config = args["ansible_config"]
- fqp = args["fqp"]
- opts = args["opts"]
-
- result = run_playbook(ansible_hosts, ansible_config, fqp, opts)
-
- open(result_fn, "w").write(pickle.dumps(result))
-
-
-if __name__ == "__main__":
- main()
diff --git a/xos/synchronizers/new_base/ansible_runner.py b/xos/synchronizers/new_base/ansible_runner.py
deleted file mode 100644
index d20feb5..0000000
--- a/xos/synchronizers/new_base/ansible_runner.py
+++ /dev/null
@@ -1,388 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from multistructlog import create_logger
-from xosconfig import Config
-from ansible.plugins.callback import CallbackBase
-from ansible.utils.display import Display
-from ansible.executor import playbook_executor
-from ansible.parsing.dataloader import DataLoader
-from ansible.vars.manager import VariableManager
-from ansible.inventory.manager import InventoryManager
-from tempfile import NamedTemporaryFile
-import os
-import sys
-import pdb
-import json
-import uuid
-
-from ansible import constants
-
-constants = reload(constants)
-
-
-log = create_logger(Config().get("logging"))
-
-
-class ResultCallback(CallbackBase):
-
- CALLBACK_VERSION = 2.0
- CALLBACK_NAME = "resultcallback"
- CALLBACK_TYPE = "programmatic"
-
- def __init__(self):
- super(ResultCallback, self).__init__()
- self.results = []
- self.uuid = str(uuid.uuid1())
- self.playbook_status = "OK"
-
- def v2_playbook_on_start(self, playbook):
- self.playbook = playbook._file_name
- log_extra = {
- "xos_type": "ansible",
- "ansible_uuid": self.uuid,
- "ansible_type": "playbook start",
- "ansible_status": "OK",
- "ansible_playbook": self.playbook,
- }
- log.info("PLAYBOOK START", playbook=self.playbook, **log_extra)
-
- def v2_playbook_on_stats(self, stats):
- host_stats = {}
- for host in stats.processed.keys():
- host_stats[host] = stats.summarize(host)
-
- log_extra = {
- "xos_type": "ansible",
- "ansible_uuid": self.uuid,
- "ansible_type": "playbook stats",
- "ansible_status": self.playbook_status,
- "ansible_playbook": self.playbook,
- "ansible_result": json.dumps(host_stats),
- }
-
- if self.playbook_status == "OK":
- log.info("PLAYBOOK END", playbook=self.playbook, **log_extra)
- else:
- log.error("PLAYBOOK END", playbook=self.playbook, **log_extra)
-
- def v2_playbook_on_play_start(self, play):
- log_extra = {
- "xos_type": "ansible",
- "ansible_uuid": self.uuid,
- "ansible_type": "play start",
- "ansible_status": self.playbook_status,
- "ansible_playbook": self.playbook,
- }
- log.debug("PLAY START", play_name=play.name, **log_extra)
-
- def v2_runner_on_ok(self, result, **kwargs):
- log_extra = {
- "xos_type": "ansible",
- "ansible_uuid": self.uuid,
- "ansible_type": "task",
- "ansible_status": "OK",
- "ansible_result": json.dumps(result._result),
- "ansible_task": result._task,
- "ansible_playbook": self.playbook,
- "ansible_host": result._host.get_name(),
- }
- log.debug("OK", task=str(result._task), **log_extra)
- self.results.append(result)
-
- def v2_runner_on_failed(self, result, **kwargs):
- self.playbook_status = "FAILED"
- log_extra = {
- "xos_type": "ansible",
- "ansible_uuid": self.uuid,
- "ansible_type": "task",
- "ansible_status": "FAILED",
- "ansible_result": json.dumps(result._result),
- "ansible_task": result._task,
- "ansible_playbook": self.playbook,
- "ansible_host": result._host.get_name(),
- }
- log.error("FAILED", task=str(result._task), **log_extra)
- self.results.append(result)
-
- def v2_runner_on_async_failed(self, result, **kwargs):
- self.playbook_status = "FAILED"
- log_extra = {
- "xos_type": "ansible",
- "ansible_uuid": self.uuid,
- "ansible_type": "task",
- "ansible_status": "ASYNC FAILED",
- "ansible_result": json.dumps(result._result),
- "ansible_task": result._task,
- "ansible_playbook": self.playbook,
- "ansible_host": result._host.get_name(),
- }
- log.error("ASYNC FAILED", task=str(result._task), **log_extra)
-
- def v2_runner_on_skipped(self, result, **kwargs):
- log_extra = {
- "xos_type": "ansible",
- "ansible_uuid": self.uuid,
- "ansible_type": "task",
- "ansible_status": "SKIPPED",
- "ansible_result": json.dumps(result._result),
- "ansible_task": result._task,
- "ansible_playbook": self.playbook,
- "ansible_host": result._host.get_name(),
- }
- log.debug("SKIPPED", task=str(result._task), **log_extra)
- self.results.append(result)
-
- def v2_runner_on_unreachable(self, result, **kwargs):
- log_extra = {
- "xos_type": "ansible",
- "ansible_uuid": self.uuid,
- "ansible_type": "task",
- "ansible_status": "UNREACHABLE",
- "ansible_result": json.dumps(result._result),
- "ansible_task": result._task,
- "ansible_playbook": self.playbook,
- "ansible_host": result._host.get_name(),
- }
- log.error("UNREACHABLE", task=str(result._task), **log_extra)
- self.results.append(result)
-
- def v2_runner_retry(self, result, **kwargs):
- log_extra = {
- "xos_type": "ansible",
- "ansible_uuid": self.uuid,
- "ansible_type": "task",
- "ansible_status": "RETRY",
- "ansible_result": json.dumps(result._result),
- "ansible_task": result._task,
- "ansible_playbook": self.playbook,
- "ansible_host": result._host.get_name(),
- }
- log.warning(
- "RETRYING - attempt",
- task=str(result._task),
- attempt=result._result["attempts"],
- **log_extra
- )
- self.results.append(result)
-
- def v2_playbook_on_handler_task_start(self, task, **kwargs):
- log_extra = {
- "xos_type": "ansible",
- "ansible_uuid": self.uuid,
- "ansible_type": "task",
- "ansible_status": "HANDLER",
- "ansible_task": task.get_name().strip(),
- "ansible_playbook": self.playbook,
- # 'ansible_host': result._host.get_name()
- }
- log.debug("HANDLER", task=task.get_name().strip(), **log_extra)
-
- def v2_playbook_on_import_for_host(self, result, imported_file):
- log_extra = {
- "xos_type": "ansible",
- "ansible_uuid": self.uuid,
- "ansible_type": "import",
- "ansible_status": "IMPORT",
- "ansible_result": json.dumps(result._result),
- "ansible_playbook": self.playbook,
- "ansible_host": result._host.get_name(),
- }
- log.debug("IMPORT", imported_file=imported_file, **log_extra)
- self.results.append(result)
-
- def v2_playbook_on_not_import_for_host(self, result, missing_file):
- log_extra = {
- "xos_type": "ansible",
- "ansible_uuid": self.uuid,
- "ansible_type": "import",
- "ansible_status": "MISSING IMPORT",
- "ansible_result": json.dumps(result._result),
- "ansible_playbook": self.playbook,
- "ansible_host": result._host.get_name(),
- }
- log.debug("MISSING IMPORT", missing=missing_file, **log_extra)
- self.results.append(result)
-
-
-class Options(object):
- """
- Options class to replace Ansible OptParser
- """
-
- def __init__(
- self,
- ask_pass=None,
- ask_su_pass=None,
- ask_sudo_pass=None,
- become=None,
- become_ask_pass=None,
- become_method=None,
- become_user=None,
- check=None,
- connection=None,
- diff=None,
- flush_cache=None,
- force_handlers=None,
- forks=1,
- listtags=None,
- listtasks=None,
- module_path=None,
- new_vault_password_file=None,
- one_line=None,
- output_file=None,
- poll_interval=None,
- private_key_file=None,
- remote_user=None,
- scp_extra_args=None,
- seconds=None,
- sftp_extra_args=None,
- skip_tags=None,
- ssh_common_args=None,
- ssh_extra_args=None,
- sudo=None,
- sudo_user=None,
- syntax=None,
- tags=None,
- timeout=None,
- tree=None,
- vault_password_files=None,
- ask_vault_pass=None,
- extra_vars=None,
- inventory=None,
- listhosts=None,
- module_paths=None,
- subset=None,
- verbosity=None,
- ):
-
- if tags:
- self.tags = tags
-
- if skip_tags:
- self.skip_tags = skip_tags
-
- self.ask_pass = ask_pass
- self.ask_su_pass = ask_su_pass
- self.ask_sudo_pass = ask_sudo_pass
- self.ask_vault_pass = ask_vault_pass
- self.become = become
- self.become_ask_pass = become_ask_pass
- self.become_method = become_method
- self.become_user = become_user
- self.check = check
- self.connection = connection
- self.diff = diff
- self.extra_vars = extra_vars
- self.flush_cache = flush_cache
- self.force_handlers = force_handlers
- self.forks = forks
- self.inventory = inventory
- self.listhosts = listhosts
- self.listtags = listtags
- self.listtasks = listtasks
- self.module_path = module_path
- self.module_paths = module_paths
- self.new_vault_password_file = new_vault_password_file
- self.one_line = one_line
- self.output_file = output_file
- self.poll_interval = poll_interval
- self.private_key_file = private_key_file
- self.remote_user = remote_user
- self.scp_extra_args = scp_extra_args
- self.seconds = seconds
- self.sftp_extra_args = sftp_extra_args
- self.ssh_common_args = ssh_common_args
- self.ssh_extra_args = ssh_extra_args
- self.subset = subset
- self.sudo = sudo
- self.sudo_user = sudo_user
- self.syntax = syntax
- self.timeout = timeout
- self.tree = tree
- self.vault_password_files = vault_password_files
- self.verbosity = verbosity
-
-
-class Runner(object):
- def __init__(
- self, playbook, run_data, private_key_file=None, verbosity=0, host_file=None
- ):
-
- self.playbook = playbook
- self.run_data = run_data
-
- self.options = Options()
- self.options.output_file = playbook + ".result"
- self.options.private_key_file = private_key_file
- self.options.verbosity = verbosity
- self.options.connection = "ssh" # Need a connection type "smart" or "ssh"
- # self.options.become = True
- self.options.become_method = "sudo"
- self.options.become_user = "root"
-
- # Set global verbosity
- self.display = Display()
- self.display.verbosity = self.options.verbosity
- # Executor appears to have it's own
- # verbosity object/setting as well
- playbook_executor.verbosity = self.options.verbosity
-
- # Become Pass Needed if not logging in as user root
- # passwords = {'become_pass': become_pass}
-
- # Gets data from YAML/JSON files
- self.loader = DataLoader()
- try:
- self.loader.set_vault_password(os.environ["VAULT_PASS"])
- except AttributeError:
- pass
-
- # Set inventory, using most of above objects
- if host_file:
- self.inventory = InventoryManager(loader=self.loader, sources=host_file)
- else:
- self.inventory = InventoryManager(loader=self.loader)
-
- # All the variables from all the various places
- self.variable_manager = VariableManager(
- loader=self.loader, inventory=self.inventory
- )
- self.variable_manager.extra_vars = {} # self.run_data
-
- # Setup playbook executor, but don't run until run() called
- self.pbex = playbook_executor.PlaybookExecutor(
- playbooks=[playbook],
- inventory=self.inventory,
- variable_manager=self.variable_manager,
- loader=self.loader,
- options=self.options,
- passwords={},
- )
-
- def run(self):
- os.environ[
- "REQUESTS_CA_BUNDLE"
- ] = "/usr/local/share/ca-certificates/local_certs.crt"
- callback = ResultCallback()
- self.pbex._tqm._stdout_callback = callback
-
- self.pbex.run()
- stats = self.pbex._tqm._stats
-
- # os.remove(self.hosts.name)
-
- return stats, callback.results
diff --git a/xos/synchronizers/new_base/apiaccessor.py b/xos/synchronizers/new_base/apiaccessor.py
deleted file mode 100644
index a56381b..0000000
--- a/xos/synchronizers/new_base/apiaccessor.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from modelaccessor import ModelAccessor
-import datetime
-import time
-
-
-class CoreApiModelAccessor(ModelAccessor):
- def __init__(self, orm):
- self.orm = orm
- super(CoreApiModelAccessor, self).__init__()
-
- def get_all_model_classes(self):
- all_model_classes = {}
- for k in self.orm.all_model_names:
- all_model_classes[k] = getattr(self.orm, k)
- return all_model_classes
-
- def fetch_pending(self, main_objs, deletion=False):
- if not isinstance(main_objs, list):
- main_objs = [main_objs]
-
- objs = []
- for main_obj in main_objs:
- if not deletion:
- lobjs = main_obj.objects.filter_special(
- main_obj.objects.SYNCHRONIZER_DIRTY_OBJECTS
- )
- else:
- lobjs = main_obj.objects.filter_special(
- main_obj.objects.SYNCHRONIZER_DELETED_OBJECTS
- )
- objs.extend(lobjs)
-
- return objs
-
- def fetch_policies(self, main_objs, deletion=False):
- if not isinstance(main_objs, list):
- main_objs = [main_objs]
-
- objs = []
- for main_obj in main_objs:
- if not deletion:
- lobjs = main_obj.objects.filter_special(
- main_obj.objects.SYNCHRONIZER_DIRTY_POLICIES
- )
- else:
- lobjs = main_obj.objects.filter_special(
- main_obj.objects.SYNCHRONIZER_DELETED_POLICIES
- )
- objs.extend(lobjs)
-
- return objs
-
- def obj_exists(self, o):
- # gRPC will default id to '0' for uninitialized objects
- return (o.id is not None) and (o.id != 0)
-
- def obj_in_list(self, o, olist):
- ids = [x.id for x in olist]
- return o.id in ids
-
- def now(self):
- """ Return the current time for timestamping purposes """
- return (
- datetime.datetime.utcnow() - datetime.datetime.fromtimestamp(0)
- ).total_seconds()
-
- def is_type(self, obj, name):
- return obj._wrapped_class.__class__.__name__ == name
-
- def is_instance(self, obj, name):
- return name in obj.class_names.split(",")
-
- def get_content_type_id(self, obj):
- return obj.self_content_type_id
-
- def create_obj(self, cls, **kwargs):
- return cls.objects.new(**kwargs)
diff --git a/xos/synchronizers/new_base/backend.py b/xos/synchronizers/new_base/backend.py
deleted file mode 100644
index 2074445..0000000
--- a/xos/synchronizers/new_base/backend.py
+++ /dev/null
@@ -1,166 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import print_function
-import os
-import inspect
-import imp
-import sys
-import threading
-import time
-from synchronizers.new_base.syncstep import SyncStep
-from synchronizers.new_base.event_loop import XOSObserver
-from synchronizers.new_base.model_policy_loop import XOSPolicyEngine
-from synchronizers.new_base.event_engine import XOSEventEngine
-from synchronizers.new_base.pull_step_engine import XOSPullStepEngine
-from synchronizers.new_base.modelaccessor import *
-
-from xosconfig import Config
-from multistructlog import create_logger
-
-log = create_logger(Config().get("logging"))
-
-
-class Backend:
- def __init__(self, log=log):
- self.log = log
- pass
-
- def load_sync_step_modules(self, step_dir):
- sync_steps = []
-
- self.log.info("Loading sync steps", step_dir=step_dir)
-
- for fn in os.listdir(step_dir):
- pathname = os.path.join(step_dir, fn)
- if (
- os.path.isfile(pathname)
- and fn.endswith(".py")
- and (fn != "__init__.py")
- and (not fn.startswith("test"))
- ):
-
- # we need to extend the path to load modules in the step_dir
- sys_path_save = sys.path
- sys.path.append(step_dir)
- module = imp.load_source(fn[:-3], pathname)
-
- self.log.debug("Loaded file: %s", pathname)
-
- # reset the original path
- sys.path = sys_path_save
-
- for classname in dir(module):
- c = getattr(module, classname, None)
-
- # if classname.startswith("Sync"):
- # print classname, c, inspect.isclass(c), issubclass(c, SyncStep), hasattr(c,"provides")
-
- # make sure 'c' is a descendent of SyncStep and has a
- # provides field (this eliminates the abstract base classes
- # since they don't have a provides)
-
- if inspect.isclass(c):
- bases = inspect.getmro(c)
- base_names = [b.__name__ for b in bases]
- if (
- ("SyncStep" in base_names)
- and (hasattr(c, "provides") or hasattr(c, "observes"))
- and (c not in sync_steps)
- ):
- sync_steps.append(c)
-
- self.log.info("Loaded sync steps", steps=sync_steps)
-
- return sync_steps
-
- def run(self):
- observer_thread = None
- model_policy_thread = None
- event_engine = None
-
- steps_dir = Config.get("steps_dir")
- if steps_dir:
- sync_steps = []
-
- # load sync_steps
- if steps_dir:
- sync_steps = self.load_sync_step_modules(steps_dir)
-
- # if we have at least one sync_step
- if len(sync_steps) > 0:
- # start the observer
- self.log.info("Starting XOSObserver", sync_steps=sync_steps)
- observer = XOSObserver(sync_steps, self.log)
- observer_thread = threading.Thread(
- target=observer.run, name="synchronizer"
- )
- observer_thread.start()
-
- else:
- self.log.info("Skipping observer thread due to no steps dir.")
-
- pull_steps_dir = Config.get("pull_steps_dir")
- if pull_steps_dir:
- self.log.info("Starting XOSPullStepEngine", pull_steps_dir=pull_steps_dir)
- pull_steps_engine = XOSPullStepEngine()
- pull_steps_engine.load_pull_step_modules(pull_steps_dir)
- pull_steps_thread = threading.Thread(
- target=pull_steps_engine.start, name="pull_step_engine"
- )
- pull_steps_thread.start()
- else:
- self.log.info("Skipping pull step engine due to no pull_steps_dir dir.")
-
- event_steps_dir = Config.get("event_steps_dir")
- if event_steps_dir:
- self.log.info("Starting XOSEventEngine", event_steps_dir=event_steps_dir)
- event_engine = XOSEventEngine(self.log)
- event_engine.load_event_step_modules(event_steps_dir)
- event_engine.start()
- else:
- self.log.info("Skipping event engine due to no event_steps dir.")
-
- # start model policies thread
- policies_dir = Config.get("model_policies_dir")
- if policies_dir:
- policy_engine = XOSPolicyEngine(policies_dir=policies_dir, log=self.log)
- model_policy_thread = threading.Thread(
- target=policy_engine.run, name="policy_engine"
- )
- model_policy_thread.is_policy_thread = True
- model_policy_thread.start()
- else:
- self.log.info(
- "Skipping model policies thread due to no model_policies dir."
- )
-
- if (not observer_thread) and (not model_policy_thread) and (not event_engine):
- self.log.info(
- "No sync steps, no policies, and no event steps. Synchronizer exiting."
- )
- # the caller will exit with status 0
- return
-
- while True:
- try:
- time.sleep(1000)
- except KeyboardInterrupt:
- print("exiting due to keyboard interrupt")
- # TODO: See about setting the threads as daemons
- if observer_thread:
- observer_thread._Thread__stop()
- if model_policy_thread:
- model_policy_thread._Thread__stop()
- sys.exit(1)
diff --git a/xos/synchronizers/new_base/backend_modelpolicy.py b/xos/synchronizers/new_base/backend_modelpolicy.py
deleted file mode 100644
index a8e826b..0000000
--- a/xos/synchronizers/new_base/backend_modelpolicy.py
+++ /dev/null
@@ -1,51 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import print_function
-import os
-import inspect
-import imp
-import sys
-import threading
-import time
-from syncstep import SyncStep
-from synchronizers.new_base.event_loop import XOSObserver
-
-from xosconfig import Config
-from multistructlog import create_logger
-
-log = create_logger(Config().get("logging"))
-
-
-class Backend:
- def run(self):
- # start model policies thread
- policies_dir = Config("model_policies_dir")
- if policies_dir:
- from synchronizers.model_policy import run_policy
-
- model_policy_thread = threading.Thread(target=run_policy)
- model_policy_thread.start()
- else:
- model_policy_thread = None
- log.info("Skipping model policies thread due to no model_policies dir.")
-
- while True:
- try:
- time.sleep(1000)
- except KeyboardInterrupt:
- print("exiting due to keyboard interrupt")
- if model_policy_thread:
- model_policy_thread._Thread__stop()
- sys.exit(1)
diff --git a/xos/synchronizers/new_base/dependency_walker_new.py b/xos/synchronizers/new_base/dependency_walker_new.py
deleted file mode 100644
index 138c26d..0000000
--- a/xos/synchronizers/new_base/dependency_walker_new.py
+++ /dev/null
@@ -1,119 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-#!/usr/bin/env python
-
-# TODO: Moved this into the synchronizer, as it appeared to require model
-# access. Verify whether or not that's true and reconcile with
-# generate/dependency_walker.py
-
-from __future__ import print_function
-import os
-import imp
-import inspect
-import time
-import traceback
-import commands
-import threading
-from xosconfig import Config
-import json
-
-from xosconfig import Config
-from multistructlog import create_logger
-
-log = create_logger(Config().get("logging"))
-
-missing_links = {}
-
-if Config.get("dependency_graph"):
- dep_data = open(Config.get("dependency_graph")).read()
-else:
- dep_data = "{}"
-
-dependencies = json.loads(dep_data)
-dependencies = {k: [item[0] for item in items] for k, items in dependencies.items()}
-
-inv_dependencies = {}
-for k, lst in dependencies.items():
- for v in lst:
- try:
- inv_dependencies[v].append(k)
- except KeyError:
- inv_dependencies[v] = [k]
-
-
-def plural(name):
- if name.endswith("s"):
- return name + "es"
- else:
- return name + "s"
-
-
-def walk_deps(fn, object):
- model = object.__class__.__name__
- try:
- deps = dependencies[model]
- except BaseException:
- deps = []
- return __walk_deps(fn, object, deps)
-
-
-def walk_inv_deps(fn, object):
- model = object.__class__.__name__
- try:
- deps = inv_dependencies[model]
- except BaseException:
- deps = []
- return __walk_deps(fn, object, deps)
-
-
-def __walk_deps(fn, object, deps):
- model = object.__class__.__name__
- ret = []
- for dep in deps:
- # print "Checking dep %s"%dep
- peer = None
- link = dep.lower()
- try:
- peer = getattr(object, link)
- except AttributeError:
- link = plural(link)
- try:
- peer = getattr(object, link)
- except AttributeError:
- if model + "." + link not in missing_links:
- print("Model %s missing link for dependency %s" % (model, link))
- log.exception(
- "WARNING: Model missing link for dependency.",
- model=model,
- link=link,
- )
- missing_links[model + "." + link] = True
-
- if peer:
- try:
- peer_objects = peer.all()
- except AttributeError:
- peer_objects = [peer]
- except BaseException:
- peer_objects = []
-
- for o in peer_objects:
- if hasattr(o, "updated"):
- fn(o, object)
- ret.append(o)
- # Uncomment the following line to enable recursion
- # walk_inv_deps(fn, o)
- return ret
diff --git a/xos/synchronizers/new_base/event_engine.py b/xos/synchronizers/new_base/event_engine.py
deleted file mode 100644
index e5e18d1..0000000
--- a/xos/synchronizers/new_base/event_engine.py
+++ /dev/null
@@ -1,216 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import confluent_kafka
-import imp
-import inspect
-import os
-import threading
-import time
-from xosconfig import Config
-
-
-class XOSKafkaMessage:
- def __init__(self, consumer_msg):
-
- self.topic = consumer_msg.topic()
- self.key = consumer_msg.key()
- self.value = consumer_msg.value()
-
- self.timestamp = None
- (ts_type, ts_val) = consumer_msg.timestamp()
-
- if ts_type is not confluent_kafka.TIMESTAMP_NOT_AVAILABLE:
- self.timestamp = ts_val
-
-
-class XOSKafkaThread(threading.Thread):
- """ XOSKafkaThread
-
- A Thread for servicing Kafka events. There is one event_step associated with one XOSKafkaThread. A
- Consumer is launched to listen on the topics specified by the thread. The thread's process_event()
- function is called for each event.
- """
-
- def __init__(self, step, bootstrap_servers, log, *args, **kwargs):
- super(XOSKafkaThread, self).__init__(*args, **kwargs)
- self.consumer = None
- self.step = step
- self.bootstrap_servers = bootstrap_servers
- self.log = log
- self.daemon = True
-
- def create_kafka_consumer(self):
- # use the service name as the group id
- consumer_config = {
- "group.id": Config().get("name"),
- "bootstrap.servers": ",".join(self.bootstrap_servers),
- "default.topic.config": {"auto.offset.reset": "smallest"},
- }
-
- return confluent_kafka.Consumer(**consumer_config)
-
- def run(self):
- if (not self.step.topics) and (not self.step.pattern):
- raise Exception(
- "Neither topics nor pattern is defined for step %s" % self.step.__name__
- )
-
- if self.step.topics and self.step.pattern:
- raise Exception(
- "Both topics and pattern are defined for step %s. Choose one."
- % self.step.__name__
- )
-
- self.log.info(
- "Waiting for events",
- topic=self.step.topics,
- pattern=self.step.pattern,
- step=self.step.__name__,
- )
-
- while True:
- try:
- # setup consumer or loop on failure
- if self.consumer is None:
- self.consumer = self.create_kafka_consumer()
-
- if self.step.topics:
- self.consumer.subscribe(self.step.topics)
-
- elif self.step.pattern:
- self.consumer.subscribe(self.step.pattern)
-
- except confluent_kafka.KafkaError._ALL_BROKERS_DOWN as e:
- self.log.warning(
- "No brokers available on %s, %s" % (self.bootstrap_servers, e)
- )
- time.sleep(20)
- continue
-
- except confluent_kafka.KafkaError as e:
- # Maybe Kafka has not started yet. Log the exception and try again in a second.
- self.log.exception("Exception in kafka loop: %s" % e)
- time.sleep(1)
- continue
-
- # wait until we get a message, if no message, loop again
- msg = self.consumer.poll(timeout=1.0)
-
- if msg is None:
- continue
-
- if msg.error():
- if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
- self.log.debug(
- "Reached end of kafka topic %s, partition: %s, offset: %d"
- % (msg.topic(), msg.partition(), msg.offset())
- )
- else:
- self.log.exception("Error in kafka message: %s" % msg.error())
-
- else:
- # wrap parsing the event in a class
- event_msg = XOSKafkaMessage(msg)
-
- self.log.info(
- "Processing event", event_msg=event_msg, step=self.step.__name__
- )
-
- try:
- self.step(log=self.log).process_event(event_msg)
-
- except BaseException:
- self.log.exception(
- "Exception in event step",
- event_msg=event_msg,
- step=self.step.__name__,
- )
-
-
-class XOSEventEngine(object):
- """ XOSEventEngine
-
- Subscribe to and handle processing of events. Two methods are defined:
-
- load_step_modules(dir) ... look for step modules in the given directory, and load objects that are
- descendant from EventStep.
-
- start() ... Launch threads to handle processing of the EventSteps. It's expected that load_step_modules()
- will be called before start().
- """
-
- def __init__(self, log):
- self.event_steps = []
- self.threads = []
- self.log = log
-
- def load_event_step_modules(self, event_step_dir):
- self.event_steps = []
- self.log.info("Loading event steps", event_step_dir=event_step_dir)
-
- # NOTE we'll load all the classes that inherit from EventStep
- for fn in os.listdir(event_step_dir):
- pathname = os.path.join(event_step_dir, fn)
- if (
- os.path.isfile(pathname)
- and fn.endswith(".py")
- and (fn != "__init__.py")
- and ("test" not in fn)
- ):
- event_module = imp.load_source(fn[:-3], pathname)
-
- for classname in dir(event_module):
- c = getattr(event_module, classname, None)
-
- if inspect.isclass(c):
- base_names = [b.__name__ for b in c.__bases__]
- if "EventStep" in base_names:
- self.event_steps.append(c)
- self.log.info("Loaded event steps", steps=self.event_steps)
-
- def start(self):
- eventbus_kind = Config.get("event_bus.kind")
- eventbus_endpoint = Config.get("event_bus.endpoint")
-
- if not eventbus_kind:
- self.log.error(
- "Eventbus kind is not configured in synchronizer config file."
- )
- return
-
- if eventbus_kind not in ["kafka"]:
- self.log.error(
- "Eventbus kind is set to a technology we do not implement.",
- eventbus_kind=eventbus_kind,
- )
- return
-
- if not eventbus_endpoint:
- self.log.error(
- "Eventbus endpoint is not configured in synchronizer config file."
- )
- return
-
- for step in self.event_steps:
- if step.technology == "kafka":
- thread = XOSKafkaThread(step, [eventbus_endpoint], self.log)
- thread.start()
- self.threads.append(thread)
- else:
- self.log.error(
- "Unknown technology. Skipping step",
- technology=step.technology,
- step=step.__name__,
- )
diff --git a/xos/synchronizers/new_base/event_loop.py b/xos/synchronizers/new_base/event_loop.py
deleted file mode 100644
index 15514a5..0000000
--- a/xos/synchronizers/new_base/event_loop.py
+++ /dev/null
@@ -1,773 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# TODO:
-# Add unit tests:
-# - 2 sets of Instance, ControllerSlice, ControllerNetworks - delete and create case
-
-import time
-import threading
-import json
-
-from collections import defaultdict
-from networkx import (
- DiGraph,
- weakly_connected_component_subgraphs,
- all_shortest_paths,
- NetworkXNoPath,
-)
-from networkx.algorithms.dag import topological_sort
-
-from synchronizers.new_base.steps import *
-from syncstep import InnocuousException, DeferredException, SyncStep
-from synchronizers.new_base.modelaccessor import *
-
-from xosconfig import Config
-from multistructlog import create_logger
-
-log = create_logger(Config().get("logging"))
-
-
-class StepNotReady(Exception):
- pass
-
-
-class ExternalDependencyFailed(Exception):
- pass
-
-
-# FIXME: Move drivers into a context shared across sync steps.
-
-
-class NoOpDriver:
- def __init__(self):
- self.enabled = True
- self.dependency_graph = None
-
-
-# Everyone gets NoOpDriver by default. To use a different driver, call
-# set_driver() below.
-DRIVER = NoOpDriver()
-
-DIRECT_EDGE = 1
-PROXY_EDGE = 2
-
-
-def set_driver(x):
- global DRIVER
- DRIVER = x
-
-
-class XOSObserver(object):
- sync_steps = []
-
- def __init__(self, sync_steps, log=log):
- # The Condition object via which events are received
- self.log = log
-
- self.step_lookup = {}
- self.sync_steps = sync_steps
- self.load_sync_steps()
-
- self.load_dependency_graph()
-
- self.event_cond = threading.Condition()
-
- self.driver = DRIVER
- self.observer_name = Config.get("name")
-
- def wait_for_event(self, timeout):
- self.event_cond.acquire()
- self.event_cond.wait(timeout)
- self.event_cond.release()
-
- def wake_up(self):
- self.log.debug("Wake up routine called")
- self.event_cond.acquire()
- self.event_cond.notify()
- self.event_cond.release()
-
- def load_dependency_graph(self):
-
- try:
- if Config.get("dependency_graph"):
- self.log.debug(
- "Loading model dependency graph",
- path=Config.get("dependency_graph"),
- )
- dep_graph_str = open(Config.get("dependency_graph")).read()
- else:
- self.log.debug("Using default model dependency graph", graph={})
- dep_graph_str = "{}"
-
- # joint_dependencies is of the form { Model1 -> [(Model2, src_port, dst_port), ...] }
- # src_port is the field that accesses Model2 from Model1
- # dst_port is the field that accesses Model1 from Model2
- static_dependencies = json.loads(dep_graph_str)
- dynamic_dependencies = self.compute_service_dependencies()
-
- joint_dependencies = dict(
- static_dependencies.items() + dynamic_dependencies
- )
-
- model_dependency_graph = DiGraph()
- for src_model, deps in joint_dependencies.items():
- for dep in deps:
- dst_model, src_accessor, dst_accessor = dep
- if src_model != dst_model:
- edge_label = {
- "src_accessor": src_accessor,
- "dst_accessor": dst_accessor,
- }
- model_dependency_graph.add_edge(
- src_model, dst_model, edge_label
- )
-
- model_dependency_graph_rev = model_dependency_graph.reverse(copy=True)
- self.model_dependency_graph = {
- # deletion
- True: model_dependency_graph_rev,
- False: model_dependency_graph,
- }
- self.log.debug("Loaded dependencies", edges=model_dependency_graph.edges())
- except Exception as e:
- self.log.exception("Error loading dependency graph", e=e)
- raise e
-
- def load_sync_steps(self):
- model_to_step = defaultdict(list)
- external_dependencies = []
-
- for s in self.sync_steps:
- if not isinstance(s.observes, list):
- observes = [s.observes]
- else:
- observes = s.observes
-
- for m in observes:
- model_to_step[m.__name__].append(s.__name__)
-
- try:
- external_dependencies.extend(s.external_dependencies)
- except AttributeError:
- pass
-
- self.step_lookup[s.__name__] = s
-
- self.model_to_step = model_to_step
- self.external_dependencies = list(set(external_dependencies))
- self.log.info(
- "Loaded external dependencies", external_dependencies=external_dependencies
- )
- self.log.info("Loaded model_map", **model_to_step)
-
- def reset_model_accessor(self, o=None):
- try:
- model_accessor.reset_queries()
- except BaseException:
- # this shouldn't happen, but in case it does, catch it...
- if o:
- logdict = o.tologdict()
- else:
- logdict = {}
-
- self.log.error("exception in reset_queries", **logdict)
-
- def delete_record(self, o, dr_log=None):
-
- if dr_log is None:
- dr_log = self.log
-
- if getattr(o, "backend_need_reap", False):
- # the object has already been deleted and marked for reaping
- model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
- else:
- step = getattr(o, "synchronizer_step", None)
- if not step:
- raise ExternalDependencyFailed
-
- model_accessor.journal_object(o, "syncstep.call.delete_record")
-
- dr_log.debug("Deleting object", **o.tologdict())
-
- step.log = dr_log.new(step=step)
- step.delete_record(o)
- step.log = dr_log
-
- dr_log.debug("Deleted object", **o.tologdict())
-
- model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
- o.backend_need_reap = True
- o.save(update_fields=["backend_need_reap"])
-
- def sync_record(self, o, sr_log=None):
- try:
- step = o.synchronizer_step
- except AttributeError:
- step = None
-
- if step is None:
- raise ExternalDependencyFailed
-
- if sr_log is None:
- sr_log = self.log
-
- # Mark this as an object that will require delete. Do
- # this now rather than after the syncstep,
- if not (o.backend_need_delete):
- o.backend_need_delete = True
- o.save(update_fields=["backend_need_delete"])
-
- model_accessor.journal_object(o, "syncstep.call.sync_record")
-
- sr_log.debug("Syncing object", **o.tologdict())
-
- step.log = sr_log.new(step=step)
- step.sync_record(o)
- step.log = sr_log
-
- sr_log.debug("Synced object", **o.tologdict())
-
- o.enacted = max(o.updated, o.changed_by_policy)
- scratchpad = {"next_run": 0, "exponent": 0, "last_success": time.time()}
- o.backend_register = json.dumps(scratchpad)
- o.backend_status = "OK"
- o.backend_code = 1
- model_accessor.journal_object(o, "syncstep.call.save_update")
- o.save(
- update_fields=[
- "enacted",
- "backend_status",
- "backend_register",
- "backend_code",
- ]
- )
-
- if hasattr(step, "after_sync_save"):
- step.log = sr_log.new(step=step)
- step.after_sync_save(o)
- step.log = sr_log
-
- sr_log.info("Saved sync object", o=o)
-
- """ This function needs a cleanup. FIXME: Rethink backend_status, backend_register """
-
- def handle_sync_exception(self, o, e):
- self.log.exception("sync step failed!", e=e, **o.tologdict())
- current_code = o.backend_code
-
- if hasattr(e, "message"):
- status = str(e.message)
- else:
- status = str(e)
-
- if isinstance(e, InnocuousException):
- code = 1
- elif isinstance(e, DeferredException):
- # NOTE if the synchronization is Deferred it means that synchronization is still in progress
- code = 0
- else:
- code = 2
-
- self.set_object_error(o, status, code)
-
- dependency_error = "Failed due to error in model %s id %d: %s" % (
- o.leaf_model_name,
- o.id,
- status,
- )
- return dependency_error, code
-
- def set_object_error(self, o, status, code):
- if o.backend_status:
- error_list = o.backend_status.split(" // ")
- else:
- error_list = []
-
- if status not in error_list:
- error_list.append(status)
-
- # Keep last two errors
- error_list = error_list[-2:]
-
- o.backend_code = code
- o.backend_status = " // ".join(error_list)
-
- try:
- scratchpad = json.loads(o.backend_register)
- scratchpad["exponent"]
- except BaseException:
- scratchpad = {
- "next_run": 0,
- "exponent": 0,
- "last_success": time.time(),
- "failures": 0,
- }
-
- # Second failure
- if scratchpad["exponent"]:
- if code == 1:
- delay = scratchpad["exponent"] * 60 # 1 minute
- else:
- delay = scratchpad["exponent"] * 600 # 10 minutes
-
- # cap delays at 8 hours
- if delay > 8 * 60 * 60:
- delay = 8 * 60 * 60
- scratchpad["next_run"] = time.time() + delay
-
- scratchpad["exponent"] += 1
-
- try:
- scratchpad["failures"] += 1
- except KeyError:
- scratchpad["failures"] = 1
-
- scratchpad["last_failure"] = time.time()
-
- o.backend_register = json.dumps(scratchpad)
-
- # TOFIX:
- # DatabaseError: value too long for type character varying(140)
- if model_accessor.obj_exists(o):
- try:
- o.backend_status = o.backend_status[:1024]
- o.save(
- update_fields=["backend_status", "backend_register"],
- always_update_timestamp=True,
- )
- except BaseException as e:
- self.log.exception("Could not update backend status field!", e=e)
- pass
-
- def sync_cohort(self, cohort, deletion):
- threading.current_thread().is_sync_thread = True
-
- sc_log = self.log.new(thread_id=threading.current_thread().ident)
-
- try:
- start_time = time.time()
- sc_log.debug("Starting to work on cohort", cohort=cohort, deletion=deletion)
-
- cohort_emptied = False
- dependency_error = None
- dependency_error_code = None
-
- itty = iter(cohort)
-
- while not cohort_emptied:
- try:
- self.reset_model_accessor()
- o = next(itty)
-
- if dependency_error:
- self.set_object_error(
- o, dependency_error, dependency_error_code
- )
- continue
-
- try:
- if deletion:
- self.delete_record(o, sc_log)
- else:
- self.sync_record(o, sc_log)
- except ExternalDependencyFailed:
- dependency_error = (
- "External dependency on object %s id %d not met"
- % (o.leaf_model_name, o.id)
- )
- dependency_error_code = 1
- except (DeferredException, InnocuousException, Exception) as e:
- dependency_error, dependency_error_code = self.handle_sync_exception(
- o, e
- )
-
- except StopIteration:
- sc_log.debug("Cohort completed", cohort=cohort, deletion=deletion)
- cohort_emptied = True
- finally:
- self.reset_model_accessor()
- model_accessor.connection_close()
-
- def tenant_class_name_from_service(self, service_name):
- """ This code supports legacy functionality. To be cleaned up. """
- name1 = service_name + "Instance"
- if hasattr(Slice().stub, name1):
- return name1
- else:
- name2 = service_name.replace("Service", "Tenant")
- if hasattr(Slice().stub, name2):
- return name2
- else:
- return None
-
- def compute_service_dependencies(self):
- """ FIXME: Implement more cleanly via xproto """
-
- model_names = self.model_to_step.keys()
- ugly_tuples = [
- (m, m.replace("Instance", "").replace("Tenant", "Service"))
- for m in model_names
- if m.endswith("ServiceInstance") or m.endswith("Tenant")
- ]
- ugly_rtuples = [(v, k) for k, v in ugly_tuples]
-
- ugly_map = dict(ugly_tuples)
- ugly_rmap = dict(ugly_rtuples)
-
- s_model_names = [v for k, v in ugly_tuples]
- s_models0 = [
- getattr(Slice().stub, model_name, None) for model_name in s_model_names
- ]
- s_models1 = [model.objects.first() for model in s_models0]
- s_models = [m for m in s_models1 if m is not None]
-
- dependencies = []
- for model in s_models:
- deps = ServiceDependency.objects.filter(subscriber_service_id=model.id)
- if deps:
- services = [
- self.tenant_class_name_from_service(
- d.provider_service.leaf_model_name
- )
- for d in deps
- ]
- dependencies.append(
- (ugly_rmap[model.leaf_model_name], [(s, "", "") for s in services])
- )
-
- return dependencies
-
- def compute_service_instance_dependencies(self, objects):
- link_set = [
- ServiceInstanceLink.objects.filter(subscriber_service_instance_id=o.id)
- for o in objects
- ]
-
- dependencies = [
- (l.provider_service_instance, l.subscriber_service_instance)
- for links in link_set
- for l in links
- ]
- providers = []
-
- for p, s in dependencies:
- if not p.enacted or p.enacted < p.updated:
- p.dependent = s
- providers.append(p)
-
- return providers
-
- def run(self):
- # Cleanup: Move self.driver into a synchronizer context
- # made available to every sync step.
- if not self.driver.enabled:
- self.log.warning("Driver is not enabled. Not running sync steps.")
- return
-
- while True:
- self.log.debug("Waiting for event or timeout")
- self.wait_for_event(timeout=5)
- self.log.debug("Synchronizer awake")
-
- self.run_once()
-
- def fetch_pending(self, deletion=False):
- unique_model_list = list(set(self.model_to_step.keys()))
- pending_objects = []
- pending_steps = []
- step_list = self.step_lookup.values()
-
- for e in self.external_dependencies:
- s = SyncStep
- s.observes = e
- step_list.append(s)
-
- for step_class in step_list:
- step = step_class(driver=self.driver)
- step.log = self.log.new(step=step)
-
- if not hasattr(step, "call"):
- pending = step.fetch_pending(deletion)
- for obj in pending:
- step = step_class(driver=self.driver)
- step.log = self.log.new(step=step)
- obj.synchronizer_step = step
-
- pending_service_dependencies = self.compute_service_instance_dependencies(
- pending
- )
-
- for obj in pending_service_dependencies:
- obj.synchronizer_step = None
-
- pending_objects.extend(pending)
- pending_objects.extend(pending_service_dependencies)
- else:
- # Support old and broken legacy synchronizers
- # This needs to be dropped soon.
- pending_steps.append(step)
-
- self.log.debug(
- "Fetched pending data",
- pending_objects=pending_objects,
- legacy_steps=pending_steps,
- )
- return pending_objects, pending_steps
-
- def linked_objects(self, o):
- if o is None:
- return [], None
- try:
- o_lst = [o for o in o.all()]
- edge_type = PROXY_EDGE
- except (AttributeError, TypeError):
- o_lst = [o]
- edge_type = DIRECT_EDGE
- return o_lst, edge_type
-
- """ Automatically test if a real dependency path exists between two objects. e.g.
- given an Instance, and a ControllerSite, the test amounts to:
- instance.slice.site == controller.site
-
- Then the two objects are related, and should be put in the same cohort.
- If the models of the two objects are not dependent, then the check trivially
- returns False.
- """
-
- def same_object(self, o1, o2):
- if not o1 or not o2:
- return False, None
-
- o1_lst, edge_type = self.linked_objects(o1)
-
- try:
- found = next(
- obj
- for obj in o1_lst
- if obj.leaf_model_name == o2.leaf_model_name and obj.pk == o2.pk
- )
- except AttributeError as e:
- self.log.exception("Compared objects could not be identified", e=e)
- raise e
- except StopIteration:
- # This is a temporary workaround to establish dependencies between
- # deleted proxy objects. A better solution would be for the ORM to
- # return the set of deleted objects via foreign keys. At that point,
- # the following line would change back to found = False
- # - Sapan
-
- found = getattr(o2, "deleted", False)
-
- return found, edge_type
-
- def concrete_path_exists(self, o1, o2):
- try:
- m1 = o1.leaf_model_name
- m2 = o2.leaf_model_name
- except AttributeError:
- # One of the nodes is not in the dependency graph
- # No dependency
- return False, None
-
- if m1.endswith("ServiceInstance") and m2.endswith("ServiceInstance"):
- return getattr(o2, "dependent", None) == o1, DIRECT_EDGE
-
- # FIXME: Dynamic dependency check
- G = self.model_dependency_graph[False]
- paths = all_shortest_paths(G, m1, m2)
-
- try:
- any(paths)
- paths = all_shortest_paths(G, m1, m2)
- except NetworkXNoPath:
- # Easy. The two models are unrelated.
- return False, None
-
- for p in paths:
- src_object = o1
- edge_type = DIRECT_EDGE
-
- for i in range(len(p) - 1):
- src = p[i]
- dst = p[i + 1]
- edge_label = G[src][dst]
- sa = edge_label["src_accessor"]
- try:
- dst_accessor = getattr(src_object, sa)
- dst_objects, link_edge_type = self.linked_objects(dst_accessor)
- if link_edge_type == PROXY_EDGE:
- edge_type = link_edge_type
-
- """
-
- True If no linked objects and deletion
- False If no linked objects
- True If multiple linked objects
- <continue traversal> If single linked object
-
- """
-
- if dst_objects == []:
- # Workaround for ORM not returning linked deleted
- # objects
- if o2.deleted:
- return True, edge_type
- else:
- dst_object = None
- elif len(dst_objects) > 1:
- # Multiple linked objects. Assume anything could be among those multiple objects.
- raise AttributeError
- else:
- dst_object = dst_objects[0]
- except AttributeError as e:
- if sa != "fake_accessor":
- self.log.debug(
- "Could not check object dependencies, making conservative choice %s",
- e,
- src_object=src_object,
- sa=sa,
- o1=o1,
- o2=o2,
- )
- return True, edge_type
-
- src_object = dst_object
-
- if not src_object:
- break
-
- verdict, edge_type = self.same_object(src_object, o2)
- if verdict:
- return verdict, edge_type
-
- # Otherwise try other paths
-
- return False, None
-
- """
-
- This function implements the main scheduling logic
- of the Synchronizer. It divides incoming work (dirty objects)
- into cohorts of dependent objects, and runs each such cohort
- in its own thread.
-
- Future work:
-
- * Run event thread in parallel to the scheduling thread, and
- add incoming objects to existing cohorts. Doing so should
- greatly improve synchronizer performance.
- * A single object might need to be added to multiple cohorts.
- In this case, the last cohort handles such an object.
- * This algorithm is horizontal-scale-ready. Multiple synchronizers
- could run off a shared runqueue of cohorts.
-
- """
-
- def compute_dependent_cohorts(self, objects, deletion):
- model_map = defaultdict(list)
- n = len(objects)
- r = range(n)
- indexed_objects = zip(r, objects)
-
- oG = DiGraph()
-
- for i in r:
- oG.add_node(i)
-
- try:
- for i0 in range(n):
- for i1 in range(n):
- if i0 != i1:
- if deletion:
- path_args = (objects[i1], objects[i0])
- else:
- path_args = (objects[i0], objects[i1])
-
- is_connected, edge_type = self.concrete_path_exists(*path_args)
- if is_connected:
- try:
- edge_type = oG[i1][i0]["type"]
- if edge_type == PROXY_EDGE:
- oG.remove_edge(i1, i0)
- oG.add_edge(i0, i1, {"type": edge_type})
- except KeyError:
- oG.add_edge(i0, i1, {"type": edge_type})
- except KeyError:
- pass
-
- components = weakly_connected_component_subgraphs(oG)
- cohort_indexes = [reversed(topological_sort(g)) for g in components]
- cohorts = [
- [objects[i] for i in cohort_index] for cohort_index in cohort_indexes
- ]
-
- return cohorts
-
- def run_once(self):
- self.load_dependency_graph()
-
- try:
- # Why are we checking the DB connection here?
- model_accessor.check_db_connection_okay()
-
- loop_start = time.time()
-
- # Two passes. One for sync, the other for deletion.
- for deletion in (False, True):
- objects_to_process = []
-
- objects_to_process, steps_to_process = self.fetch_pending(deletion)
- dependent_cohorts = self.compute_dependent_cohorts(
- objects_to_process, deletion
- )
-
- threads = []
- self.log.debug("In run once inner loop", deletion=deletion)
-
- for cohort in dependent_cohorts:
- thread = threading.Thread(
- target=self.sync_cohort,
- name="synchronizer",
- args=(cohort, deletion),
- )
-
- threads.append(thread)
-
- # Start threads
- for t in threads:
- t.start()
-
- self.reset_model_accessor()
-
- # Wait for all threads to finish before continuing with the run
- # loop
- for t in threads:
- t.join()
-
- # Run legacy synchronizers, which do everything in call()
- for step in steps_to_process:
- try:
- step.call(deletion=deletion)
- except Exception as e:
- self.log.exception("Legacy step failed", step=step, e=e)
-
- loop_end = time.time()
-
- except Exception as e:
- self.log.exception(
- "Core error. This seems like a misconfiguration or bug. This error will not be relayed to the user!",
- e=e,
- )
- self.log.error("Exception in observer run loop")
diff --git a/xos/synchronizers/new_base/eventstep.py b/xos/synchronizers/new_base/eventstep.py
deleted file mode 100644
index 9596248..0000000
--- a/xos/synchronizers/new_base/eventstep.py
+++ /dev/null
@@ -1,43 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class EventStep(object):
- """
- All the event steps defined in each synchronizer needs to inherit from this class in order to be loaded
-
- Each step should define a technology, and either a `topics` or a `pattern`. The meaning of `topics` and `pattern`
- depend on the technology that is chosen.
- """
-
- technology = "kafka"
- topics = []
- pattern = None
-
- def __init__(self, log, **kwargs):
- """
- Initialize a pull step. Override this function to include any initialization. Make sure to call the original
- __init__() from your method.
- """
-
- # self.log can be used to emit logging messages.
- self.log = log
-
- def process_event(self, event):
- # This method must be overridden in your class. Do not call the original method.
-
- self.log.warning(
- "There is no default process_event, please provide a process_event method",
- msg=event,
- )
diff --git a/xos/synchronizers/new_base/exceptions.py b/xos/synchronizers/new_base/exceptions.py
deleted file mode 100644
index 3589777..0000000
--- a/xos/synchronizers/new_base/exceptions.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class SynchronizerException(Exception):
- pass
-
-
-class SynchronizerProgrammingError(SynchronizerException):
- pass
-
-
-class SynchronizerConfigurationError(SynchronizerException):
- pass
diff --git a/xos/synchronizers/new_base/loadmodels.py b/xos/synchronizers/new_base/loadmodels.py
deleted file mode 100644
index 7e82ac9..0000000
--- a/xos/synchronizers/new_base/loadmodels.py
+++ /dev/null
@@ -1,60 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-from xosconfig import Config
-from multistructlog import create_logger
-
-log = create_logger(Config().get("logging"))
-
-
-class ModelLoadClient(object):
- def __init__(self, api):
- self.api = api
-
- def upload_models(self, name, dir, version="unknown"):
- request = self.api.dynamicload_pb2.LoadModelsRequest(name=name, version=version)
-
- for fn in os.listdir(dir):
- if fn.endswith(".xproto"):
- item = request.xprotos.add()
- item.filename = fn
- item.contents = open(os.path.join(dir, fn)).read()
-
- models_fn = os.path.join(dir, "models.py")
- if os.path.exists(models_fn):
- item = request.decls.add()
- item.filename = "models.py"
- item.contents = open(models_fn).read()
-
- attic_dir = os.path.join(dir, "attic")
- if os.path.exists(attic_dir):
- log.warn(
- "Attics are deprecated, please use the legacy=True option in xProto"
- )
- for fn in os.listdir(attic_dir):
- if fn.endswith(".py"):
- item = request.attics.add()
- item.filename = fn
- item.contents = open(os.path.join(attic_dir, fn)).read()
-
- api_convenience_dir = os.path.join(dir, "convenience")
- if os.path.exists(api_convenience_dir):
- for fn in os.listdir(api_convenience_dir):
- if fn.endswith(".py") and "test" not in fn:
- item = request.convenience_methods.add()
- item.filename = fn
- item.contents = open(os.path.join(api_convenience_dir, fn)).read()
-
- result = self.api.dynamicload.LoadModels(request)
diff --git a/xos/synchronizers/new_base/mock_modelaccessor_build.py b/xos/synchronizers/new_base/mock_modelaccessor_build.py
deleted file mode 100644
index 9cb1d4f..0000000
--- a/xos/synchronizers/new_base/mock_modelaccessor_build.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import cPickle
-import subprocess
-
-"""
- Support for autogenerating mock_modelaccessor.
-
- Each unit test might have its own requirements for the set of xprotos that make
- up its model testing framework. These should always include the core, and
- optionally include one or more services.
-"""
-
-
-def build_mock_modelaccessor(
- xos_dir, services_dir, service_xprotos, target="mock_classes.xtarget"
-):
- dest_fn = os.path.join(
- xos_dir, "synchronizers", "new_base", "mock_modelaccessor.py"
- )
-
- args = ["xosgenx", "--target", target]
- args.append(os.path.join(xos_dir, "core/models/core.xproto"))
- for xproto in service_xprotos:
- args.append(os.path.join(services_dir, xproto))
-
- # Check to see if we've already run xosgenx. If so, don't run it again.
- context_fn = dest_fn + ".context"
- this_context = (xos_dir, services_dir, service_xprotos, target)
- need_xosgenx = True
- if os.path.exists(context_fn):
- try:
- context = cPickle.loads(open(context_fn).read())
- if context == this_context:
- return
- except (cPickle.UnpicklingError, EOFError):
- # Something went wrong with the file read or depickling
- pass
-
- if os.path.exists(context_fn):
- os.remove(context_fn)
-
- if os.path.exists(dest_fn):
- os.remove(dest_fn)
-
- p = subprocess.Popen(
- " ".join(args) + " > " + dest_fn,
- shell=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- )
- (stdoutdata, stderrdata) = p.communicate()
- if (p.returncode != 0) or (not os.path.exists(dest_fn)):
- raise Exception(
- "Failed to create mock model accessor, returncode=%d, stdout=%s"
- % (p.returncode, stdoutdata)
- )
-
- # Save the context of this invocation of xosgenx
- open(context_fn, "w").write(cPickle.dumps(this_context))
diff --git a/xos/synchronizers/new_base/model_policies/__init__.py b/xos/synchronizers/new_base/model_policies/__init__.py
deleted file mode 100644
index b0fb0b2..0000000
--- a/xos/synchronizers/new_base/model_policies/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/xos/synchronizers/new_base/model_policies/model_policy_tenantwithcontainer.py b/xos/synchronizers/new_base/model_policies/model_policy_tenantwithcontainer.py
deleted file mode 100644
index afe2c67..0000000
--- a/xos/synchronizers/new_base/model_policies/model_policy_tenantwithcontainer.py
+++ /dev/null
@@ -1,320 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from synchronizers.new_base.modelaccessor import *
-from synchronizers.new_base.policy import Policy
-from synchronizers.new_base.exceptions import *
-
-
-class Scheduler(object):
- # XOS Scheduler Abstract Base Class
- # Used to implement schedulers that pick which node to put instances on
-
- def __init__(self, slice, label=None, constrain_by_service_instance=False):
- self.slice = slice
- self.label = label # Only pick nodes with this label
- # Apply service-instance-based constraints
- self.constrain_by_service_instance = constrain_by_service_instance
-
- def pick(self):
- # this method should return a tuple (node, parent)
- # node is the node to instantiate on
- # parent is for container_vm instances only, and is the VM that will
- # hold the container
-
- raise Exception("Abstract Base")
-
-
-class LeastLoadedNodeScheduler(Scheduler):
- # This scheduler always return the node with the fewest number of
- # instances.
-
- def pick(self):
- set_label = False
-
- nodes = []
- if self.label:
- nodes = Node.objects.filter(nodelabels__name=self.label)
- if not nodes:
- set_label = self.constrain_by_service_instance
-
- if not nodes:
- if self.slice.default_node:
- # if slice.default_node is set, then filter by default_node
- nodes = Node.objects.filter(name=self.slice.default_node)
- else:
- nodes = Node.objects.all()
-
- # convert to list
- nodes = list(nodes)
-
- # sort so that we pick the least-loaded node
- nodes = sorted(nodes, key=lambda node: node.instances.count())
-
- if not nodes:
- raise Exception("LeastLoadedNodeScheduler: No suitable nodes to pick from")
-
- picked_node = nodes[0]
-
- if set_label:
- nl = NodeLabel(name=self.label)
- nl.node.add(picked_node)
- nl.save()
-
- # TODO: logic to filter nodes by which nodes are up, and which
- # nodes the slice can instantiate on.
- return [picked_node, None]
-
-
-class TenantWithContainerPolicy(Policy):
- # This policy is abstract. Inherit this class into your own policy and override model_name
- model_name = None
-
- def handle_create(self, tenant):
- return self.handle_update(tenant)
-
- def handle_update(self, service_instance):
- if (service_instance.link_deleted_count > 0) and (
- not service_instance.provided_links.exists()
- ):
- model = globals()[self.model_name]
- self.log.info(
- "The last provided link has been deleted -- self-destructing."
- )
- self.handle_delete(service_instance)
- if model.objects.filter(id=service_instance.id).exists():
- service_instance.delete()
- else:
- self.log.info("Tenant %s is already deleted" % service_instance)
- return
- self.manage_container(service_instance)
-
- # def handle_delete(self, tenant):
- # if tenant.vcpe:
- # tenant.vcpe.delete()
-
- def save_instance(self, instance):
- # Override this function to do custom pre-save or post-save processing,
- # such as creating ports for containers.
- instance.save()
-
- def ip_to_mac(self, ip):
- (a, b, c, d) = ip.split(".")
- return "02:42:%02x:%02x:%02x:%02x" % (int(a), int(b), int(c), int(d))
-
- def allocate_public_service_instance(self, **kwargs):
- """ Get a ServiceInstance that provides public connectivity. Currently this means to use AddressPool and
- the AddressManager Service.
-
- Expect this to be refactored when we break hard-coded service dependencies.
- """
- address_pool_name = kwargs.pop("address_pool_name")
-
- am_service = AddressManagerService.objects.all() # TODO: Hardcoded dependency
- if not am_service:
- raise Exception("no addressing services")
- am_service = am_service[0]
-
- ap = AddressPool.objects.filter(
- name=address_pool_name, service_id=am_service.id
- )
- if not ap:
- raise Exception("Addressing service unable to find addresspool %s" % name)
- ap = ap[0]
-
- ip = ap.get_address()
- if not ip:
- raise Exception("AddressPool '%s' has run out of addresses." % ap.name)
-
- ap.save() # save the AddressPool to account for address being removed from it
-
- subscriber_service = None
- if "subscriber_service" in kwargs:
- subscriber_service = kwargs.pop("subscriber_service")
-
- subscriber_service_instance = None
- if "subscriber_tenant" in kwargs:
- subscriber_service_instance = kwargs.pop("subscriber_tenant")
- elif "subscriber_service_instance" in kwargs:
- subscriber_service_instance = kwargs.pop("subscriber_service_instance")
-
- # TODO: potential partial failure -- AddressPool address is allocated and saved before addressing tenant
-
- t = None
- try:
- t = AddressManagerServiceInstance(
- owner=am_service, **kwargs
- ) # TODO: Hardcoded dependency
- t.public_ip = ip
- t.public_mac = self.ip_to_mac(ip)
- t.address_pool_id = ap.id
- t.save()
-
- if subscriber_service:
- link = ServiceInstanceLink(
- subscriber_service=subscriber_service, provider_service_instance=t
- )
- link.save()
-
- if subscriber_service_instance:
- link = ServiceInstanceLink(
- subscriber_service_instance=subscriber_service_instance,
- provider_service_instance=t,
- )
- link.save()
- except BaseException:
- # cleanup if anything went wrong
- ap.put_address(ip)
- ap.save() # save the AddressPool to account for address being added to it
- if t and t.id:
- t.delete()
- raise
-
- return t
-
- def get_image(self, tenant):
- slice = tenant.owner.slices.all()
- if not slice:
- raise SynchronizerProgrammingError("provider service has no slice")
- slice = slice[0]
-
- # If slice has default_image set then use it
- if slice.default_image:
- return slice.default_image
-
- raise SynchronizerProgrammingError(
- "Please set a default image for %s" % self.slice.name
- )
-
- """ get_legacy_tenant_attribute
- pick_least_loaded_instance_in_slice
- count_of_tenants_of_an_instance
-
- These three methods seem to be used by A-CORD. Look for ways to consolidate with existing methods and eliminate
- these legacy ones
- """
-
- def get_legacy_tenant_attribute(self, tenant, name, default=None):
- if tenant.service_specific_attribute:
- attributes = json.loads(tenant.service_specific_attribute)
- else:
- attributes = {}
- return attributes.get(name, default)
-
- def pick_least_loaded_instance_in_slice(self, tenant, slices, image):
- for slice in slices:
- if slice.instances.all().count() > 0:
- for instance in slice.instances.all():
- if instance.image != image:
- continue
- # Pick the first instance that has lesser than 5 tenants
- if self.count_of_tenants_of_an_instance(tenant, instance) < 5:
- return instance
- return None
-
- # TODO: Ideally the tenant count for an instance should be maintained using a
- # many-to-one relationship attribute, however this model being proxy, it does
- # not permit any new attributes to be defined. Find if any better solutions
- def count_of_tenants_of_an_instance(self, tenant, instance):
- tenant_count = 0
- for tenant in self.__class__.objects.all():
- if (
- self.get_legacy_tenant_attribute(tenant, "instance_id", None)
- == instance.id
- ):
- tenant_count += 1
- return tenant_count
-
- def manage_container(self, tenant):
- if tenant.deleted:
- return
-
- desired_image = self.get_image(tenant)
-
- if (tenant.instance is not None) and (
- tenant.instance.image.id != desired_image.id
- ):
- tenant.instance.delete()
- tenant.instance = None
-
- if tenant.instance is None:
- if not tenant.owner.slices.count():
- raise SynchronizerConfigurationError("The service has no slices")
-
- new_instance_created = False
- instance = None
- if self.get_legacy_tenant_attribute(
- tenant, "use_same_instance_for_multiple_tenants", default=False
- ):
- # Find if any existing instances can be used for this tenant
- slices = tenant.owner.slices.all()
- instance = self.pick_least_loaded_instance_in_slice(
- tenant, slices, desired_image
- )
-
- if not instance:
- slice = tenant.owner.slices.first()
-
- flavor = slice.default_flavor
- if not flavor:
- flavors = Flavor.objects.filter(name="m1.small")
- if not flavors:
- raise SynchronizerConfigurationError("No m1.small flavor")
- flavor = flavors[0]
-
- if slice.default_isolation == "container_vm":
- raise Exception("Not implemented")
- else:
- scheduler = getattr(self, "scheduler", LeastLoadedNodeScheduler)
- constrain_by_service_instance = getattr(
- self, "constrain_by_service_instance", False
- )
- tenant_node_label = getattr(tenant, "node_label", None)
- (node, parent) = scheduler(
- slice,
- label=tenant_node_label,
- constrain_by_service_instance=constrain_by_service_instance,
- ).pick()
-
- assert slice is not None
- assert node is not None
- assert desired_image is not None
- assert tenant.creator is not None
- assert node.site_deployment.deployment is not None
- assert flavor is not None
-
- try:
- instance = Instance(
- slice=slice,
- node=node,
- image=desired_image,
- creator=tenant.creator,
- deployment=node.site_deployment.deployment,
- flavor=flavor,
- isolation=slice.default_isolation,
- parent=parent,
- )
- self.save_instance(instance)
- new_instance_created = True
-
- tenant.instance = instance
- tenant.save()
- except BaseException:
- # NOTE: We don't have transactional support, so if the synchronizer crashes and exits after
- # creating the instance, but before adding it to the tenant, then we will leave an
- # orphaned instance.
- if new_instance_created:
- instance.delete()
- raise
diff --git a/xos/synchronizers/new_base/model_policies/test_config.yaml b/xos/synchronizers/new_base/model_policies/test_config.yaml
deleted file mode 100644
index bffe809..0000000
--- a/xos/synchronizers/new_base/model_policies/test_config.yaml
+++ /dev/null
@@ -1,30 +0,0 @@
-
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-name: test-model-policies
-accessor:
- username: xosadmin@opencord.org
- password: "sample"
- kind: testframework
-logging:
- version: 1
- handlers:
- console:
- class: logging.StreamHandler
- loggers:
- 'multistructlog':
- handlers:
- - console
diff --git a/xos/synchronizers/new_base/model_policy_loop.py b/xos/synchronizers/new_base/model_policy_loop.py
deleted file mode 100644
index 1160386..0000000
--- a/xos/synchronizers/new_base/model_policy_loop.py
+++ /dev/null
@@ -1,223 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from __future__ import print_function
-from synchronizers.new_base.modelaccessor import *
-from synchronizers.new_base.dependency_walker_new import *
-from synchronizers.new_base.policy import Policy
-
-import imp
-import pdb
-import time
-import traceback
-
-
-class XOSPolicyEngine(object):
- def __init__(self, policies_dir, log):
- self.model_policies = self.load_model_policies(policies_dir)
- self.policies_by_name = {}
- self.policies_by_class = {}
- self.log = log
-
- for policy in self.model_policies:
- if policy.model_name not in self.policies_by_name:
- self.policies_by_name[policy.model_name] = []
- self.policies_by_name[policy.model_name].append(policy)
-
- if policy.model not in self.policies_by_class:
- self.policies_by_class[policy.model] = []
- self.policies_by_class[policy.model].append(policy)
-
- def update_wp(self, d, o):
- try:
- save_fields = []
- if d.write_protect != o.write_protect:
- d.write_protect = o.write_protect
- save_fields.append("write_protect")
- if save_fields:
- d.save(update_fields=save_fields)
- except AttributeError as e:
- raise e
-
- def update_dep(self, d, o):
- try:
- print("Trying to update %s" % d)
- save_fields = []
- if d.updated < o.updated:
- save_fields = ["updated"]
-
- if save_fields:
- d.save(update_fields=save_fields)
- except AttributeError as e:
- log.exception("AttributeError in update_dep", e=e)
- raise e
- except Exception as e:
- log.exception("Exception in update_dep", e=e)
-
- def delete_if_inactive(self, d, o):
- try:
- d.delete()
- print("Deleted %s (%s)" % (d, d.__class__.__name__))
- except BaseException:
- pass
- return
-
- def load_model_policies(self, policies_dir):
- policies = []
- for fn in os.listdir(policies_dir):
- if fn.startswith("test"):
- # don't try to import unit tests!
- continue
- pathname = os.path.join(policies_dir, fn)
- if (
- os.path.isfile(pathname)
- and fn.endswith(".py")
- and (fn != "__init__.py")
- ):
- module = imp.load_source(fn[:-3], pathname)
- for classname in dir(module):
- c = getattr(module, classname, None)
-
- # make sure 'c' is a descendent of Policy and has a
- # provides field (this eliminates the abstract base classes
- # since they don't have a provides)
-
- if (
- inspect.isclass(c)
- and issubclass(c, Policy)
- and hasattr(c, "model_name")
- and (c not in policies)
- ):
- if not c.model_name:
- log.info(
- "load_model_policies: skipping model policy",
- classname=classname,
- )
- continue
- if not model_accessor.has_model_class(c.model_name):
- log.error(
- "load_model_policies: unable to find model policy",
- classname=classname,
- model=c.model_name,
- )
- c.model = model_accessor.get_model_class(c.model_name)
- policies.append(c)
-
- log.info("Loaded model policies", policies=policies)
- return policies
-
- def execute_model_policy(self, instance, action):
- # These are the models whose children get deleted when they are
- delete_policy_models = ["Slice", "Instance", "Network"]
- sender_name = getattr(instance, "model_name", instance.__class__.__name__)
-
- # if (action != "deleted"):
- # walk_inv_deps(self.update_dep, instance)
- # walk_deps(self.update_wp, instance)
- # elif (sender_name in delete_policy_models):
- # walk_inv_deps(self.delete_if_inactive, instance)
-
- policies_failed = False
- for policy in self.policies_by_name.get(sender_name, None):
- method_name = "handle_%s" % action
- if hasattr(policy, method_name):
- try:
- log.debug(
- "MODEL POLICY: calling handler",
- sender_name=sender_name,
- instance=instance,
- policy=policy.__name__,
- method=method_name,
- )
- getattr(policy(), method_name)(instance)
- log.debug(
- "MODEL POLICY: completed handler",
- sender_name=sender_name,
- instance=instance,
- policy_name=policy.__name__,
- method=method_name,
- )
- except Exception as e:
- log.exception("MODEL POLICY: Exception when running handler", e=e)
- policies_failed = True
-
- try:
- instance.policy_status = "%s" % traceback.format_exc(limit=1)
- instance.policy_code = 2
- instance.save(update_fields=["policy_status", "policy_code"])
- except Exception as e:
- log.exception(
- "MODEL_POLICY: Exception when storing policy_status", e=e
- )
-
- if not policies_failed:
- try:
- instance.policed = max(instance.updated, instance.changed_by_step)
- instance.policy_status = "done"
- instance.policy_code = 1
-
- instance.save(update_fields=["policed", "policy_status", "policy_code"])
-
- if hasattr(policy, "after_policy_save"):
- policy().after_policy_save(instance)
-
- log.info("MODEL_POLICY: Saved", o=instance)
- except BaseException:
- log.exception(
- "MODEL POLICY: Object failed to update policed timestamp",
- instance=instance,
- )
-
- def noop(self, o, p):
- pass
-
- def run(self):
- while True:
- start = time.time()
- try:
- self.run_policy_once()
- except Exception as e:
- log.exception("MODEL_POLICY: Exception in run()", e=e)
- if time.time() - start < 5:
- time.sleep(5)
-
- # TODO: This loop is different from the synchronizer event_loop, but they both do mostly the same thing. Look for
- # ways to combine them.
-
- def run_policy_once(self):
- models = self.policies_by_class.keys()
-
- model_accessor.check_db_connection_okay()
-
- objects = model_accessor.fetch_policies(models, False)
- deleted_objects = model_accessor.fetch_policies(models, True)
-
- for o in objects:
- if o.deleted:
- # This shouldn't happen, but previous code was examining o.deleted. Verify.
- continue
- if not o.policed:
- self.execute_model_policy(o, "create")
- else:
- self.execute_model_policy(o, "update")
-
- for o in deleted_objects:
- self.execute_model_policy(o, "delete")
-
- try:
- model_accessor.reset_queries()
- except Exception as e:
- # this shouldn't happen, but in case it does, catch it...
- log.exception("MODEL POLICY: exception in reset_queries", e)
diff --git a/xos/synchronizers/new_base/modelaccessor.py b/xos/synchronizers/new_base/modelaccessor.py
deleted file mode 100644
index 926a17b..0000000
--- a/xos/synchronizers/new_base/modelaccessor.py
+++ /dev/null
@@ -1,312 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-""" ModelAccessor
-
- A class for abstracting access to models. Used to get any djangoisms out
- of the synchronizer code base.
-
- This module will import all models into this module's global scope, so doing
- a "from modelaccessor import *" from a calling module ought to import all
- models into the calling module's scope.
-"""
-
-import functools
-import importlib
-import os
-import signal
-import sys
-import time
-from loadmodels import ModelLoadClient
-
-from xosconfig import Config
-from multistructlog import create_logger
-from xosutil.autodiscover_version import autodiscover_version_of_main
-
-log = create_logger(Config().get("logging"))
-
-orig_sigint = None
-model_accessor = None
-
-
-class ModelAccessor(object):
- def __init__(self):
- self.all_model_classes = self.get_all_model_classes()
-
- def get_all_model_classes(self):
- """ Build a dictionary of all model class names """
- raise Exception("Not Implemented")
-
- def get_model_class(self, name):
- """ Given a class name, return that model class """
- return self.all_model_classes[name]
-
- def has_model_class(self, name):
- """ Given a class name, return that model class """
- return name in self.all_model_classes
-
- def fetch_pending(self, main_objs, deletion=False):
- """ Execute the default fetch_pending query """
- raise Exception("Not Implemented")
-
- def fetch_policies(self, main_objs, deletion=False):
- """ Execute the default fetch_pending query """
- raise Exception("Not Implemented")
-
- def reset_queries(self):
- """ Reset any state between passes of synchronizer. For django, to
- limit memory consumption of cached queries.
- """
- pass
-
- def connection_close(self):
- """ Close any active database connection. For django, to limit memory
- consumption.
- """
- pass
-
- def check_db_connection_okay(self):
- """ Checks to make sure the db connection is okay """
- pass
-
- def obj_exists(self, o):
- """ Return True if the object exists in the data model """
- raise Exception("Not Implemented")
-
- def obj_in_list(self, o, olist):
- """ Return True if o is the same as one of the objects in olist """
- raise Exception("Not Implemented")
-
- def now(self):
- """ Return the current time for timestamping purposes """
- raise Exception("Not Implemented")
-
- def is_type(self, obj, name):
- """ returns True is obj is of model type "name" """
- raise Exception("Not Implemented")
-
- def is_instance(self, obj, name):
- """ returns True if obj is of model type "name" or is a descendant """
- raise Exception("Not Implemented")
-
- def get_content_type_id(self, obj):
- raise Exception("Not Implemented")
-
- def journal_object(self, o, operation, msg=None, timestamp=None):
- pass
-
- def create_obj(self, cls, **kwargs):
- raise Exception("Not Implemented")
-
-
-def import_models_to_globals():
- # add all models to globals
- for (k, v) in model_accessor.all_model_classes.items():
- globals()[k] = v
-
- # xosbase doesn't exist from the synchronizer's perspective, so fake out
- # ModelLink.
- if "ModelLink" not in globals():
-
- class ModelLink:
- def __init__(self, dest, via, into=None):
- self.dest = dest
- self.via = via
- self.into = into
-
- globals()["ModelLink"] = ModelLink
-
-
-def keep_trying(client, reactor):
- # Keep checking the connection to wait for it to become unavailable.
- # Then reconnect. The strategy is to send NoOp operations, one per second, until eventually a NoOp throws an
- # exception. This will indicate the server has reset. When that happens, we force the client to reconnect, and
- # it will download a new API from the server.
-
- from xosapi.xos_grpc_client import Empty
-
- try:
- client.utility.NoOp(Empty())
- except Exception as e:
- # If we caught an exception, then the API has become unavailable.
- # So reconnect.
-
- log.exception("exception in NoOp", e=e)
- log.info("restarting synchronizer")
-
- os.execv(sys.executable, ["python"] + sys.argv)
- return
-
- reactor.callLater(1, functools.partial(keep_trying, client, reactor))
-
-
-def grpcapi_reconnect(client, reactor):
- global model_accessor
-
- # Make sure to try to load models before trying to initialize the ORM. It might be the ORM is broken because it
- # is waiting on our models.
-
- if Config.get("models_dir"):
- version = autodiscover_version_of_main(max_parent_depth=0) or "unknown"
- log.info("Service version is %s" % version)
- try:
- ModelLoadClient(client).upload_models(
- Config.get("name"), Config.get("models_dir"), version=version
- )
- except Exception as e: # TODO: narrow exception scope
- if (
- hasattr(e, "code")
- and callable(e.code)
- and hasattr(e.code(), "name")
- and (e.code().name) == "UNAVAILABLE"
- ):
- # We need to make sure we force a reconnection, as it's possible that we will end up downloading a
- # new xos API.
- log.info("grpc unavailable during loadmodels. Force a reconnect")
- client.connected = False
- client.connect()
- return
- log.exception("failed to onboard models")
- # If it's some other error, then we don't need to force a reconnect. Just try the LoadModels() again.
- reactor.callLater(10, functools.partial(grpcapi_reconnect, client, reactor))
- return
-
- # If the ORM is broken, then wait for the orm to become available.
-
- if not getattr(client, "xos_orm", None):
- log.warning("No xos_orm. Will keep trying...")
- reactor.callLater(1, functools.partial(keep_trying, client, reactor))
- return
-
- # this will prevent updated timestamps from being automatically updated
- client.xos_orm.caller_kind = "synchronizer"
-
- client.xos_orm.restart_on_disconnect = True
-
- from apiaccessor import CoreApiModelAccessor
-
- model_accessor = CoreApiModelAccessor(orm=client.xos_orm)
-
- # If required_models is set, then check to make sure the required_models
- # are present. If not, then the synchronizer needs to go to sleep until
- # the models show up.
-
- required_models = Config.get("required_models")
- if required_models:
- required_models = [x.strip() for x in required_models]
-
- missing = []
- found = []
- for model in required_models:
- if model_accessor.has_model_class(model):
- found.append(model)
- else:
- missing.append(model)
-
- log.info("required_models, found:", models=", ".join(found))
- if missing:
- log.warning("required_models: missing", models=", ".join(missing))
- # We're missing a required model. Give up and wait for the connection
- # to reconnect, and hope our missing model has shown up.
- reactor.callLater(1, functools.partial(keep_trying, client, reactor))
- return
-
- # import all models to global space
- import_models_to_globals()
-
- # Synchronizer framework isn't ready to embrace reactor yet...
- reactor.stop()
-
- # Restore the sigint handler
- signal.signal(signal.SIGINT, orig_sigint)
-
-
-def config_accessor_grpcapi():
- global orig_sigint
-
- log.info("Connecting to the gRPC API")
-
- grpcapi_endpoint = Config.get("accessor.endpoint")
- grpcapi_username = Config.get("accessor.username")
- grpcapi_password = Config.get("accessor.password")
-
- # if password starts with "@", then retreive the password from a file
- if grpcapi_password.startswith("@"):
- fn = grpcapi_password[1:]
- if not os.path.exists(fn):
- raise Exception("%s does not exist" % fn)
- grpcapi_password = open(fn).readline().strip()
-
- from xosapi.xos_grpc_client import SecureClient
- from twisted.internet import reactor
-
- grpcapi_client = SecureClient(
- endpoint=grpcapi_endpoint, username=grpcapi_username, password=grpcapi_password
- )
- grpcapi_client.set_reconnect_callback(
- functools.partial(grpcapi_reconnect, grpcapi_client, reactor)
- )
- grpcapi_client.start()
-
- # Start reactor. This will cause the client to connect and then execute
- # grpcapi_callback().
-
- # Reactor will take over SIGINT during reactor.run(), but does not return it when reactor.stop() is called.
-
- orig_sigint = signal.getsignal(signal.SIGINT)
-
- # Start reactor. This will cause the client to connect and then execute
- # grpcapi_callback().
-
- reactor.run()
-
-
-def config_accessor_mock():
- global model_accessor
- from mock_modelaccessor import model_accessor as mock_model_accessor
-
- model_accessor = mock_model_accessor
-
- # mock_model_accessor doesn't have an all_model_classes field, so make one.
- import mock_modelaccessor as mma
-
- all_model_classes = {}
- for k in dir(mma):
- v = getattr(mma, k)
- if hasattr(v, "leaf_model_name"):
- all_model_classes[k] = v
-
- model_accessor.all_model_classes = all_model_classes
-
- import_models_to_globals()
-
-
-def config_accessor():
- accessor_kind = Config.get("accessor.kind")
-
- if accessor_kind == "testframework":
- config_accessor_mock()
- elif accessor_kind == "grpcapi":
- config_accessor_grpcapi()
- else:
- raise Exception("Unknown accessor kind %s" % accessor_kind)
-
- # now import any wrappers that the synchronizer needs to add to the ORM
- if Config.get("wrappers"):
- for wrapper_name in Config.get("wrappers"):
- importlib.import_module(wrapper_name)
-
-
-config_accessor()
diff --git a/xos/synchronizers/new_base/policy.py b/xos/synchronizers/new_base/policy.py
deleted file mode 100644
index b455c79..0000000
--- a/xos/synchronizers/new_base/policy.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-""" policy.py
-
- Base Classes for Model Policies
-"""
-
-from xosconfig import Config
-from multistructlog import create_logger
-
-log = create_logger(Config().get("logging"))
-
-
-class Policy(object):
- """ An XOS Model Policy
-
- Set the class member model_name to the name of the model that this policy will act on.
-
- The following functions will be invoked if they are defined:
-
- handle_create ... called when a model is created
- handle_update ... called when a model is updated
- handle_delete ... called when a model is deleted
- """
-
- def __init__(self):
- self.logger = log
diff --git a/xos/synchronizers/new_base/pull_step_engine.py b/xos/synchronizers/new_base/pull_step_engine.py
deleted file mode 100644
index 6d61cac..0000000
--- a/xos/synchronizers/new_base/pull_step_engine.py
+++ /dev/null
@@ -1,102 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import imp
-import inspect
-import os
-import threading
-import time
-from xosconfig import Config
-from multistructlog import create_logger
-
-log = create_logger(Config().get("logging"))
-
-
-class XOSPullStepScheduler:
- """ XOSPullStepThread
-
- A Thread for servicing pull steps. There is one event_step associated with one XOSPullStepThread.
- The thread's pull_records() function is called for every five seconds.
- """
-
- def __init__(self, steps, *args, **kwargs):
- self.steps = steps
-
- def run(self):
- while True:
- time.sleep(5)
- self.run_once()
-
- def run_once(self):
- log.debug("Starting pull steps", steps=self.steps)
-
- threads = []
- for step in self.steps:
- thread = threading.Thread(target=step().pull_records, name="pull_step")
- threads.append(thread)
-
- for t in threads:
- t.start()
-
- for t in threads:
- t.join()
-
- log.debug("Done with pull steps", steps=self.steps)
-
-
-class XOSPullStepEngine:
- """ XOSPullStepEngine
-
- Load pull step modules. Two methods are defined:
-
- load_pull_step_modules(dir) ... look for step modules in the given directory, and load objects that are
- descendant from PullStep.
-
- start() ... Launch threads to handle processing of the PullSteps. It's expected that load_step_modules()
- will be called before start().
- """
-
- def __init__(self):
- self.pull_steps = []
-
- def load_pull_step_modules(self, pull_step_dir):
- self.pull_steps = []
- log.info("Loading pull steps", pull_step_dir=pull_step_dir)
-
- # NOTE we'll load all the classes that inherit from PullStep
- for fn in os.listdir(pull_step_dir):
- pathname = os.path.join(pull_step_dir, fn)
- if (
- os.path.isfile(pathname)
- and fn.endswith(".py")
- and (fn != "__init__.py")
- and ("test" not in fn)
- ):
- event_module = imp.load_source(fn[:-3], pathname)
-
- for classname in dir(event_module):
- c = getattr(event_module, classname, None)
-
- if inspect.isclass(c):
- base_names = [b.__name__ for b in c.__bases__]
- if "PullStep" in base_names:
- self.pull_steps.append(c)
- log.info("Loaded pull steps", steps=self.pull_steps)
-
- def start(self):
- log.info("Starting pull steps engine", steps=self.pull_steps)
-
- for step in self.pull_steps:
- sched = XOSPullStepScheduler(steps=self.pull_steps)
- sched.run()
diff --git a/xos/synchronizers/new_base/pullstep.py b/xos/synchronizers/new_base/pullstep.py
deleted file mode 100644
index adbc0b1..0000000
--- a/xos/synchronizers/new_base/pullstep.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class PullStep(object):
- """
- All the pull steps defined in each synchronizer needs to inherit from this class in order to be loaded
- """
-
- def __init__(self, **kwargs):
- """
- Initialize a pull step
- :param kwargs:
- -- observed_model: name of the model that is being polled
- """
- self.observed_model = kwargs.get("observed_model")
-
- def pull_records(self):
- self.log.debug(
- "There is no default pull_records, please provide a pull_records method for %s"
- % self.observed_model
- )
diff --git a/xos/synchronizers/new_base/run_ansible b/xos/synchronizers/new_base/run_ansible
deleted file mode 100755
index 662f798..0000000
--- a/xos/synchronizers/new_base/run_ansible
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/bin/bash
-
-export REQUESTS_CA_BUNDLE=/usr/local/share/ca-certificates/local_certs.crt
-ansible-playbook -v "$@"
diff --git a/xos/synchronizers/new_base/run_ansible_verbose b/xos/synchronizers/new_base/run_ansible_verbose
deleted file mode 100755
index d72b12d..0000000
--- a/xos/synchronizers/new_base/run_ansible_verbose
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/bin/bash
-
-export REQUESTS_CA_BUNDLE=/usr/local/share/ca-certificates/local_certs.crt
-ansible-playbook -vvv "$@"
diff --git a/xos/synchronizers/new_base/steps/__init__.py b/xos/synchronizers/new_base/steps/__init__.py
deleted file mode 100644
index 4ea5d64..0000000
--- a/xos/synchronizers/new_base/steps/__init__.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-# from .sync_controller_sites import SyncControllerSites
-# from .sync_controller_slices import SyncControllerSlices
-# from .sync_controller_users import SyncControllerUsers
-# from .sync_controller_site_privileges import SyncControllerSitePrivileges
-# from .sync_controller_slice_privileges import SyncControllerSlicePrivileges
-# from .sync_controller_networks import SyncControllerNetworks
diff --git a/xos/synchronizers/new_base/steps/sync_object.py b/xos/synchronizers/new_base/steps/sync_object.py
deleted file mode 100644
index 1fb5894..0000000
--- a/xos/synchronizers/new_base/steps/sync_object.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from synchronizers.new_base.syncstep import *
-
-
-class SyncObject(SyncStep):
- provides = [] # Caller fills this in
- requested_interval = 0
- observes = [] # Caller fills this in
-
- def sync_record(self, r):
- raise DeferredException("Waiting for Service dependency: %r" % r)
diff --git a/xos/synchronizers/new_base/syncstep.py b/xos/synchronizers/new_base/syncstep.py
deleted file mode 100644
index 8c92f71..0000000
--- a/xos/synchronizers/new_base/syncstep.py
+++ /dev/null
@@ -1,158 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import base64
-
-from xosconfig import Config
-from synchronizers.new_base.modelaccessor import *
-from synchronizers.new_base.ansible_helper import run_template
-
-# from tests.steps.mock_modelaccessor import model_accessor
-
-import json
-import time
-import pdb
-
-from xosconfig import Config
-from functools import reduce
-
-
-def f7(seq):
- seen = set()
- seen_add = seen.add
- return [x for x in seq if not (x in seen or seen_add(x))]
-
-
-def elim_dups(backend_str):
- strs = backend_str.split(" // ")
- strs2 = f7(strs)
- return " // ".join(strs2)
-
-
-def deepgetattr(obj, attr):
- return reduce(getattr, attr.split("."), obj)
-
-
-def obj_class_name(obj):
- return getattr(obj, "model_name", obj.__class__.__name__)
-
-
-class InnocuousException(Exception):
- pass
-
-
-class DeferredException(Exception):
- pass
-
-
-class FailedDependency(Exception):
- pass
-
-
-class SyncStep(object):
- """ An XOS Sync step.
-
- Attributes:
- psmodel Model name the step synchronizes
- dependencies list of names of models that must be synchronized first if the current model depends on them
- """
-
- # map_sync_outputs can return this value to cause a step to be marked
- # successful without running ansible. Used for sync_network_controllers
- # on nat networks.
- SYNC_WITHOUT_RUNNING = "sync_without_running"
-
- slow = False
-
- def get_prop(self, prop):
- # NOTE config_dir is never define, is this used?
- sync_config_dir = Config.get("config_dir")
- prop_config_path = "/".join(sync_config_dir, self.name, prop)
- return open(prop_config_path).read().rstrip()
-
- def __init__(self, **args):
- """Initialize a sync step
- Keyword arguments:
- name -- Name of the step
- provides -- XOS models sync'd by this step
- """
- dependencies = []
- self.driver = args.get("driver")
- self.error_map = args.get("error_map")
-
- try:
- self.soft_deadline = int(self.get_prop("soft_deadline_seconds"))
- except BaseException:
- self.soft_deadline = 5 # 5 seconds
-
- if "log" in args:
- self.log = args.get("log")
-
- return
-
- def fetch_pending(self, deletion=False):
- # This is the most common implementation of fetch_pending
- # Steps should override it if they have their own logic
- # for figuring out what objects are outstanding.
-
- return model_accessor.fetch_pending(self.observes, deletion)
-
- def sync_record(self, o):
- self.log.debug("In default sync record", **o.tologdict())
-
- tenant_fields = self.map_sync_inputs(o)
- if tenant_fields == SyncStep.SYNC_WITHOUT_RUNNING:
- return
-
- main_objs = self.observes
- if isinstance(main_objs, list):
- main_objs = main_objs[0]
-
- path = "".join(main_objs.__name__).lower()
- res = run_template(self.playbook, tenant_fields, path=path, object=o)
-
- if hasattr(self, "map_sync_outputs"):
- self.map_sync_outputs(o, res)
-
- self.log.debug("Finished default sync record", **o.tologdict())
-
- def delete_record(self, o):
- self.log.debug("In default delete record", **o.tologdict())
-
- # If there is no map_delete_inputs, then assume deleting a record is a no-op.
- if not hasattr(self, "map_delete_inputs"):
- return
-
- tenant_fields = self.map_delete_inputs(o)
-
- main_objs = self.observes
- if isinstance(main_objs, list):
- main_objs = main_objs[0]
-
- path = "".join(main_objs.__name__).lower()
-
- tenant_fields["delete"] = True
- res = run_template(self.playbook, tenant_fields, path=path)
-
- if hasattr(self, "map_delete_outputs"):
- self.map_delete_outputs(o, res)
- else:
- # "rc" is often only returned when something bad happens, so assume that no "rc" implies a successful rc
- # of 0.
- if res[0].get("rc", 0) != 0:
- raise Exception("Nonzero rc from Ansible during delete_record")
-
- self.log.debug("Finished default delete record", **o.tologdict())
diff --git a/xos/synchronizers/new_base/templates/container.conf.j2 b/xos/synchronizers/new_base/templates/container.conf.j2
deleted file mode 100644
index a912a85..0000000
--- a/xos/synchronizers/new_base/templates/container.conf.j2
+++ /dev/null
@@ -1,28 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Upstart script for container
-description "container"
-author "smbaker@gmail.com"
-start on filesystem and started docker
-stop on runlevel [!2345]
-respawn
-
-script
- /usr/local/sbin/start-container-{{ container_name }}.sh ATTACH
-end script
-
-post-stop script
- /usr/local/sbin/stop-container-{{ container_name }}.sh
-end script
\ No newline at end of file
diff --git a/xos/synchronizers/new_base/templates/container.service.j2 b/xos/synchronizers/new_base/templates/container.service.j2
deleted file mode 100644
index 22f1a43..0000000
--- a/xos/synchronizers/new_base/templates/container.service.j2
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-[Unit]
-Description={{ container_name }}
-After=docker.service
-
-[Service]
-ExecStart=/bin/bash -c "/usr/local/sbin/start-container-{{ container_name }}.sh ATTACH"
-ExecStop=/bin/bash -c "/usr/local/sbin/stop-container-{{ container_name }}.sh"
-SuccessExitStatus=0 137
-
-[Install]
-WantedBy=multi-user.target
diff --git a/xos/synchronizers/new_base/templates/start-container.sh.j2 b/xos/synchronizers/new_base/templates/start-container.sh.j2
deleted file mode 100644
index feb4f04..0000000
--- a/xos/synchronizers/new_base/templates/start-container.sh.j2
+++ /dev/null
@@ -1,150 +0,0 @@
-#!/bin/bash
-
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-iptables -L > /dev/null
-ip6tables -L > /dev/null
-
-CONTAINER={{ container_name }}
-IMAGE={{ docker_image }}
-
-function mac_to_iface {
- PARENT_MAC=$1
- ifconfig|grep $PARENT_MAC| awk '{print $1}'|grep -v '\.'
-}
-
-function encapsulate_stag {
- LAN_IFACE=$1
- STAG=$2
- ifconfig $LAN_IFACE >> /dev/null
- if [ "$?" == 0 ]; then
- STAG_IFACE=$LAN_IFACE.$STAG
- ifconfig $LAN_IFACE up
- ifconfig $STAG_IFACE
- if [ "$?" == 0 ]; then
- echo $STAG_IFACE is already created
- else
- ifconfig $STAG_IFACE >> /dev/null || ip link add link $LAN_IFACE name $STAG_IFACE type vlan id $STAG
- fi
- ifconfig $STAG_IFACE up
- else
- echo There is no $LAN_IFACE. Aborting.
- exit -1
- fi
-}
-
-
-{% if volumes %}
-{% for volume in volumes %}
-DEST_DIR=/var/container_volumes/$CONTAINER/{{ volume }}
-mkdir -p $DEST_DIR
-VOLUME_ARGS="$VOLUME_ARGS -v $DEST_DIR:{{ volume }}"
-{% endfor %}
-{% endif %}
-
-docker inspect $CONTAINER > /dev/null 2>&1
-if [ "$?" == 1 ]
-then
- docker pull $IMAGE
-{% if network_method=="host" %}
- docker run -d --name=$CONTAINER --privileged=true --net=host $VOLUME_ARGS $IMAGE
-{% elif network_method=="bridged" %}
- docker run -d --name=$CONTAINER --privileged=true --net=bridge $VOLUME_ARGS $IMAGE
-{% else %}
- docker run -d --name=$CONTAINER --privileged=true --net=none $VOLUME_ARGS $IMAGE
-{% endif %}
-else
- docker start $CONTAINER
-fi
-
-{% if ports %}
-{% for port in ports %}
-
-{% if port.next_hop %}
-NEXTHOP_ARG="@{{ port.next_hop }}"
-{% else %}
-NEXTHOP_ARG=""
-{% endif %}
-
-{% if port.c_tag %}
-CTAG_ARG="@{{ port.c_tag }}"
-{% else %}
-CTAG_ARG=""
-{% endif %}
-
-{% if port.parent_mac %}
-# container-in-VM
-SRC_DEV=$( mac_to_iface "{{ port.parent_mac }}" )
-CMD="docker exec $CONTAINER ifconfig $SRC_DEV >> /dev/null || pipework $SRC_DEV -i {{ port.device }} $CONTAINER {{ port.ip }}/24$NEXTHOP_ARG {{ port.mac }} $CTAG_ARG"
-echo $CMD
-eval $CMD
-
-{% else %}
-# container-on-metal
-IP="{{ port.ip }}"
-{% if port.mac %}
-MAC="{{ port.mac }}"
-{% else %}
-MAC=""
-{% endif %}
-
-DEVICE="{{ port.device }}"
-BRIDGE="{{ port.bridge }}"
-{% if port.s_tag %}
-# This is intended for lan_network. Assume that BRIDGE is set to br_lan. We
-# create a device that strips off the S-TAG.
-STAG="{{ port.s_tag }}"
-encapsulate_stag $BRIDGE $STAG
-SRC_DEV=$STAG_IFACE
-{% else %}
-# This is for a standard neutron private network. We use a donor VM to setup
-# openvswitch for us, and we snoop at its devices and create a tap using the
-# same settings.
-XOS_NETWORK_ID="{{ port.xos_network_id }}"
-INSTANCE_MAC="{{ port.snoop_instance_mac }}"
-INSTANCE_ID="{{ port.snoop_instance_id }}"
-INSTANCE_TAP=`virsh domiflist $INSTANCE_ID | grep -i $INSTANCE_MAC | awk '{print $1}'`
-INSTANCE_TAP=${INSTANCE_TAP:3}
-VLAN_ID=`ovs-vsctl show | grep -i -A 1 port.*$INSTANCE_TAP | grep -i tag | awk '{print $2}'`
-# One tap for all containers per XOS/neutron network. Included the VLAN_ID in the
-# hash, to cover the case where XOS is reinstalled and the XOS network ids
-# get reused.
-TAP="con`echo ${XOS_NETWORK_ID}_$VLAN_ID|md5sum|awk '{print $1}'`"
-TAP=${TAP:0:10}
-echo im=$INSTANCE_MAC ii=$INSTANCE_ID it=$INSTANCE_TAP vlan=$VLAN_ID tap=$TAP con=$CONTAINER dev=$DEVICE mac=$MAC
-ovs-vsctl show | grep -i $TAP
-if [[ $? == 1 ]]; then
- echo creating tap
- ovs-vsctl add-port $BRIDGE $TAP tag=$VLAN_ID -- set interface $TAP type=internal
-else
- echo tap exists
-fi
-SRC_DEV=$TAP
-{% endif %}
-
-CMD="docker exec $CONTAINER ifconfig $DEVICE >> /dev/null || pipework $SRC_DEV -i $DEVICE $CONTAINER $IP/24$NEXTHOP_ARG $MAC $CTAG_ARG"
-echo $CMD
-eval $CMD
-{% endif %}
-{% endfor %}
-{% endif %}
-
-# Attach to container
-# (this is only done when using upstart, since upstart expects to be attached
-# to a running service)
-if [[ "$1" == "ATTACH" ]]; then
- docker start -a $CONTAINER
-fi
-
diff --git a/xos/synchronizers/new_base/templates/stop-container.sh.j2 b/xos/synchronizers/new_base/templates/stop-container.sh.j2
deleted file mode 100644
index bfe4b71..0000000
--- a/xos/synchronizers/new_base/templates/stop-container.sh.j2
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/bin/bash
-
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-CONTAINER={{ container_name }}
-
-docker stop $CONTAINER
-docker rm $CONTAINER
diff --git a/xos/synchronizers/new_base/tests/__init__.py b/xos/synchronizers/new_base/tests/__init__.py
deleted file mode 100644
index b0fb0b2..0000000
--- a/xos/synchronizers/new_base/tests/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/xos/synchronizers/new_base/tests/event_steps/event_step.py b/xos/synchronizers/new_base/tests/event_steps/event_step.py
deleted file mode 100644
index 1fa47e1..0000000
--- a/xos/synchronizers/new_base/tests/event_steps/event_step.py
+++ /dev/null
@@ -1,29 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import print_function
-from synchronizers.new_base.eventstep import EventStep
-from mock_modelaccessor import *
-
-
-class TestEventStep(EventStep):
- technology = "kafka"
- topics = ["sometopic"]
- pattern = None
-
- def __init__(self, log, *args, **kwargs):
- super(TestEventStep, self).__init__(log, *args, **kwargs)
-
- def process_event(self, event):
- print("received an event", event)
diff --git a/xos/synchronizers/new_base/tests/model-deps b/xos/synchronizers/new_base/tests/model-deps
deleted file mode 100644
index 247a190..0000000
--- a/xos/synchronizers/new_base/tests/model-deps
+++ /dev/null
@@ -1,656 +0,0 @@
-{
-
-
- "User": [
-
-
-
-
-
-
-
-
-
-
-
-
-
- ["ControllerUser", "controllerusers", "user"],
-
-
- ["Site", "site", "users"],
- ["DashboardView", "dashboards", "user"]
-
- ],
-
- "Privilege": [
-
-
-
-
-
-
-
-
-
-
-
-
-
- ["ControllerPrivilege", "controllerprivileges", "privilege"]
-
-
-
- ],
-
- "AddressPool": [
-
-
-
-
-
- ["Service", "service", "addresspools"]
-
- ],
-
-
- "ControllerDashboardView": [
-
-
-
-
-
- ["Controller", "controller", "controllerdashboardviews"],
- ["DashboardView", "dashboardView", "controllerdashboardviews"]
-
- ],
-
- "ControllerImages": [
-
-
-
-
-
- ["Image", "image", "controllerimages"],
- ["Controller", "controller", "controllerimages"]
-
- ],
-
- "ControllerNetwork": [
-
-
-
-
-
- ["Network", "network", "controllernetworks"],
- ["Controller", "controller", "controllernetworks"]
-
- ],
-
- "ControllerRole": [
-
-
-
-
-
-
- ],
-
- "ControllerSite": [
-
-
-
-
-
- ["Site", "site", "controllersite"],
- ["Controller", "controller", "controllersite"]
-
- ],
-
- "ControllerPrivilege": [
-
-
-
-
-
- ["Controller", "controller", "controllerprivileges"],
- ["Privilege", "privilege", "controllerprivileges"]
-
- ],
-
- "ControllerSitePrivilege": [
-
-
-
-
-
- ["Controller", "controller", "controllersiteprivileges"],
- ["SitePrivilege", "site_privilege", "controllersiteprivileges"]
-
- ],
-
- "ControllerSlice": [
-
-
-
-
-
- ["Controller", "controller", "controllerslices"],
- ["Slice", "slice", "controllerslices"]
-
- ],
-
- "ControllerSlicePrivilege": [
-
-
-
-
-
- ["Controller", "controller", "controllersliceprivileges"],
- ["SlicePrivilege", "slice_privilege", "controllersliceprivileges"]
-
- ],
-
- "ControllerUser": [
-
-
-
-
-
- ["User", "user", "controllerusers"],
- ["Controller", "controller", "controllersusers"]
-
- ],
-
- "DashboardView": [
-
-
-
-
-
-
-
-
-
-
-
-
-
- ["ControllerDashboardView", "controllerdashboardviews", "dashboardView"],
-
-
- ["Controller", "controllers", "dashboardviews"],
- ["Deployment", "deployments", "dashboardviews"]
-
- ],
-
- "Deployment": [
-
-
-
-
-
-
- ],
-
- "DeploymentPrivilege": [
-
-
-
-
-
- ["User", "user", "deploymentprivileges"],
- ["Deployment", "deployment", "deploymentprivileges"],
- ["DeploymentRole", "role", "deploymentprivileges"]
-
- ],
-
- "DeploymentRole": [
-
-
-
-
-
-
- ],
-
- "Flavor": [
-
-
-
-
-
-
- ],
-
- "Image": [
-
-
-
-
-
-
-
-
-
-
-
-
-
- ["ControllerImages", "controllerimages", "image"]
-
-
-
- ],
-
- "ImageDeployments": [
-
-
-
-
-
- ["Image", "image", "imagedeployments"],
- ["Deployment", "deployment", "imagedeployments"]
-
- ],
-
- "Instance": [
-
-
-
-
-
- ["Image", "image", "instances"],
- ["User", "creator", "instances"],
- ["Slice", "slice", "instances"],
- ["Deployment", "deployment", "instance_deployment"],
- ["Node", "node", "instances"],
- ["Flavor", "flavor", "instance"],
- ["Instance", "parent", "instance"]
-
- ],
-
- "Network": [
-
-
-
-
-
-
-
-
-
-
-
-
-
- ["ControllerNetwork", "controllernetworks", "network"],
-
-
- ["NetworkTemplate", "template", "network"],
- ["Slice", "owner", "ownedNetworks"],
- ["Slice", "permitted_slices", "availableNetworks"]
- ],
-
- "NetworkParameter": [
-
-
-
-
-
- ["NetworkParameterType", "parameter", "networkparameters"]
-
- ],
-
- "NetworkParameterType": [
-
-
-
-
-
-
- ],
-
- "NetworkSlice": [
-
-
-
-
-
- ["Network", "network", "networkslices"],
- ["Slice", "slice", "networkslices"]
-
- ],
-
- "NetworkTemplate": [
-
-
-
-
-
-
- ],
-
- "Node": [
-
-
-
-
-
- ["SiteDeployment", "site_deployment", "nodes"]
-
- ],
-
- "NodeLabel": [
-
-
-
-
-
- ["Node", "node", "nodelabels"]
-
- ],
-
- "Port": [
-
-
-
-
-
- ["Network", "network", "links"],
- ["Instance", "instance", "ports"]
-
- ],
-
- "Role": [
-
-
-
-
-
-
-
-
-
-
-
- ],
-
- "Service": [
-
-
-
-
-
-
- ],
-
- "ServiceAttribute": [
-
-
-
-
-
- ["Service", "service", "serviceattributes"]
-
- ],
-
- "ServiceDependency": [
-
-
-
-
-
- ["Service", "provider_service", "provided_dependencies"],
- ["Service", "subscriber_service", "subscribed_dependencies"]
-
- ],
-
- "ServiceMonitoringAgentInfo": [
-
-
-
-
-
- ["Service", "service", "servicemonitoringagents"]
-
- ],
-
- "ServicePrivilege": [
-
-
-
-
-
- ["User", "user", "serviceprivileges"],
- ["Service", "service", "serviceprivileges"],
- ["ServiceRole", "role", "serviceprivileges"]
-
- ],
-
- "ServiceRole": [
-
-
-
-
-
-
- ],
-
- "Site": [
-
-
-
-
-
-
-
-
-
-
-
-
-
- ["ControllerSite", "controllersite", "site"],
-
-
- ["Deployment", "deployments", "sites"]
-
- ],
-
- "SiteDeployment": [
-
-
-
-
-
- ["Site", "site", "sitedeployments"],
- ["Deployment", "deployment", "sitedeployments"],
- ["Controller", "controller", "sitedeployments"]
-
- ],
-
- "SitePrivilege": [
-
-
-
-
-
-
-
-
-
-
-
-
-
- ["ControllerSitePrivilege", "controllersiteprivileges", "site_privilege"],
-
-
- ["User", "user", "siteprivileges"],
- ["Site", "site", "siteprivileges"],
- ["SiteRole", "role", "siteprivileges"]
-
- ],
-
- "SiteRole": [
-
-
-
-
-
-
- ],
-
- "Slice": [
-
-
-
-
-
-
-
-
-
-
-
-
-
- ["ControllerSlice", "controllerslices", "slice"],
-
-
- ["Site", "site", "slices"],
- ["Service", "service", "slices"],
- ["User", "creator", "slices"],
- ["Flavor", "default_flavor", "slices"],
- ["Image", "default_image", "slices"],
- ["Node", "default_node", "slices"]
-
- ],
-
- "SlicePrivilege": [
-
-
-
-
-
-
-
-
-
-
-
-
-
- ["ControllerSlicePrivilege", "controllersliceprivileges", "slice_privilege"],
-
-
- ["User", "user", "sliceprivileges"],
- ["Slice", "slice", "sliceprivileges"],
- ["SliceRole", "role", "sliceprivileges"]
-
- ],
-
- "SliceRole": [
-
-
-
-
-
-
- ],
-
- "Tag": [
-
-
-
-
-
- ["Service", "service", "tags"]
-
- ],
-
- "InterfaceType": [
-
-
-
-
-
-
- ],
-
- "ServiceInterface": [
-
-
-
-
-
- ["Service", "service", "service_interfaces"],
- ["InterfaceType", "interface_type", "service_interfaces"]
-
- ],
-
- "ServiceInstance": [
-
-
-
-
-
- ["Service", "owner", "service_instances"]
-
- ],
-
- "ServiceInstanceLink": [
-
-
-
-
-
- ["ServiceInstance", "provider_service_instance", "provided_links"],
- ["ServiceInterface", "provider_service_interface", "provided_links"],
- ["ServiceInstance", "subscriber_service_instance", "subscribed_links"],
- ["Service", "subscriber_service", "subscribed_links"],
- ["Network", "subscriber_network", "subscribed_links"]
-
- ],
-
- "ServiceInstanceAttribute": [
-
-
-
-
-
- ["ServiceInstance", "service_instance", "service_instance_attributes"]
-
- ],
-
- "TenantWithContainer": [
-
-
-
-
-
- ["Service", "owner", "service_instances"],
- ["Instance", "instance", "+"],
- ["User", "creator", "+"]
-
- ],
-
- "XOS": [
-
-
-
-
-
-
- ],
-
- "XOSGuiExtension": [
-
-
-
-
-
-
- ]
-}
diff --git a/xos/synchronizers/new_base/tests/model-deps-onos b/xos/synchronizers/new_base/tests/model-deps-onos
deleted file mode 100644
index 03d9c9f..0000000
--- a/xos/synchronizers/new_base/tests/model-deps-onos
+++ /dev/null
@@ -1,5 +0,0 @@
-{
- "ONOSApp": [
- ["ONOSService", "", ""]
- ]
-}
diff --git a/xos/synchronizers/new_base/tests/pull_steps/__init__.py b/xos/synchronizers/new_base/tests/pull_steps/__init__.py
deleted file mode 100644
index b0fb0b2..0000000
--- a/xos/synchronizers/new_base/tests/pull_steps/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/xos/synchronizers/new_base/tests/pull_steps/pull_step.py b/xos/synchronizers/new_base/tests/pull_steps/pull_step.py
deleted file mode 100644
index 0f29433..0000000
--- a/xos/synchronizers/new_base/tests/pull_steps/pull_step.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synchronizers.new_base.pullstep import PullStep
-from mock_modelaccessor import *
-
-
-class TestPullStep(PullStep):
- def __init__(self):
- super(TestPullStep, self).__init__(observed_model=Instance)
diff --git a/xos/synchronizers/new_base/tests/steps/__init__.py b/xos/synchronizers/new_base/tests/steps/__init__.py
deleted file mode 100644
index b0fb0b2..0000000
--- a/xos/synchronizers/new_base/tests/steps/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/xos/synchronizers/new_base/tests/steps/sync_container.py b/xos/synchronizers/new_base/tests/steps/sync_container.py
deleted file mode 100644
index 51bf872..0000000
--- a/xos/synchronizers/new_base/tests/steps/sync_container.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import hashlib
-import os
-import socket
-import sys
-import base64
-import time
-from synchronizers.new_base.SyncInstanceUsingAnsible import SyncInstanceUsingAnsible
-from synchronizers.new_base.syncstep import DeferredException
-from synchronizers.new_base.ansible_helper import run_template_ssh
-from mock_modelaccessor import *
-from synchronizers.new_base.syncstep import SyncStep
-
-# hpclibrary will be in steps/..
-parentdir = os.path.join(os.path.dirname(__file__), "..")
-sys.path.insert(0, parentdir)
-
-
-class SyncContainer(SyncInstanceUsingAnsible):
- provides = [Instance]
- observes = Instance
- template_name = "sync_container.yaml"
-
- def __init__(self, *args, **kwargs):
- super(SyncContainer, self).__init__(*args, **kwargs)
-
- def fetch_pending(self, deletion=False):
- i = Instance()
- i.name = "Spectacular Sponge"
- j = Instance()
- j.name = "Spontaneous Tent"
- k = Instance()
- k.name = "Embarrassed Cat"
-
- objs = [i, j, k]
- return objs
-
- def sync_record(self, o):
- pass
-
- def delete_record(self, o):
- pass
diff --git a/xos/synchronizers/new_base/tests/steps/sync_controller_images.py b/xos/synchronizers/new_base/tests/steps/sync_controller_images.py
deleted file mode 100644
index 5545c74..0000000
--- a/xos/synchronizers/new_base/tests/steps/sync_controller_images.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import base64
-from synchronizers.new_base.syncstep import *
-from synchronizers.new_base.ansible_helper import *
-from mock_modelaccessor import *
-
-
-class SyncControllerImages(SyncStep):
- provides = [ControllerImages]
- observes = ControllerImages
- requested_interval = 0
- playbook = "sync_controller_images.yaml"
-
- def fetch_pending(self, deleted):
- ci = ControllerImages()
- i = Image()
- i.name = "Lush Loss"
- ci.i = i
- return [ci]
-
- def map_sync_inputs(self, controller_image):
- image_fields = {
- "endpoint": controller_image.controller.auth_url,
- "endpoint_v3": controller_image.controller.auth_url_v3,
- "admin_user": controller_image.controller.admin_user,
- "admin_password": controller_image.controller.admin_password,
- "domain": controller_image.controller.domain,
- "name": controller_image.image.name,
- "filepath": controller_image.image.path,
- # name of ansible playbook
- "ansible_tag": "%s@%s"
- % (controller_image.image.name, controller_image.controller.name),
- }
-
- return image_fields
-
- def map_sync_outputs(self, controller_image, res):
- image_id = res[0]["id"]
- controller_image.glance_image_id = image_id
- controller_image.backend_status = "1 - OK"
- controller_image.save()
diff --git a/xos/synchronizers/new_base/tests/steps/sync_controller_networks.py b/xos/synchronizers/new_base/tests/steps/sync_controller_networks.py
deleted file mode 100644
index c81ab6a..0000000
--- a/xos/synchronizers/new_base/tests/steps/sync_controller_networks.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import base64
-import struct
-import socket
-from netaddr import IPAddress, IPNetwork
-from synchronizers.new_base.syncstep import *
-from synchronizers.new_base.ansible_helper import *
-from mock_modelaccessor import *
-
-
-class SyncControllerNetworks(SyncStep):
- requested_interval = 0
- provides = [Network]
- observes = ControllerNetwork
- external_dependencies = [User]
- playbook = "sync_controller_networks.yaml"
-
- def fetch_pending(self, deleted):
- ci = ControllerNetwork()
- i = Network()
- i.name = "Lush Loss"
- s = Slice()
- s.name = "Ghastly Notebook"
- i.owner = s
- ci.i = i
- return [ci]
-
- def map_sync_outputs(self, controller_network, res):
- network_id = res[0]["network"]["id"]
- subnet_id = res[1]["subnet"]["id"]
- controller_network.net_id = network_id
- controller_network.subnet = self.cidr
- controller_network.subnet_id = subnet_id
- controller_network.backend_status = "1 - OK"
- if not controller_network.segmentation_id:
- controller_network.segmentation_id = str(
- self.get_segmentation_id(controller_network)
- )
- controller_network.save()
-
- def map_sync_inputs(self, controller_network):
- pass
-
- def map_delete_inputs(self, controller_network):
- network_fields = {"endpoint": None, "delete": True}
-
- return network_fields
diff --git a/xos/synchronizers/new_base/tests/steps/sync_controller_site_privileges.py b/xos/synchronizers/new_base/tests/steps/sync_controller_site_privileges.py
deleted file mode 100644
index 5f4e50f..0000000
--- a/xos/synchronizers/new_base/tests/steps/sync_controller_site_privileges.py
+++ /dev/null
@@ -1,109 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import base64
-import json
-from synchronizers.new_base.syncstep import *
-from synchronizers.new_base.ansible_helper import *
-from mock_modelaccessor import *
-
-
-class SyncControllerSitePrivileges(SyncStep):
- provides = [SitePrivilege]
- requested_interval = 0
- observes = ControllerSitePrivilege
- playbook = "sync_controller_users.yaml"
-
- def map_sync_inputs(self, controller_site_privilege):
- controller_register = json.loads(
- controller_site_privilege.controller.backend_register
- )
- if not controller_site_privilege.controller.admin_user:
- return
-
- roles = [controller_site_privilege.site_privilege.role.role]
- # setup user home site roles at controller
- if not controller_site_privilege.site_privilege.user.site:
- raise Exception(
- "Siteless user %s" % controller_site_privilege.site_privilege.user.email
- )
- else:
- # look up tenant id for the user's site at the controller
- # ctrl_site_deployments = SiteDeployment.objects.filter(
- # site_deployment__site=controller_site_privilege.user.site,
- # controller=controller_site_privilege.controller)
-
- # if ctrl_site_deployments:
- # # need the correct tenant id for site at the controller
- # tenant_id = ctrl_site_deployments[0].tenant_id
- # tenant_name = ctrl_site_deployments[0].site_deployment.site.login_base
- user_fields = {
- "endpoint": controller_site_privilege.controller.auth_url,
- "endpoint_v3": controller_site_privilege.controller.auth_url_v3,
- "domain": controller_site_privilege.controller.domain,
- "name": controller_site_privilege.site_privilege.user.email,
- "email": controller_site_privilege.site_privilege.user.email,
- "password": controller_site_privilege.site_privilege.user.remote_password,
- "admin_user": controller_site_privilege.controller.admin_user,
- "admin_password": controller_site_privilege.controller.admin_password,
- "ansible_tag": "%s@%s"
- % (
- controller_site_privilege.site_privilege.user.email.replace(
- "@", "-at-"
- ),
- controller_site_privilege.controller.name,
- ),
- "admin_tenant": controller_site_privilege.controller.admin_tenant,
- "roles": roles,
- "tenant": controller_site_privilege.site_privilege.site.login_base,
- }
-
- return user_fields
-
- def map_sync_outputs(self, controller_site_privilege, res):
- # results is an array in which each element corresponds to an
- # "ok" string received per operation. If we get as many oks as
- # the number of operations we issued, that means a grand success.
- # Otherwise, the number of oks tell us which operation failed.
- controller_site_privilege.role_id = res[0]["id"]
- controller_site_privilege.save()
-
- def delete_record(self, controller_site_privilege):
- controller_register = json.loads(
- controller_site_privilege.controller.backend_register
- )
- if controller_register.get("disabled", False):
- raise InnocuousException(
- "Controller %s is disabled" % controller_site_privilege.controller.name
- )
-
- if controller_site_privilege.role_id:
- driver = self.driver.admin_driver(
- controller=controller_site_privilege.controller
- )
- user = ControllerUser.objects.get(
- controller=controller_site_privilege.controller,
- user=controller_site_privilege.site_privilege.user,
- )
- site = ControllerSite.objects.get(
- controller=controller_site_privilege.controller,
- user=controller_site_privilege.site_privilege.user,
- )
- driver.delete_user_role(
- user.kuser_id,
- site.tenant_id,
- controller_site_privilege.site_prvilege.role.role,
- )
diff --git a/xos/synchronizers/new_base/tests/steps/sync_controller_sites.py b/xos/synchronizers/new_base/tests/steps/sync_controller_sites.py
deleted file mode 100644
index 5da9ca7..0000000
--- a/xos/synchronizers/new_base/tests/steps/sync_controller_sites.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import base64
-from synchronizers.new_base.syncstep import *
-from synchronizers.new_base.ansible_helper import *
-import json
-from mock_modelaccessor import *
-
-
-class SyncControllerSites(SyncStep):
- requested_interval = 0
- provides = [Site]
- observes = ControllerSite
- playbook = "sync_controller_sites.yaml"
-
- def fetch_pending(self, deleted=False):
- lobjs = super(SyncControllerSites, self).fetch_pending(deleted)
-
- if not deleted:
- # filter out objects with null controllers
- lobjs = [x for x in lobjs if x.controller]
-
- return lobjs
-
- def map_sync_inputs(self, controller_site):
- tenant_fields = {
- "endpoint": controller_site.controller.auth_url,
- "endpoint_v3": controller_site.controller.auth_url_v3,
- "domain": controller_site.controller.domain,
- "admin_user": controller_site.controller.admin_user,
- "admin_password": controller_site.controller.admin_password,
- "admin_tenant": controller_site.controller.admin_tenant,
- # name of ansible playbook
- "ansible_tag": "%s@%s"
- % (controller_site.site.login_base, controller_site.controller.name),
- "tenant": controller_site.site.login_base,
- "tenant_description": controller_site.site.name,
- }
- return tenant_fields
-
- def map_sync_outputs(self, controller_site, res):
- controller_site.tenant_id = res[0]["id"]
- controller_site.backend_status = "1 - OK"
- controller_site.save()
-
- def delete_record(self, controller_site):
- controller_register = json.loads(controller_site.controller.backend_register)
- if controller_register.get("disabled", False):
- raise InnocuousException(
- "Controller %s is disabled" % controller_site.controller.name
- )
-
- if controller_site.tenant_id:
- driver = self.driver.admin_driver(controller=controller_site.controller)
- driver.delete_tenant(controller_site.tenant_id)
-
- """
- Ansible does not support tenant deletion yet
-
- import pdb
- pdb.set_trace()
- template = os_template_env.get_template('delete_controller_sites.yaml')
- tenant_fields = {'endpoint':controller_site.controller.auth_url,
- 'admin_user': controller_site.controller.admin_user,
- 'admin_password': controller_site.controller.admin_password,
- 'admin_tenant': 'admin',
- 'ansible_tag': 'controller_sites/%s@%s'%(controller_site.controller_site.site.login_base,controller_site.controller_site.deployment.name), # name of ansible playbook
- 'tenant': controller_site.controller_site.site.login_base,
- 'delete': True}
-
- rendered = template.render(tenant_fields)
- res = run_template('sync_controller_sites.yaml', tenant_fields)
-
- if (len(res)!=1):
- raise Exception('Could not assign roles for user %s'%tenant_fields['tenant'])
- """
diff --git a/xos/synchronizers/new_base/tests/steps/sync_controller_slice_privileges.py b/xos/synchronizers/new_base/tests/steps/sync_controller_slice_privileges.py
deleted file mode 100644
index 95a0aad..0000000
--- a/xos/synchronizers/new_base/tests/steps/sync_controller_slice_privileges.py
+++ /dev/null
@@ -1,97 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import base64
-import json
-from synchronizers.new_base.ansible_helper import *
-from mock_modelaccessor import *
-import syncstep
-
-
-class SyncControllerSlicePrivileges(syncstep.SyncStep):
- provides = [SlicePrivilege]
- requested_interval = 0
- observes = ControllerSlicePrivilege
- playbook = "sync_controller_users.yaml"
-
- def map_sync_inputs(self, controller_slice_privilege):
- if not controller_slice_privilege.controller.admin_user:
- return
-
- template = os_template_env.get_template("sync_controller_users.yaml")
- roles = [controller_slice_privilege.slice_privilege.role.role]
- # setup user home slice roles at controller
- if not controller_slice_privilege.slice_privilege.user.site:
- raise Exception(
- "Sliceless user %s"
- % controller_slice_privilege.slice_privilege.user.email
- )
- else:
- user_fields = {
- "endpoint": controller_slice_privilege.controller.auth_url,
- "endpoint_v3": controller_slice_privilege.controller.auth_url_v3,
- "domain": controller_slice_privilege.controller.domain,
- "name": controller_slice_privilege.slice_privilege.user.email,
- "email": controller_slice_privilege.slice_privilege.user.email,
- "password": controller_slice_privilege.slice_privilege.user.remote_password,
- "admin_user": controller_slice_privilege.controller.admin_user,
- "admin_password": controller_slice_privilege.controller.admin_password,
- "ansible_tag": "%s@%s@%s"
- % (
- controller_slice_privilege.slice_privilege.user.email.replace(
- "@", "-at-"
- ),
- controller_slice_privilege.slice_privilege.slice.name,
- controller_slice_privilege.controller.name,
- ),
- "admin_tenant": controller_slice_privilege.controller.admin_tenant,
- "roles": roles,
- "tenant": controller_slice_privilege.slice_privilege.slice.name,
- }
- return user_fields
-
- def map_sync_outputs(self, controller_slice_privilege, res):
- controller_slice_privilege.role_id = res[0]["id"]
- controller_slice_privilege.save()
-
- def delete_record(self, controller_slice_privilege):
- controller_register = json.loads(
- controller_slice_privilege.controller.backend_register
- )
- if controller_register.get("disabled", False):
- raise InnocuousException(
- "Controller %s is disabled" % controller_slice_privilege.controller.name
- )
-
- if controller_slice_privilege.role_id:
- driver = self.driver.admin_driver(
- controller=controller_slice_privilege.controller
- )
- user = ControllerUser.objects.filter(
- controller_id=controller_slice_privilege.controller.id,
- user_id=controller_slice_privilege.slice_privilege.user.id,
- )
- user = user[0]
- slice = ControllerSlice.objects.filter(
- controller_id=controller_slice_privilege.controller.id,
- user_id=controller_slice_privilege.slice_privilege.user.id,
- )
- slice = slice[0]
- driver.delete_user_role(
- user.kuser_id,
- slice.tenant_id,
- controller_slice_privilege.slice_prvilege.role.role,
- )
diff --git a/xos/synchronizers/new_base/tests/steps/sync_controller_slices.py b/xos/synchronizers/new_base/tests/steps/sync_controller_slices.py
deleted file mode 100644
index 929dd1c..0000000
--- a/xos/synchronizers/new_base/tests/steps/sync_controller_slices.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import base64
-import syncstep
-from synchronizers.new_base.ansible_helper import *
-from mock_modelaccessor import *
-
-
-class SyncControllerSlices(syncstep.SyncStep):
- provides = [Slice]
- requested_interval = 0
- observes = ControllerSlice
- playbook = "sync_controller_slices.yaml"
-
- def map_sync_inputs(self, controller_slice):
- if getattr(controller_slice, "force_fail", None):
- raise Exception("Forced failure")
- elif getattr(controller_slice, "force_defer", None):
- raise syncstep.DeferredException("Forced defer")
-
- tenant_fields = {"endpoint": "endpoint", "name": "Flagrant Haircut"}
-
- return tenant_fields
-
- def map_sync_outputs(self, controller_slice, res):
- controller_slice.save()
-
- def map_delete_inputs(self, controller_slice):
- tenant_fields = {
- "endpoint": "endpoint",
- "name": "Conscientious Plastic",
- "delete": True,
- }
- return tenant_fields
diff --git a/xos/synchronizers/new_base/tests/steps/sync_controller_users.py b/xos/synchronizers/new_base/tests/steps/sync_controller_users.py
deleted file mode 100644
index 1c722b5..0000000
--- a/xos/synchronizers/new_base/tests/steps/sync_controller_users.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import base64
-from synchronizers.new_base.syncstep import *
-from synchronizers.new_base.ansible_helper import *
-from mock_modelaccessor import *
-
-
-class SyncControllerUsers(SyncStep):
- provides = [User]
- requested_interval = 0
- observes = ControllerUser
- playbook = "sync_controller_users.yaml"
-
- def map_sync_inputs(self, controller_user):
- if not controller_user.controller.admin_user:
- return
-
- # All users will have at least the 'user' role at their home site/tenant.
- # We must also check if the user should have the admin role
-
- roles = ["user"]
- if controller_user.user.is_admin:
- driver = self.driver.admin_driver(controller=controller_user.controller)
- roles.append(driver.get_admin_role().name)
-
- # setup user home site roles at controller
- if not controller_user.user.site:
- raise Exception("Siteless user %s" % controller_user.user.email)
- else:
- user_fields = {
- "endpoint": controller_user.controller.auth_url,
- "endpoint_v3": controller_user.controller.auth_url_v3,
- "domain": controller_user.controller.domain,
- "name": controller_user.user.email,
- "email": controller_user.user.email,
- "password": controller_user.user.remote_password,
- "admin_user": controller_user.controller.admin_user,
- "admin_password": controller_user.controller.admin_password,
- "ansible_tag": "%s@%s"
- % (
- controller_user.user.email.replace("@", "-at-"),
- controller_user.controller.name,
- ),
- "admin_project": controller_user.controller.admin_tenant,
- "roles": roles,
- "project": controller_user.user.site.login_base,
- }
- return user_fields
-
- def map_sync_outputs(self, controller_user, res):
- controller_user.kuser_id = res[0]["user"]["id"]
- controller_user.backend_status = "1 - OK"
- controller_user.save()
-
- def delete_record(self, controller_user):
- if controller_user.kuser_id:
- driver = self.driver.admin_driver(controller=controller_user.controller)
- driver.delete_user(controller_user.kuser_id)
diff --git a/xos/synchronizers/new_base/tests/steps/sync_images.py b/xos/synchronizers/new_base/tests/steps/sync_images.py
deleted file mode 100644
index 3a56f92..0000000
--- a/xos/synchronizers/new_base/tests/steps/sync_images.py
+++ /dev/null
@@ -1,29 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import base64
-from mock_modelaccessor import *
-from synchronizers.new_base.syncstep import SyncStep
-
-
-class SyncImages(SyncStep):
- provides = [Image]
- requested_interval = 0
- observes = [Image]
-
- def sync_record(self, role):
- # do nothing
- pass
diff --git a/xos/synchronizers/new_base/tests/steps/sync_instances.py b/xos/synchronizers/new_base/tests/steps/sync_instances.py
deleted file mode 100644
index 74984ae..0000000
--- a/xos/synchronizers/new_base/tests/steps/sync_instances.py
+++ /dev/null
@@ -1,66 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import base64
-import socket
-from synchronizers.new_base.ansible_helper import *
-import syncstep
-from mock_modelaccessor import *
-
-RESTAPI_HOSTNAME = socket.gethostname()
-RESTAPI_PORT = "8000"
-
-
-def escape(s):
- s = s.replace("\n", r"\n").replace('"', r"\"")
- return s
-
-
-class SyncInstances(syncstep.SyncStep):
- provides = [Instance]
- requested_interval = 0
- observes = Instance
- playbook = "sync_instances.yaml"
-
- def fetch_pending(self, deletion=False):
- objs = super(SyncInstances, self).fetch_pending(deletion)
- objs = [x for x in objs if x.isolation == "vm"]
- return objs
-
- def map_sync_inputs(self, instance):
- inputs = {}
- metadata_update = {}
-
- fields = {"name": instance.name, "delete": False}
- return fields
-
- def map_sync_outputs(self, instance, res):
- instance.save()
-
- def map_delete_inputs(self, instance):
- input = {
- "endpoint": "endpoint",
- "admin_user": "admin_user",
- "admin_password": "admin_password",
- "project_name": "project_name",
- "tenant": "tenant",
- "tenant_description": "tenant_description",
- "name": instance.name,
- "ansible_tag": "ansible_tag",
- "delete": True,
- }
-
- return input
diff --git a/xos/synchronizers/new_base/tests/steps/sync_ports.py b/xos/synchronizers/new_base/tests/steps/sync_ports.py
deleted file mode 100644
index e301ea4..0000000
--- a/xos/synchronizers/new_base/tests/steps/sync_ports.py
+++ /dev/null
@@ -1,37 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import base64
-from mock_modelaccessor import *
-from synchronizers.new_base.syncstep import SyncStep
-
-
-class SyncPort(SyncStep):
- requested_interval = 0 # 3600
- provides = [Port]
- observes = Port
-
- def call(self, failed=[], deletion=False):
- if deletion:
- self.delete_ports()
- else:
- self.sync_ports()
-
- def sync_ports(self):
- open("/tmp/sync_ports", "w").write("Sync successful")
-
- def delete_ports(self):
- open("/tmp/delete_ports", "w").write("Delete successful")
diff --git a/xos/synchronizers/new_base/tests/steps/sync_roles.py b/xos/synchronizers/new_base/tests/steps/sync_roles.py
deleted file mode 100644
index ea0c77b..0000000
--- a/xos/synchronizers/new_base/tests/steps/sync_roles.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import os
-import base64
-from mock_modelaccessor import *
-import syncstep
-
-
-class SyncRoles(syncstep.SyncStep):
- provides = [Role]
- requested_interval = 0
- observes = [SiteRole, SliceRole, ControllerRole]
-
- def sync_record(self, role):
- if not role.enacted:
- controllers = Controller.objects.all()
- for controller in controllers:
- driver = self.driver.admin_driver(controller=controller)
- driver.create_role(role.role)
- role.save()
diff --git a/xos/synchronizers/new_base/tests/test_config.yaml b/xos/synchronizers/new_base/tests/test_config.yaml
deleted file mode 100644
index 0a4fece..0000000
--- a/xos/synchronizers/new_base/tests/test_config.yaml
+++ /dev/null
@@ -1,37 +0,0 @@
----
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name: test-synchronizer
-accessor:
- username: xosadmin@opencord.org
- password: "sample"
- kind: testframework
-event_bus:
- endpoint: "fake"
- kind: kafka
-logging:
- version: 1
- handlers:
- console:
- class: logging.StreamHandler
- loggers:
- '':
- handlers:
- - console
- level: DEBUG
-dependency_graph: "tests/model-deps"
-steps_dir: "tests/steps"
-pull_steps_dir: "tests/pull_steps"
-event_steps_dir: "tests/event_steps"
diff --git a/xos/synchronizers/new_base/tests/test_config_onos.yaml b/xos/synchronizers/new_base/tests/test_config_onos.yaml
deleted file mode 100644
index c3eb562..0000000
--- a/xos/synchronizers/new_base/tests/test_config_onos.yaml
+++ /dev/null
@@ -1,24 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-name: test-synchronizer
-accessor:
- username: xosadmin@opencord.org
- password: "sample"
- kind: testframework
-logging:
- version: 1
-dependency_graph: "tests/model-deps-onos"
-steps_dir: "tests/steps"
diff --git a/xos/synchronizers/new_base/xos-policy.py b/xos/synchronizers/new_base/xos-policy.py
deleted file mode 100644
index cfb05d6..0000000
--- a/xos/synchronizers/new_base/xos-policy.py
+++ /dev/null
@@ -1,68 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-""" xos-policy.py
-
- Standalone interface to model_policy engine.
-
- Normally model policies are run by the synchronizer. This file allows them to be run independently as an aid
- to development.
-"""
-
-import os
-import sys
-import time
-
-sys.path.append("/opt/xos")
-os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
-
-from synchronizers.new_base.modelaccessor import *
-from synchronizers.new_base.model_policy_loop import XOSPolicyEngine
-
-from xosconfig import Config
-from multistructlog import create_logger
-
-
-def main():
-
- log = create_logger(Config().get("logging"))
-
- models_active = False
- wait = False
- while not models_active:
- try:
- _ = Instance.objects.first()
- _ = NetworkTemplate.objects.first()
- models_active = True
- except Exception as e:
- log.exception("Exception", e=e)
- log.info("Waiting for data model to come up before starting...")
- time.sleep(10)
- wait = True
-
- if wait:
- time.sleep(
- 60
- ) # Safety factor, seeing that we stumbled waiting for the data model to come up.
-
- # start model policies thread
- policies_dir = Config.get("model_policies_dir")
-
- XOSPolicyEngine(policies_dir=policies_dir, log=log).run()
-
-
-if __name__ == "__main__":
- main()
diff --git a/xos/synchronizers/new_base/xos-synchronizer.py b/xos/synchronizers/new_base/xos-synchronizer.py
deleted file mode 100644
index 145c46e..0000000
--- a/xos/synchronizers/new_base/xos-synchronizer.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import sys
-import time
-
-sys.path.append("/opt/xos")
-os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
-
-from synchronizers.new_base.backend import Backend
-from synchronizers.new_base.modelaccessor import *
-
-from xosconfig import Config
-from multistructlog import create_logger
-log = create_logger(Config().get("logging"))
-
-
-def main():
-
- models_active = False
- wait = False
- while not models_active:
- try:
- _i = Instance.objects.first()
- _n = NetworkTemplate.objects.first()
- models_active = True
- except Exception as e:
- log.info("Exception", e=e)
- log.info("Waiting for data model to come up before starting...")
- time.sleep(10)
- wait = True
-
- if wait:
- time.sleep(
- 60
- ) # Safety factor, seeing that we stumbled waiting for the data model to come up.
-
- log_closure = log.bind(synchronizer_name=Config().get("name"))
- backend = Backend(log=log_closure)
- backend.run()
-
-
-if __name__ == "__main__":
- main()
diff --git a/xos/synchronizers/shared_templates/sync_service_composition.yaml b/xos/synchronizers/shared_templates/sync_service_composition.yaml
deleted file mode 100644
index 767cd50..0000000
--- a/xos/synchronizers/shared_templates/sync_service_composition.yaml
+++ /dev/null
@@ -1,40 +0,0 @@
-
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
----
-- hosts: {{ instance_name }}
- #gather_facts: False
- connection: ssh
- user: ubuntu
- become: yes
- vars:
- target_subnet : {{ target_subnet }}
- src_intf_ip : {{ src_intf_ip }}
-
-
- tasks:
- - name: Find the interface that has specified src ip
- shell: ifconfig | grep -B1 {{ src_intf_ip }} | head -n1 | awk '{print $1}'
- register: src_intf
-
- - name: debug
- debug: var=src_intf.stdout
-
- - name: set up the network
- shell: "{{ '{{' }} item {{ '}}' }}"
- with_items:
- - sudo ip route add {{ target_subnet }} dev {{ '{{' }} src_intf.stdout {{ '}}' }}
-