SEBA-405 Convert synchronizer framework to library
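
This change packages the synchronizer framework as an installable library
under lib/xos-synchronizer (Python package "xossynchronizer") and installs
it into the xos-synchronizer-base image alongside the existing new_base
tree, so synchronizers can import the framework as a versioned dependency
instead of relying on copied-in sources.

A minimal sketch of a library-based sync step, using the import path and
class attributes exercised by the tests in this change ("Widget" is a
hypothetical placeholder model, not part of this change):

    from xossynchronizer.steps.syncstep import SyncStep

    class SyncWidget(SyncStep):
        provides = [Widget]       # hypothetical model, for illustration only
        observes = Widget
        requested_interval = 0

        def sync_record(self, o):
            pass
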
Change-Id: If8562f23dc15c7d18d7a8b040b33756708b3c5ec
diff --git a/VERSION b/VERSION
index 205460e..bfac6fc 100644
--- a/VERSION
+++ b/VERSION
@@ -1,2 +1,2 @@
-2.1.35
+2.1.36
diff --git a/containers/chameleon/Dockerfile.chameleon b/containers/chameleon/Dockerfile.chameleon
index cb13124..56da03e 100644
--- a/containers/chameleon/Dockerfile.chameleon
+++ b/containers/chameleon/Dockerfile.chameleon
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/chameleon
-FROM xosproject/xos-base:2.1.35
+FROM xosproject/xos-base:2.1.36
# xos-base already has protoc and dependencies installed
diff --git a/containers/xos/Dockerfile.client b/containers/xos/Dockerfile.client
index c8bdeef..eed6feb 100644
--- a/containers/xos/Dockerfile.client
+++ b/containers/xos/Dockerfile.client
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/xos-client
-FROM xosproject/xos-libraries:2.1.35
+FROM xosproject/xos-libraries:2.1.36
# Install XOS client
COPY lib/xos-api /tmp/xos-api
diff --git a/containers/xos/Dockerfile.libraries b/containers/xos/Dockerfile.libraries
index ac83f28..c96f241 100644
--- a/containers/xos/Dockerfile.libraries
+++ b/containers/xos/Dockerfile.libraries
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/xos-libraries
-FROM xosproject/xos-base:2.1.35
+FROM xosproject/xos-base:2.1.36
# Add libraries
COPY lib /opt/xos/lib
diff --git a/containers/xos/Dockerfile.synchronizer-base b/containers/xos/Dockerfile.synchronizer-base
index 3fa14f5..2ae3ed4 100644
--- a/containers/xos/Dockerfile.synchronizer-base
+++ b/containers/xos/Dockerfile.synchronizer-base
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/xos-synchronizer-base
-FROM xosproject/xos-client:2.1.35
+FROM xosproject/xos-client:2.1.36
COPY xos/synchronizers/new_base /opt/xos/synchronizers/new_base
COPY xos/xos/logger.py /opt/xos/xos/logger.py
@@ -23,6 +23,11 @@
# Copy over ansible hosts
COPY containers/xos/ansible-hosts /etc/ansible/hosts
+# For library-based synchronizers
+COPY lib/xos-synchronizer /tmp/xos-synchronizer
+COPY VERSION /tmp/xos-synchronizer
+RUN cd /tmp/xos-synchronizer && python setup.py install
+
# Label image
ARG org_label_schema_schema_version=1.0
ARG org_label_schema_name=xos-synchronizer-base
diff --git a/containers/xos/Dockerfile.xos-core b/containers/xos/Dockerfile.xos-core
index a8f70a8..aa1eb15 100644
--- a/containers/xos/Dockerfile.xos-core
+++ b/containers/xos/Dockerfile.xos-core
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/xos-core
-FROM xosproject/xos-libraries:2.1.35
+FROM xosproject/xos-libraries:2.1.36
# Install XOS
ADD xos /opt/xos
diff --git a/lib/xos-synchronizer/setup.py b/lib/xos-synchronizer/setup.py
new file mode 100644
index 0000000..305e7f6
--- /dev/null
+++ b/lib/xos-synchronizer/setup.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from xosutil.autoversion_setup import setup_with_auto_version
+from xosutil.version import __version__
+
+setup_with_auto_version(
+ name="XosSynchronizer",
+ version=__version__,
+ description="XOS Synchronizer Framework",
+ author="Scott Baker",
+ author_email="scottb@opennetworking.org",
+ packages=[
+ "xossynchronizer",
+ "xossynchronizer.steps",
+ "xossynchronizer.event_steps",
+ "xossynchronizer.pull_steps",
+ "xossynchronizer.model_policies"],
+ include_package_data=True,
+ test_suite='nose2.collector.collector',
+ tests_require=['nose2'],
+ install_requires=["xosconfig>=2.1.35",],
+)
diff --git a/lib/xos-synchronizer/tests/__init__.py b/lib/xos-synchronizer/tests/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/lib/xos-synchronizer/tests/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/lib/xos-synchronizer/tests/event_steps/event_step.py b/lib/xos-synchronizer/tests/event_steps/event_step.py
new file mode 100644
index 0000000..601b8df
--- /dev/null
+++ b/lib/xos-synchronizer/tests/event_steps/event_step.py
@@ -0,0 +1,29 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+from xossynchronizer.event_steps.eventstep import EventStep
+from xossynchronizer.mock_modelaccessor import *
+
+
+class TestEventStep(EventStep):
+ technology = "kafka"
+ topics = ["sometopic"]
+ pattern = None
+
+ def __init__(self, log, *args, **kwargs):
+ super(TestEventStep, self).__init__(log, *args, **kwargs)
+
+ def process_event(self, event):
+ print("received an event", event)
diff --git a/lib/xos-synchronizer/tests/model-deps b/lib/xos-synchronizer/tests/model-deps
new file mode 100644
index 0000000..247a190
--- /dev/null
+++ b/lib/xos-synchronizer/tests/model-deps
@@ -0,0 +1,656 @@
+{
+
+
+ "User": [
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ["ControllerUser", "controllerusers", "user"],
+
+
+ ["Site", "site", "users"],
+ ["DashboardView", "dashboards", "user"]
+
+ ],
+
+ "Privilege": [
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ["ControllerPrivilege", "controllerprivileges", "privilege"]
+
+
+
+ ],
+
+ "AddressPool": [
+
+
+
+
+
+ ["Service", "service", "addresspools"]
+
+ ],
+
+
+ "ControllerDashboardView": [
+
+
+
+
+
+ ["Controller", "controller", "controllerdashboardviews"],
+ ["DashboardView", "dashboardView", "controllerdashboardviews"]
+
+ ],
+
+ "ControllerImages": [
+
+
+
+
+
+ ["Image", "image", "controllerimages"],
+ ["Controller", "controller", "controllerimages"]
+
+ ],
+
+ "ControllerNetwork": [
+
+
+
+
+
+ ["Network", "network", "controllernetworks"],
+ ["Controller", "controller", "controllernetworks"]
+
+ ],
+
+ "ControllerRole": [
+
+
+
+
+
+
+ ],
+
+ "ControllerSite": [
+
+
+
+
+
+ ["Site", "site", "controllersite"],
+ ["Controller", "controller", "controllersite"]
+
+ ],
+
+ "ControllerPrivilege": [
+
+
+
+
+
+ ["Controller", "controller", "controllerprivileges"],
+ ["Privilege", "privilege", "controllerprivileges"]
+
+ ],
+
+ "ControllerSitePrivilege": [
+
+
+
+
+
+ ["Controller", "controller", "controllersiteprivileges"],
+ ["SitePrivilege", "site_privilege", "controllersiteprivileges"]
+
+ ],
+
+ "ControllerSlice": [
+
+
+
+
+
+ ["Controller", "controller", "controllerslices"],
+ ["Slice", "slice", "controllerslices"]
+
+ ],
+
+ "ControllerSlicePrivilege": [
+
+
+
+
+
+ ["Controller", "controller", "controllersliceprivileges"],
+ ["SlicePrivilege", "slice_privilege", "controllersliceprivileges"]
+
+ ],
+
+ "ControllerUser": [
+
+
+
+
+
+ ["User", "user", "controllerusers"],
+        ["Controller", "controller", "controllerusers"]
+
+ ],
+
+ "DashboardView": [
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ["ControllerDashboardView", "controllerdashboardviews", "dashboardView"],
+
+
+ ["Controller", "controllers", "dashboardviews"],
+ ["Deployment", "deployments", "dashboardviews"]
+
+ ],
+
+ "Deployment": [
+
+
+
+
+
+
+ ],
+
+ "DeploymentPrivilege": [
+
+
+
+
+
+ ["User", "user", "deploymentprivileges"],
+ ["Deployment", "deployment", "deploymentprivileges"],
+ ["DeploymentRole", "role", "deploymentprivileges"]
+
+ ],
+
+ "DeploymentRole": [
+
+
+
+
+
+
+ ],
+
+ "Flavor": [
+
+
+
+
+
+
+ ],
+
+ "Image": [
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ["ControllerImages", "controllerimages", "image"]
+
+
+
+ ],
+
+ "ImageDeployments": [
+
+
+
+
+
+ ["Image", "image", "imagedeployments"],
+ ["Deployment", "deployment", "imagedeployments"]
+
+ ],
+
+ "Instance": [
+
+
+
+
+
+ ["Image", "image", "instances"],
+ ["User", "creator", "instances"],
+ ["Slice", "slice", "instances"],
+ ["Deployment", "deployment", "instance_deployment"],
+ ["Node", "node", "instances"],
+ ["Flavor", "flavor", "instance"],
+ ["Instance", "parent", "instance"]
+
+ ],
+
+ "Network": [
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ["ControllerNetwork", "controllernetworks", "network"],
+
+
+ ["NetworkTemplate", "template", "network"],
+ ["Slice", "owner", "ownedNetworks"],
+ ["Slice", "permitted_slices", "availableNetworks"]
+ ],
+
+ "NetworkParameter": [
+
+
+
+
+
+ ["NetworkParameterType", "parameter", "networkparameters"]
+
+ ],
+
+ "NetworkParameterType": [
+
+
+
+
+
+
+ ],
+
+ "NetworkSlice": [
+
+
+
+
+
+ ["Network", "network", "networkslices"],
+ ["Slice", "slice", "networkslices"]
+
+ ],
+
+ "NetworkTemplate": [
+
+
+
+
+
+
+ ],
+
+ "Node": [
+
+
+
+
+
+ ["SiteDeployment", "site_deployment", "nodes"]
+
+ ],
+
+ "NodeLabel": [
+
+
+
+
+
+ ["Node", "node", "nodelabels"]
+
+ ],
+
+ "Port": [
+
+
+
+
+
+ ["Network", "network", "links"],
+ ["Instance", "instance", "ports"]
+
+ ],
+
+ "Role": [
+
+
+
+
+
+
+
+
+
+
+
+ ],
+
+ "Service": [
+
+
+
+
+
+
+ ],
+
+ "ServiceAttribute": [
+
+
+
+
+
+ ["Service", "service", "serviceattributes"]
+
+ ],
+
+ "ServiceDependency": [
+
+
+
+
+
+ ["Service", "provider_service", "provided_dependencies"],
+ ["Service", "subscriber_service", "subscribed_dependencies"]
+
+ ],
+
+ "ServiceMonitoringAgentInfo": [
+
+
+
+
+
+ ["Service", "service", "servicemonitoringagents"]
+
+ ],
+
+ "ServicePrivilege": [
+
+
+
+
+
+ ["User", "user", "serviceprivileges"],
+ ["Service", "service", "serviceprivileges"],
+ ["ServiceRole", "role", "serviceprivileges"]
+
+ ],
+
+ "ServiceRole": [
+
+
+
+
+
+
+ ],
+
+ "Site": [
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ["ControllerSite", "controllersite", "site"],
+
+
+ ["Deployment", "deployments", "sites"]
+
+ ],
+
+ "SiteDeployment": [
+
+
+
+
+
+ ["Site", "site", "sitedeployments"],
+ ["Deployment", "deployment", "sitedeployments"],
+ ["Controller", "controller", "sitedeployments"]
+
+ ],
+
+ "SitePrivilege": [
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ["ControllerSitePrivilege", "controllersiteprivileges", "site_privilege"],
+
+
+ ["User", "user", "siteprivileges"],
+ ["Site", "site", "siteprivileges"],
+ ["SiteRole", "role", "siteprivileges"]
+
+ ],
+
+ "SiteRole": [
+
+
+
+
+
+
+ ],
+
+ "Slice": [
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ["ControllerSlice", "controllerslices", "slice"],
+
+
+ ["Site", "site", "slices"],
+ ["Service", "service", "slices"],
+ ["User", "creator", "slices"],
+ ["Flavor", "default_flavor", "slices"],
+ ["Image", "default_image", "slices"],
+ ["Node", "default_node", "slices"]
+
+ ],
+
+ "SlicePrivilege": [
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ["ControllerSlicePrivilege", "controllersliceprivileges", "slice_privilege"],
+
+
+ ["User", "user", "sliceprivileges"],
+ ["Slice", "slice", "sliceprivileges"],
+ ["SliceRole", "role", "sliceprivileges"]
+
+ ],
+
+ "SliceRole": [
+
+
+
+
+
+
+ ],
+
+ "Tag": [
+
+
+
+
+
+ ["Service", "service", "tags"]
+
+ ],
+
+ "InterfaceType": [
+
+
+
+
+
+
+ ],
+
+ "ServiceInterface": [
+
+
+
+
+
+ ["Service", "service", "service_interfaces"],
+ ["InterfaceType", "interface_type", "service_interfaces"]
+
+ ],
+
+ "ServiceInstance": [
+
+
+
+
+
+ ["Service", "owner", "service_instances"]
+
+ ],
+
+ "ServiceInstanceLink": [
+
+
+
+
+
+ ["ServiceInstance", "provider_service_instance", "provided_links"],
+ ["ServiceInterface", "provider_service_interface", "provided_links"],
+ ["ServiceInstance", "subscriber_service_instance", "subscribed_links"],
+ ["Service", "subscriber_service", "subscribed_links"],
+ ["Network", "subscriber_network", "subscribed_links"]
+
+ ],
+
+ "ServiceInstanceAttribute": [
+
+
+
+
+
+ ["ServiceInstance", "service_instance", "service_instance_attributes"]
+
+ ],
+
+ "TenantWithContainer": [
+
+
+
+
+
+ ["Service", "owner", "service_instances"],
+ ["Instance", "instance", "+"],
+ ["User", "creator", "+"]
+
+ ],
+
+ "XOS": [
+
+
+
+
+
+
+ ],
+
+ "XOSGuiExtension": [
+
+
+
+
+
+
+ ]
+}
diff --git a/lib/xos-synchronizer/tests/steps/__init__.py b/lib/xos-synchronizer/tests/steps/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/lib/xos-synchronizer/tests/steps/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/lib/xos-synchronizer/tests/steps/sync_container.py b/lib/xos-synchronizer/tests/steps/sync_container.py
new file mode 100644
index 0000000..baf108f
--- /dev/null
+++ b/lib/xos-synchronizer/tests/steps/sync_container.py
@@ -0,0 +1,54 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import hashlib
+import os
+import socket
+import sys
+import base64
+import time
+from xossynchronizer.steps.SyncInstanceUsingAnsible import SyncInstanceUsingAnsible
+from xossynchronizer.steps.syncstep import DeferredException
+from xossynchronizer.mock_modelaccessor import *
+
+# make the parent directory of steps/ importable
+parentdir = os.path.join(os.path.dirname(__file__), "..")
+sys.path.insert(0, parentdir)
+
+
+class SyncContainer(SyncInstanceUsingAnsible):
+ provides = [Instance]
+ observes = Instance
+ template_name = "sync_container.yaml"
+
+ def __init__(self, *args, **kwargs):
+ super(SyncContainer, self).__init__(*args, **kwargs)
+
+ def fetch_pending(self, deletion=False):
+ i = Instance()
+ i.name = "Spectacular Sponge"
+ j = Instance()
+ j.name = "Spontaneous Tent"
+ k = Instance()
+ k.name = "Embarrassed Cat"
+
+ objs = [i, j, k]
+ return objs
+
+ def sync_record(self, o):
+ pass
+
+ def delete_record(self, o):
+ pass
diff --git a/lib/xos-synchronizer/tests/steps/sync_controller_images.py b/lib/xos-synchronizer/tests/steps/sync_controller_images.py
new file mode 100644
index 0000000..84a43b1
--- /dev/null
+++ b/lib/xos-synchronizer/tests/steps/sync_controller_images.py
@@ -0,0 +1,54 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+from xossynchronizer.steps.syncstep import SyncStep
+from xossynchronizer.mock_modelaccessor import *
+
+class SyncControllerImages(SyncStep):
+ provides = [ControllerImages]
+ observes = ControllerImages
+ requested_interval = 0
+ playbook = "sync_controller_images.yaml"
+
+ def fetch_pending(self, deleted):
+ ci = ControllerImages()
+ i = Image()
+ i.name = "Lush Loss"
+ ci.i = i
+ return [ci]
+
+ def map_sync_inputs(self, controller_image):
+ image_fields = {
+ "endpoint": controller_image.controller.auth_url,
+ "endpoint_v3": controller_image.controller.auth_url_v3,
+ "admin_user": controller_image.controller.admin_user,
+ "admin_password": controller_image.controller.admin_password,
+ "domain": controller_image.controller.domain,
+ "name": controller_image.image.name,
+ "filepath": controller_image.image.path,
+ # name of ansible playbook
+ "ansible_tag": "%s@%s"
+ % (controller_image.image.name, controller_image.controller.name),
+ }
+
+ return image_fields
+
+ def map_sync_outputs(self, controller_image, res):
+ image_id = res[0]["id"]
+ controller_image.glance_image_id = image_id
+ controller_image.backend_status = "1 - OK"
+ controller_image.save()
diff --git a/lib/xos-synchronizer/tests/steps/sync_controller_networks.py b/lib/xos-synchronizer/tests/steps/sync_controller_networks.py
new file mode 100644
index 0000000..1133545
--- /dev/null
+++ b/lib/xos-synchronizer/tests/steps/sync_controller_networks.py
@@ -0,0 +1,60 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+import struct
+import socket
+from netaddr import IPAddress, IPNetwork
+from xossynchronizer.steps.syncstep import SyncStep
+from xossynchronizer.mock_modelaccessor import *
+
+class SyncControllerNetworks(SyncStep):
+ requested_interval = 0
+ provides = [Network]
+ observes = ControllerNetwork
+ external_dependencies = [User]
+ playbook = "sync_controller_networks.yaml"
+
+ def fetch_pending(self, deleted):
+ ci = ControllerNetwork()
+ i = Network()
+ i.name = "Lush Loss"
+ s = Slice()
+ s.name = "Ghastly Notebook"
+ i.owner = s
+ ci.i = i
+ return [ci]
+
+ def map_sync_outputs(self, controller_network, res):
+ network_id = res[0]["network"]["id"]
+ subnet_id = res[1]["subnet"]["id"]
+ controller_network.net_id = network_id
+ controller_network.subnet = self.cidr
+ controller_network.subnet_id = subnet_id
+ controller_network.backend_status = "1 - OK"
+ if not controller_network.segmentation_id:
+ controller_network.segmentation_id = str(
+ self.get_segmentation_id(controller_network)
+ )
+ controller_network.save()
+
+ def map_sync_inputs(self, controller_network):
+ pass
+
+ def map_delete_inputs(self, controller_network):
+ network_fields = {"endpoint": None, "delete": True}
+
+ return network_fields
diff --git a/lib/xos-synchronizer/tests/steps/sync_controller_site_privileges.py b/lib/xos-synchronizer/tests/steps/sync_controller_site_privileges.py
new file mode 100644
index 0000000..65d3985
--- /dev/null
+++ b/lib/xos-synchronizer/tests/steps/sync_controller_site_privileges.py
@@ -0,0 +1,107 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+import json
+from xossynchronizer.steps.syncstep import SyncStep
+from xossynchronizer.mock_modelaccessor import *
+
+class SyncControllerSitePrivileges(SyncStep):
+ provides = [SitePrivilege]
+ requested_interval = 0
+ observes = ControllerSitePrivilege
+ playbook = "sync_controller_users.yaml"
+
+ def map_sync_inputs(self, controller_site_privilege):
+ controller_register = json.loads(
+ controller_site_privilege.controller.backend_register
+ )
+ if not controller_site_privilege.controller.admin_user:
+ return
+
+ roles = [controller_site_privilege.site_privilege.role.role]
+ # setup user home site roles at controller
+ if not controller_site_privilege.site_privilege.user.site:
+ raise Exception(
+ "Siteless user %s" % controller_site_privilege.site_privilege.user.email
+ )
+ else:
+ # look up tenant id for the user's site at the controller
+ # ctrl_site_deployments = SiteDeployment.objects.filter(
+ # site_deployment__site=controller_site_privilege.user.site,
+ # controller=controller_site_privilege.controller)
+
+ # if ctrl_site_deployments:
+ # # need the correct tenant id for site at the controller
+ # tenant_id = ctrl_site_deployments[0].tenant_id
+ # tenant_name = ctrl_site_deployments[0].site_deployment.site.login_base
+ user_fields = {
+ "endpoint": controller_site_privilege.controller.auth_url,
+ "endpoint_v3": controller_site_privilege.controller.auth_url_v3,
+ "domain": controller_site_privilege.controller.domain,
+ "name": controller_site_privilege.site_privilege.user.email,
+ "email": controller_site_privilege.site_privilege.user.email,
+ "password": controller_site_privilege.site_privilege.user.remote_password,
+ "admin_user": controller_site_privilege.controller.admin_user,
+ "admin_password": controller_site_privilege.controller.admin_password,
+ "ansible_tag": "%s@%s"
+ % (
+ controller_site_privilege.site_privilege.user.email.replace(
+ "@", "-at-"
+ ),
+ controller_site_privilege.controller.name,
+ ),
+ "admin_tenant": controller_site_privilege.controller.admin_tenant,
+ "roles": roles,
+ "tenant": controller_site_privilege.site_privilege.site.login_base,
+ }
+
+ return user_fields
+
+ def map_sync_outputs(self, controller_site_privilege, res):
+ # results is an array in which each element corresponds to an
+ # "ok" string received per operation. If we get as many oks as
+ # the number of operations we issued, that means a grand success.
+        # Otherwise, the number of oks tells us which operation failed.
+ controller_site_privilege.role_id = res[0]["id"]
+ controller_site_privilege.save()
+
+ def delete_record(self, controller_site_privilege):
+ controller_register = json.loads(
+ controller_site_privilege.controller.backend_register
+ )
+ if controller_register.get("disabled", False):
+ raise InnocuousException(
+ "Controller %s is disabled" % controller_site_privilege.controller.name
+ )
+
+ if controller_site_privilege.role_id:
+ driver = self.driver.admin_driver(
+ controller=controller_site_privilege.controller
+ )
+ user = ControllerUser.objects.get(
+ controller=controller_site_privilege.controller,
+ user=controller_site_privilege.site_privilege.user,
+ )
+ site = ControllerSite.objects.get(
+ controller=controller_site_privilege.controller,
+                site=controller_site_privilege.site_privilege.site,
+ )
+ driver.delete_user_role(
+ user.kuser_id,
+ site.tenant_id,
+                controller_site_privilege.site_privilege.role.role,
+ )
diff --git a/lib/xos-synchronizer/tests/steps/sync_controller_sites.py b/lib/xos-synchronizer/tests/steps/sync_controller_sites.py
new file mode 100644
index 0000000..509a45c
--- /dev/null
+++ b/lib/xos-synchronizer/tests/steps/sync_controller_sites.py
@@ -0,0 +1,88 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+import json
+from xossynchronizer.steps.syncstep import SyncStep
+from xossynchronizer.mock_modelaccessor import *
+
+class SyncControllerSites(SyncStep):
+ requested_interval = 0
+ provides = [Site]
+ observes = ControllerSite
+ playbook = "sync_controller_sites.yaml"
+
+ def fetch_pending(self, deleted=False):
+ lobjs = super(SyncControllerSites, self).fetch_pending(deleted)
+
+ if not deleted:
+ # filter out objects with null controllers
+ lobjs = [x for x in lobjs if x.controller]
+
+ return lobjs
+
+ def map_sync_inputs(self, controller_site):
+ tenant_fields = {
+ "endpoint": controller_site.controller.auth_url,
+ "endpoint_v3": controller_site.controller.auth_url_v3,
+ "domain": controller_site.controller.domain,
+ "admin_user": controller_site.controller.admin_user,
+ "admin_password": controller_site.controller.admin_password,
+ "admin_tenant": controller_site.controller.admin_tenant,
+ # name of ansible playbook
+ "ansible_tag": "%s@%s"
+ % (controller_site.site.login_base, controller_site.controller.name),
+ "tenant": controller_site.site.login_base,
+ "tenant_description": controller_site.site.name,
+ }
+ return tenant_fields
+
+ def map_sync_outputs(self, controller_site, res):
+ controller_site.tenant_id = res[0]["id"]
+ controller_site.backend_status = "1 - OK"
+ controller_site.save()
+
+ def delete_record(self, controller_site):
+ controller_register = json.loads(controller_site.controller.backend_register)
+ if controller_register.get("disabled", False):
+ raise InnocuousException(
+ "Controller %s is disabled" % controller_site.controller.name
+ )
+
+ if controller_site.tenant_id:
+ driver = self.driver.admin_driver(controller=controller_site.controller)
+ driver.delete_tenant(controller_site.tenant_id)
+
+ """
+ Ansible does not support tenant deletion yet
+
+ import pdb
+ pdb.set_trace()
+ template = os_template_env.get_template('delete_controller_sites.yaml')
+ tenant_fields = {'endpoint':controller_site.controller.auth_url,
+ 'admin_user': controller_site.controller.admin_user,
+ 'admin_password': controller_site.controller.admin_password,
+ 'admin_tenant': 'admin',
+ 'ansible_tag': 'controller_sites/%s@%s'%(controller_site.controller_site.site.login_base,controller_site.controller_site.deployment.name), # name of ansible playbook
+ 'tenant': controller_site.controller_site.site.login_base,
+ 'delete': True}
+
+ rendered = template.render(tenant_fields)
+ res = run_template('sync_controller_sites.yaml', tenant_fields)
+
+ if (len(res)!=1):
+ raise Exception('Could not assign roles for user %s'%tenant_fields['tenant'])
+ """
diff --git a/lib/xos-synchronizer/tests/steps/sync_controller_slice_privileges.py b/lib/xos-synchronizer/tests/steps/sync_controller_slice_privileges.py
new file mode 100644
index 0000000..ec0667c
--- /dev/null
+++ b/lib/xos-synchronizer/tests/steps/sync_controller_slice_privileges.py
@@ -0,0 +1,94 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+import json
+from xossynchronizer.steps.syncstep import SyncStep
+from xossynchronizer.mock_modelaccessor import *
+
+class SyncControllerSlicePrivileges(SyncStep):
+ provides = [SlicePrivilege]
+ requested_interval = 0
+ observes = ControllerSlicePrivilege
+ playbook = "sync_controller_users.yaml"
+
+ def map_sync_inputs(self, controller_slice_privilege):
+ if not controller_slice_privilege.controller.admin_user:
+ return
+
+ roles = [controller_slice_privilege.slice_privilege.role.role]
+ # setup user home slice roles at controller
+ if not controller_slice_privilege.slice_privilege.user.site:
+ raise Exception(
+                "Siteless user %s"
+ % controller_slice_privilege.slice_privilege.user.email
+ )
+ else:
+ user_fields = {
+ "endpoint": controller_slice_privilege.controller.auth_url,
+ "endpoint_v3": controller_slice_privilege.controller.auth_url_v3,
+ "domain": controller_slice_privilege.controller.domain,
+ "name": controller_slice_privilege.slice_privilege.user.email,
+ "email": controller_slice_privilege.slice_privilege.user.email,
+ "password": controller_slice_privilege.slice_privilege.user.remote_password,
+ "admin_user": controller_slice_privilege.controller.admin_user,
+ "admin_password": controller_slice_privilege.controller.admin_password,
+ "ansible_tag": "%s@%s@%s"
+ % (
+ controller_slice_privilege.slice_privilege.user.email.replace(
+ "@", "-at-"
+ ),
+ controller_slice_privilege.slice_privilege.slice.name,
+ controller_slice_privilege.controller.name,
+ ),
+ "admin_tenant": controller_slice_privilege.controller.admin_tenant,
+ "roles": roles,
+ "tenant": controller_slice_privilege.slice_privilege.slice.name,
+ }
+ return user_fields
+
+ def map_sync_outputs(self, controller_slice_privilege, res):
+ controller_slice_privilege.role_id = res[0]["id"]
+ controller_slice_privilege.save()
+
+ def delete_record(self, controller_slice_privilege):
+ controller_register = json.loads(
+ controller_slice_privilege.controller.backend_register
+ )
+ if controller_register.get("disabled", False):
+ raise InnocuousException(
+ "Controller %s is disabled" % controller_slice_privilege.controller.name
+ )
+
+ if controller_slice_privilege.role_id:
+ driver = self.driver.admin_driver(
+ controller=controller_slice_privilege.controller
+ )
+ user = ControllerUser.objects.filter(
+ controller_id=controller_slice_privilege.controller.id,
+ user_id=controller_slice_privilege.slice_privilege.user.id,
+ )
+ user = user[0]
+ slice = ControllerSlice.objects.filter(
+ controller_id=controller_slice_privilege.controller.id,
+                slice_id=controller_slice_privilege.slice_privilege.slice.id,
+ )
+ slice = slice[0]
+ driver.delete_user_role(
+ user.kuser_id,
+ slice.tenant_id,
+                controller_slice_privilege.slice_privilege.role.role,
+ )
diff --git a/lib/xos-synchronizer/tests/steps/sync_controller_slices.py b/lib/xos-synchronizer/tests/steps/sync_controller_slices.py
new file mode 100644
index 0000000..0f43bad
--- /dev/null
+++ b/lib/xos-synchronizer/tests/steps/sync_controller_slices.py
@@ -0,0 +1,46 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+from xossynchronizer.steps.syncstep import SyncStep, DeferredException
+from xossynchronizer.mock_modelaccessor import *
+
+class SyncControllerSlices(SyncStep):
+ provides = [Slice]
+ requested_interval = 0
+ observes = ControllerSlice
+ playbook = "sync_controller_slices.yaml"
+
+ def map_sync_inputs(self, controller_slice):
+ if getattr(controller_slice, "force_fail", None):
+ raise Exception("Forced failure")
+ elif getattr(controller_slice, "force_defer", None):
+ raise DeferredException("Forced defer")
+
+ tenant_fields = {"endpoint": "endpoint", "name": "Flagrant Haircut"}
+
+ return tenant_fields
+
+ def map_sync_outputs(self, controller_slice, res):
+ controller_slice.save()
+
+ def map_delete_inputs(self, controller_slice):
+ tenant_fields = {
+ "endpoint": "endpoint",
+ "name": "Conscientious Plastic",
+ "delete": True,
+ }
+ return tenant_fields
diff --git a/lib/xos-synchronizer/tests/steps/sync_controller_users.py b/lib/xos-synchronizer/tests/steps/sync_controller_users.py
new file mode 100644
index 0000000..881e78a
--- /dev/null
+++ b/lib/xos-synchronizer/tests/steps/sync_controller_users.py
@@ -0,0 +1,72 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+from xossynchronizer.steps.syncstep import SyncStep
+from xossynchronizer.mock_modelaccessor import *
+
+
+class SyncControllerUsers(SyncStep):
+ provides = [User]
+ requested_interval = 0
+ observes = ControllerUser
+ playbook = "sync_controller_users.yaml"
+
+ def map_sync_inputs(self, controller_user):
+ if not controller_user.controller.admin_user:
+ return
+
+ # All users will have at least the 'user' role at their home site/tenant.
+ # We must also check if the user should have the admin role
+
+ roles = ["user"]
+ if controller_user.user.is_admin:
+ driver = self.driver.admin_driver(controller=controller_user.controller)
+ roles.append(driver.get_admin_role().name)
+
+ # setup user home site roles at controller
+ if not controller_user.user.site:
+ raise Exception("Siteless user %s" % controller_user.user.email)
+ else:
+ user_fields = {
+ "endpoint": controller_user.controller.auth_url,
+ "endpoint_v3": controller_user.controller.auth_url_v3,
+ "domain": controller_user.controller.domain,
+ "name": controller_user.user.email,
+ "email": controller_user.user.email,
+ "password": controller_user.user.remote_password,
+ "admin_user": controller_user.controller.admin_user,
+ "admin_password": controller_user.controller.admin_password,
+ "ansible_tag": "%s@%s"
+ % (
+ controller_user.user.email.replace("@", "-at-"),
+ controller_user.controller.name,
+ ),
+ "admin_project": controller_user.controller.admin_tenant,
+ "roles": roles,
+ "project": controller_user.user.site.login_base,
+ }
+ return user_fields
+
+ def map_sync_outputs(self, controller_user, res):
+ controller_user.kuser_id = res[0]["user"]["id"]
+ controller_user.backend_status = "1 - OK"
+ controller_user.save()
+
+ def delete_record(self, controller_user):
+ if controller_user.kuser_id:
+ driver = self.driver.admin_driver(controller=controller_user.controller)
+ driver.delete_user(controller_user.kuser_id)
diff --git a/lib/xos-synchronizer/tests/steps/sync_images.py b/lib/xos-synchronizer/tests/steps/sync_images.py
new file mode 100644
index 0000000..2284ed2
--- /dev/null
+++ b/lib/xos-synchronizer/tests/steps/sync_images.py
@@ -0,0 +1,28 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+from xossynchronizer.steps.syncstep import SyncStep
+from xossynchronizer.mock_modelaccessor import *
+
+class SyncImages(SyncStep):
+ provides = [Image]
+ requested_interval = 0
+ observes = [Image]
+
+ def sync_record(self, role):
+ # do nothing
+ pass
diff --git a/lib/xos-synchronizer/tests/steps/sync_instances.py b/lib/xos-synchronizer/tests/steps/sync_instances.py
new file mode 100644
index 0000000..479b87d
--- /dev/null
+++ b/lib/xos-synchronizer/tests/steps/sync_instances.py
@@ -0,0 +1,65 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+import socket
+from xossynchronizer.steps import syncstep
+from xossynchronizer.mock_modelaccessor import *
+
+RESTAPI_HOSTNAME = socket.gethostname()
+RESTAPI_PORT = "8000"
+
+
+def escape(s):
+ s = s.replace("\n", r"\n").replace('"', r"\"")
+ return s
+
+
+class SyncInstances(syncstep.SyncStep):
+ provides = [Instance]
+ requested_interval = 0
+ observes = Instance
+ playbook = "sync_instances.yaml"
+
+ def fetch_pending(self, deletion=False):
+ objs = super(SyncInstances, self).fetch_pending(deletion)
+ objs = [x for x in objs if x.isolation == "vm"]
+ return objs
+
+ def map_sync_inputs(self, instance):
+ inputs = {}
+ metadata_update = {}
+
+ fields = {"name": instance.name, "delete": False}
+ return fields
+
+ def map_sync_outputs(self, instance, res):
+ instance.save()
+
+ def map_delete_inputs(self, instance):
+ input = {
+ "endpoint": "endpoint",
+ "admin_user": "admin_user",
+ "admin_password": "admin_password",
+ "project_name": "project_name",
+ "tenant": "tenant",
+ "tenant_description": "tenant_description",
+ "name": instance.name,
+ "ansible_tag": "ansible_tag",
+ "delete": True,
+ }
+
+ return input
diff --git a/lib/xos-synchronizer/tests/steps/sync_ports.py b/lib/xos-synchronizer/tests/steps/sync_ports.py
new file mode 100644
index 0000000..77209a5
--- /dev/null
+++ b/lib/xos-synchronizer/tests/steps/sync_ports.py
@@ -0,0 +1,37 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+from xossynchronizer.steps.syncstep import SyncStep
+from xossynchronizer.mock_modelaccessor import *
+
+
+class SyncPort(SyncStep):
+ requested_interval = 0 # 3600
+ provides = [Port]
+ observes = Port
+
+ def call(self, failed=[], deletion=False):
+ if deletion:
+ self.delete_ports()
+ else:
+ self.sync_ports()
+
+ def sync_ports(self):
+ open("/tmp/sync_ports", "w").write("Sync successful")
+
+ def delete_ports(self):
+ open("/tmp/delete_ports", "w").write("Delete successful")
diff --git a/lib/xos-synchronizer/tests/steps/sync_roles.py b/lib/xos-synchronizer/tests/steps/sync_roles.py
new file mode 100644
index 0000000..e8b1364
--- /dev/null
+++ b/lib/xos-synchronizer/tests/steps/sync_roles.py
@@ -0,0 +1,33 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+from xossynchronizer.steps.syncstep import SyncStep
+from xossynchronizer.mock_modelaccessor import *
+
+
+class SyncRoles(SyncStep):
+ provides = [Role]
+ requested_interval = 0
+ observes = [SiteRole, SliceRole, ControllerRole]
+
+ def sync_record(self, role):
+ if not role.enacted:
+ controllers = Controller.objects.all()
+ for controller in controllers:
+ driver = self.driver.admin_driver(controller=controller)
+ driver.create_role(role.role)
+ role.save()
diff --git a/lib/xos-synchronizer/tests/test_config.yaml b/lib/xos-synchronizer/tests/test_config.yaml
new file mode 100644
index 0000000..0a4fece
--- /dev/null
+++ b/lib/xos-synchronizer/tests/test_config.yaml
@@ -0,0 +1,37 @@
+---
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: test-synchronizer
+accessor:
+ username: xosadmin@opencord.org
+ password: "sample"
+ kind: testframework
+event_bus:
+ endpoint: "fake"
+ kind: kafka
+logging:
+ version: 1
+ handlers:
+ console:
+ class: logging.StreamHandler
+ loggers:
+ '':
+ handlers:
+ - console
+ level: DEBUG
+dependency_graph: "tests/model-deps"
+steps_dir: "tests/steps"
+pull_steps_dir: "tests/pull_steps"
+event_steps_dir: "tests/event_steps"
diff --git a/lib/xos-synchronizer/tests/test_controller_dependencies.py b/lib/xos-synchronizer/tests/test_controller_dependencies.py
new file mode 100644
index 0000000..e358fea
--- /dev/null
+++ b/lib/xos-synchronizer/tests/test_controller_dependencies.py
@@ -0,0 +1,221 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import patch
+import mock
+import pdb
+import networkx as nx
+
+import os
+import sys
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+xos_dir = os.path.join(test_path, "..", "..", "..")
+
+
+class TestControllerDependencies(unittest.TestCase):
+
+ __test__ = False
+
+ def setUp(self):
+ global mock_enumerator, event_loop
+
+ self.sys_path_save = sys.path
+ self.cwd_save = os.getcwd()
+
+ config = os.path.join(test_path, "test_config.yaml")
+ from xosconfig import Config
+
+ Config.clear()
+ Config.init(config, "synchronizer-config-schema.yaml")
+
+ from xossynchronizer.mock_modelaccessor_build import (
+ build_mock_modelaccessor,
+ )
+
+ build_mock_modelaccessor(xos_dir, services_dir=None, service_xprotos=[])
+
+ os.chdir(os.path.join(test_path, "..")) # config references tests/model-deps
+
+ import event_loop
+
+ reload(event_loop)
+ import backend
+
+ reload(backend)
+ from mock_modelaccessor import mock_enumerator
+ from modelaccessor import model_accessor
+
+ # import all class names to globals
+ for (k, v) in model_accessor.all_model_classes.items():
+ globals()[k] = v
+
+ b = backend.Backend()
+ steps_dir = Config.get("steps_dir")
+ self.steps = b.load_sync_step_modules(steps_dir)
+ self.synchronizer = event_loop.XOSObserver(self.steps)
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+ os.chdir(self.cwd_save)
+
+ def test_multi_controller_path(self):
+ csl = ControllerSlice()
+ csi = ControllerSite()
+ site = Site()
+ slice = Slice()
+ slice.site = site
+ csl.slice = slice
+ csi.site = site
+ slice.controllerslices = mock_enumerator([csl])
+ site.controllersite = mock_enumerator([csi])
+
+ verdict, edge_type = self.synchronizer.concrete_path_exists(csl, csi)
+ self.assertTrue(verdict)
+ self.assertEqual(edge_type, event_loop.PROXY_EDGE)
+
+ def test_controller_path_simple(self):
+ p = Instance()
+ s = Slice()
+ t = Site()
+ ct = ControllerSite()
+ p.slice = s
+ s.site = t
+ ct.site = t
+ t.controllersite = mock_enumerator([ct])
+ cohorts = self.synchronizer.compute_dependent_cohorts([p, ct], False)
+ self.assertEqual([ct, p], cohorts[0])
+ cohorts = self.synchronizer.compute_dependent_cohorts([ct, p], False)
+ self.assertEqual([ct, p], cohorts[0])
+
+ def test_controller_deletion_path(self):
+ p = Instance()
+ s = Slice()
+ t = Site()
+ ct = ControllerSite()
+ ct.site = t
+ p.slice = s
+ s.site = t
+
+ t.controllersite = mock_enumerator([ct])
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([p, s, t, ct], False)
+ self.assertEqual([t, ct, s, p], cohorts[0])
+ cohorts = self.synchronizer.compute_dependent_cohorts([p, s, t, ct], True)
+ self.assertEqual([p, s, ct, t], cohorts[0])
+
+ def test_multi_controller_schedule(self):
+ csl = ControllerSlice()
+ csi = ControllerSite()
+ site = Site()
+ slice = Slice()
+ slice.site = site
+ csl.slice = slice
+ csi.site = site
+ slice.controllerslices = mock_enumerator([csl])
+ site.controllersite = mock_enumerator([csi])
+ i = Instance()
+ i.slice = slice
+
+ cohorts = self.synchronizer.compute_dependent_cohorts(
+ [i, slice, site, csl, csi], False
+ )
+ self.assertEqual([site, csi, slice, csl, i], cohorts[0])
+
+ def test_multi_controller_path_negative(self):
+ csl = ControllerSlice()
+ csi = ControllerSite()
+ site = Site()
+ slice = Slice()
+ slice.site = site
+ csl.slice = slice
+ csi.site = site
+ slice.controllerslices = mock_enumerator([])
+ site.controllersite = mock_enumerator([])
+
+ verdict, edge_type = self.synchronizer.concrete_path_exists(csl, csi)
+ self.assertFalse(verdict)
+ self.assertEqual(edge_type, None)
+
+ def test_controller_path_simple_negative(self):
+ p = Instance()
+ s = Slice()
+ t = Site()
+ ct = ControllerSite()
+ p.slice = s
+ s.site = t
+ ct.site = t
+ t.controllersite = mock_enumerator([])
+ cohorts = self.synchronizer.compute_dependent_cohorts([p, ct], False)
+ self.assertIn([ct], cohorts)
+ self.assertIn([p], cohorts)
+
+ def test_controller_deletion_path_negative(self):
+ p = Instance()
+ s = Slice()
+ t = Site()
+ ct = ControllerSite()
+ s.site = t
+
+ t.controllersite = mock_enumerator([])
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([p, s, t, ct], False)
+ self.assertIn([t, s], cohorts)
+ self.assertIn([p], cohorts)
+ self.assertIn([ct], cohorts)
+ cohorts = self.synchronizer.compute_dependent_cohorts([p, s, t, ct], True)
+ self.assertIn([s, t], cohorts)
+ self.assertIn([p], cohorts)
+ self.assertIn([ct], cohorts)
+
+ def test_multi_controller_deletion_schedule(self):
+ csl = ControllerSlice()
+        csi = ControllerSite()
+ site = Site()
+ slice = Slice()
+ slice.site = site
+ slice.controllerslices = mock_enumerator([])
+ site.controllersite = mock_enumerator([])
+ i = Instance()
+ i.slice = slice
+
+ cohorts = self.synchronizer.compute_dependent_cohorts(
+ [i, slice, site, csl, csi], False
+ )
+ self.assertIn([site, slice, i], cohorts)
+ self.assertIn([csl], cohorts)
+ self.assertIn([csi], cohorts)
+
+ def test_multi_controller_schedule_negative(self):
+ csl = ControllerSlice()
+ csi = ControllerSite()
+ site = Site()
+ slice = Slice()
+ slice.site = site
+ slice.controllerslices = mock_enumerator([])
+ site.controllersite = mock_enumerator([])
+ i = Instance()
+ i.slice = slice
+
+ cohorts = self.synchronizer.compute_dependent_cohorts(
+ [i, slice, site, csl, csi], False
+ )
+ self.assertIn([site, slice, i], cohorts)
+ self.assertIn([csl], cohorts)
+ self.assertIn([csi], cohorts)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/lib/xos-synchronizer/tests/test_diffs.py b/lib/xos-synchronizer/tests/test_diffs.py
new file mode 100644
index 0000000..9e09c0f
--- /dev/null
+++ b/lib/xos-synchronizer/tests/test_diffs.py
@@ -0,0 +1,110 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import patch, call, Mock, PropertyMock
+import json
+
+import os
+import sys
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+sync_lib_dir = os.path.join(test_path, "..", "xossynchronizer")
+xos_dir = os.path.join(test_path, "..", "..", "..", "xos")
+services_dir = os.path.join(xos_dir, "../../xos_services")
+
+class TestDiffs(unittest.TestCase):
+
+ """ These tests are for the mock modelaccessor, to make sure it behaves like the real one """
+
+ def setUp(self):
+
+ self.sys_path_save = sys.path
+ # Setting up the config module
+ from xosconfig import Config
+
+ config = os.path.join(test_path, "test_config.yaml")
+ Config.clear()
+ Config.init(config, "synchronizer-config-schema.yaml")
+ # END Setting up the config module
+
+ from xossynchronizer.mock_modelaccessor_build import (
+ build_mock_modelaccessor,
+ )
+
+ # FIXME this is to get jenkins to pass the tests, somehow it is running tests in a different order
+ # and apparently it is not overriding the generated model accessor
+ build_mock_modelaccessor(sync_lib_dir, xos_dir, services_dir, [])
+ import xossynchronizer.modelaccessor
+
+ # import all class names to globals
+ for (
+ k,
+ v,
+ ) in (
+ xossynchronizer.modelaccessor.model_accessor.all_model_classes.items()
+ ):
+ globals()[k] = v
+
+ self.log = Mock()
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+
+ def test_new_diff(self):
+ site = Site(name="mysite")
+
+ self.assertEqual(site.is_new, True)
+ self.assertEqual(site._dict, {"name": "mysite"})
+ self.assertEqual(site.diff, {})
+ self.assertEqual(site.changed_fields, ["name"])
+ self.assertEqual(site.has_field_changed("name"), False)
+ self.assertEqual(site.has_field_changed("login_base"), False)
+
+ site.login_base = "bar"
+
+ self.assertEqual(site._dict, {"login_base": "bar", "name": "mysite"})
+ self.assertEqual(site.diff, {"login_base": (None, "bar")})
+ self.assertIn("name", site.changed_fields)
+ self.assertIn("login_base", site.changed_fields)
+ self.assertEqual(site.has_field_changed("name"), False)
+ self.assertEqual(site.has_field_changed("login_base"), True)
+ self.assertEqual(site.get_field_diff("login_base"), (None, "bar"))
+
+ def test_existing_diff(self):
+ site = Site(name="mysite", login_base="foo")
+
+ # this is what would happen after saving and re-loading
+ site.is_new = False
+ site.id = 1
+ site._initial = site._dict
+
+ self.assertEqual(site.is_new, False)
+ self.assertEqual(site._dict, {"id": 1, "name": "mysite", "login_base": "foo"})
+ self.assertEqual(site.diff, {})
+ self.assertEqual(site.changed_fields, [])
+ self.assertEqual(site.has_field_changed("name"), False)
+ self.assertEqual(site.has_field_changed("login_base"), False)
+
+ site.login_base = "bar"
+
+ self.assertEqual(site._dict, {"id": 1, "login_base": "bar", "name": "mysite"})
+ self.assertEqual(site.diff, {"login_base": ("foo", "bar")})
+ self.assertIn("login_base", site.changed_fields)
+ self.assertEqual(site.has_field_changed("name"), False)
+ self.assertEqual(site.has_field_changed("login_base"), True)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/lib/xos-synchronizer/tests/test_event_engine.py b/lib/xos-synchronizer/tests/test_event_engine.py
new file mode 100644
index 0000000..13972c6
--- /dev/null
+++ b/lib/xos-synchronizer/tests/test_event_engine.py
@@ -0,0 +1,345 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import confluent_kafka
+import functools
+import unittest
+
+from mock import patch, PropertyMock, ANY
+
+import os
+import sys
+import time
+
+log = None
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+sync_lib_dir = os.path.join(test_path, "..", "xossynchronizer")
+xos_dir = os.path.join(test_path, "..", "..", "..", "xos")
+
+print(os.getcwd())
+
+def config_get_mock(orig, overrides, key):
+ if key in overrides:
+ return overrides[key]
+ else:
+ return orig(key)
+
+
+class FakeKafkaConsumer:
+ def __init__(self, values=[]):
+ self.values = values
+
+ def subscribe(self, topics):
+ pass
+
+ def poll(self, timeout=1.0):
+ if self.values:
+ return FakeKafkaMessage(self.values.pop())
+ # block forever
+ time.sleep(1000)
+
+
+class FakeKafkaMessage:
+ """ Works like Message in confluent_kafka
+ https://docs.confluent.io/current/clients/confluent-kafka-python/#message
+ """
+
+ def __init__(
+ self,
+ timestamp=None,
+ topic="faketopic",
+ key="fakekey",
+ value="fakevalue",
+ error=False,
+ ):
+
+ if timestamp is None:
+ self.fake_ts_type = confluent_kafka.TIMESTAMP_NOT_AVAILABLE
+ self.fake_timestamp = None
+ else:
+ self.fake_ts_type = confluent_kafka.TIMESTAMP_CREATE_TIME
+ self.fake_timestamp = timestamp
+
+ self.fake_topic = topic
+ self.fake_key = key
+ self.fake_value = value
+ self.fake_error = error
+
+ def error(self):
+ return self.fake_error
+
+ def timestamp(self):
+ return (self.fake_ts_type, self.fake_timestamp)
+
+ def topic(self):
+ return self.fake_topic
+
+ def key(self):
+ return self.fake_key
+
+ def value(self):
+ return self.fake_value
+
+
+class TestEventEngine(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+
+ global log
+
+ config = os.path.join(test_path, "test_config.yaml")
+ from xosconfig import Config
+
+ Config.clear()
+ Config.init(config, "synchronizer-config-schema.yaml")
+
+ if not log:
+ from multistructlog import create_logger
+
+ log = create_logger(Config().get("logging"))
+
+ def setUp(self):
+ global XOSKafkaThread, Config, log
+
+ self.sys_path_save = sys.path
+ self.cwd_save = os.getcwd()
+
+ config = os.path.join(test_path, "test_config.yaml")
+ from xosconfig import Config
+
+ Config.clear()
+ Config.init(config, "synchronizer-config-schema.yaml")
+
+ from xossynchronizer.mock_modelaccessor_build import (
+ build_mock_modelaccessor,
+ )
+
+ build_mock_modelaccessor(sync_lib_dir, xos_dir, services_dir=None, service_xprotos=[])
+
+        # The test config.yaml references files in `tests/`, so make sure we're in the parent directory of the
+        # tests directory.
+ os.chdir(os.path.join(test_path, ".."))
+
+ from xossynchronizer.event_engine import XOSKafkaThread, XOSEventEngine
+
+ self.event_steps_dir = Config.get("event_steps_dir")
+ self.event_engine = XOSEventEngine(log)
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+ os.chdir(self.cwd_save)
+
+ def test_load_event_step_modules(self):
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+ self.assertEqual(len(self.event_engine.event_steps), 1)
+
+ def test_start(self):
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+ with patch.object(
+ XOSKafkaThread, "create_kafka_consumer"
+ ) as create_kafka_consumer, patch.object(
+ FakeKafkaConsumer, "subscribe"
+ ) as fake_subscribe, patch.object(
+ self.event_engine.event_steps[0], "process_event"
+ ) as process_event:
+
+ create_kafka_consumer.return_value = FakeKafkaConsumer(
+ values=["sampleevent"]
+ )
+ self.event_engine.start()
+
+ self.assertEqual(len(self.event_engine.threads), 1)
+
+ # Since event_engine.start() launches threads, give them a hundred milliseconds to do something...
+ time.sleep(0.1)
+
+ # We should have subscribed to the fake consumer
+ fake_subscribe.assert_called_once()
+
+ # The fake consumer will have returned one event
+ process_event.assert_called_once()
+
+ def test_start_with_pattern(self):
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+ with patch.object(
+ XOSKafkaThread, "create_kafka_consumer"
+ ) as create_kafka_consumer, patch.object(
+ FakeKafkaConsumer, "subscribe"
+ ) as fake_subscribe, patch.object(
+ self.event_engine.event_steps[0], "process_event"
+ ) as process_event, patch.object(
+ self.event_engine.event_steps[0], "pattern", new_callable=PropertyMock
+ ) as pattern, patch.object(
+ self.event_engine.event_steps[0], "topics", new_callable=PropertyMock
+ ) as topics:
+
+ pattern.return_value = "somepattern"
+ topics.return_value = []
+
+ create_kafka_consumer.return_value = FakeKafkaConsumer(
+ values=["sampleevent"]
+ )
+ self.event_engine.start()
+
+ self.assertEqual(len(self.event_engine.threads), 1)
+
+ # Since event_engine.start() launches threads, give them a hundred milliseconds to do something...
+ time.sleep(0.1)
+
+ # We should have subscribed to the fake consumer
+ fake_subscribe.assert_called_with("somepattern")
+
+ # The fake consumer will have returned one event
+ process_event.assert_called_once()
+
+ def test_start_bad_tech(self):
+ """ Set an unknown Technology in the event_step. XOSEventEngine.start() should print an error message and
+ not create any threads.
+ """
+
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+ with patch.object(
+ XOSKafkaThread, "create_kafka_consumer"
+ ) as create_kafka_consumer, patch.object(
+ log, "error"
+ ) as log_error, patch.object(
+            self.event_engine.event_steps[0], "technology", new_callable=PropertyMock
+ ) as technology:
+ technology.return_value = "not_kafka"
+ create_kafka_consumer.return_value = FakeKafkaConsumer()
+ self.event_engine.start()
+
+ self.assertEqual(len(self.event_engine.threads), 0)
+
+ log_error.assert_called_with(
+ "Unknown technology. Skipping step",
+ step="TestEventStep",
+ technology=ANY,
+ )
+
+ def test_start_bad_no_topics(self):
+ """ Set no topics in the event_step. XOSEventEngine.start() will launch a thread, but the thread will fail
+ with an exception before calling subscribe.
+ """
+
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+ with patch.object(
+ XOSKafkaThread, "create_kafka_consumer"
+ ) as create_kafka_consumer, patch.object(
+ FakeKafkaConsumer, "subscribe"
+ ) as fake_subscribe, patch.object(
+ self.event_engine.event_steps[0], "topics", new_callable=PropertyMock
+ ) as topics:
+ topics.return_value = []
+ create_kafka_consumer.return_value = FakeKafkaConsumer()
+ self.event_engine.start()
+
+ # the thread does get launched, but it will fail with an exception
+ self.assertEqual(len(self.event_engine.threads), 1)
+
+ time.sleep(0.1)
+
+ fake_subscribe.assert_not_called()
+
+ def test_start_bad_topics_and_pattern(self):
+ """ Set no topics in the event_step. XOSEventEngine.start() will launch a thread, but the thread will fail
+ with an exception before calling subscribe.
+ """
+
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+ with patch.object(
+ XOSKafkaThread, "create_kafka_consumer"
+ ) as create_kafka_consumer, patch.object(
+ FakeKafkaConsumer, "subscribe"
+ ) as fake_subscribe, patch.object(
+ self.event_engine.event_steps[0], "pattern", new_callable=PropertyMock
+ ) as pattern:
+ pattern.return_value = "foo"
+ create_kafka_consumer.return_value = FakeKafkaConsumer()
+ self.event_engine.start()
+
+ # the thread does get launched, but it will fail with an exception
+ self.assertEqual(len(self.event_engine.threads), 1)
+
+ time.sleep(0.1)
+
+ fake_subscribe.assert_not_called()
+
+ def test_start_config_no_eventbus_kind(self):
+ """ Set a blank event_bus.kind in Config. XOSEventEngine.start() should print an error message and
+ not create any threads.
+ """
+
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+ config_get_orig = Config.get
+ with patch.object(
+ XOSKafkaThread, "create_kafka_consumer"
+ ) as create_kafka_consumer, patch.object(
+ log, "error"
+ ) as log_error, patch.object(
+ Config,
+ "get",
+ new=functools.partial(
+ config_get_mock, config_get_orig, {"event_bus.kind": None}
+ ),
+ ):
+
+ create_kafka_consumer.return_value = FakeKafkaConsumer()
+ self.event_engine.start()
+
+ self.assertEqual(len(self.event_engine.threads), 0)
+
+ log_error.assert_called_with(
+ "Eventbus kind is not configured in synchronizer config file."
+ )
+
+ def test_start_config_bad_eventbus_kind(self):
+ """ Set an unknown event_bus.kind in Config. XOSEventEngine.start() should print an error message and
+ not create any threads.
+ """
+
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+
+ config_get_orig = Config.get
+ with patch.object(
+ XOSKafkaThread, "create_kafka_consumer"
+ ) as create_kafka_consumer, patch.object(
+ log, "error"
+ ) as log_error, patch.object(
+ Config,
+ "get",
+ new=functools.partial(
+ config_get_mock, config_get_orig, {"event_bus.kind": "not_kafka"}
+ ),
+ ):
+ create_kafka_consumer.return_value = FakeKafkaConsumer()
+ self.event_engine.start()
+
+ self.assertEqual(len(self.event_engine.threads), 0)
+
+ log_error.assert_called_with(
+ "Eventbus kind is set to a technology we do not implement.",
+ eventbus_kind="not_kafka",
+ )
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/lib/xos-synchronizer/tests/test_load.py b/lib/xos-synchronizer/tests/test_load.py
new file mode 100644
index 0000000..8f9813d
--- /dev/null
+++ b/lib/xos-synchronizer/tests/test_load.py
@@ -0,0 +1,108 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import patch
+import mock
+import pdb
+import networkx as nx
+
+import os
+import sys
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+sync_lib_dir = os.path.join(test_path, "..", "xossynchronizer")
+xos_dir = os.path.join(test_path, "..", "..", "..", "xos")
+
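+# Exercises sync-step loading and dependency-graph construction. setUp
+# regenerates the mock model accessor from the core models before each test,
+# then reloads the synchronizer modules so they pick up the fresh accessor.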
+class TestScheduling(unittest.TestCase):
+ def setUp(self):
+ self.sys_path_save = sys.path
+ self.cwd_save = os.getcwd()
+
+ config = os.path.join(test_path, "test_config.yaml")
+ from xosconfig import Config
+
+ Config.clear()
+ Config.init(config, "synchronizer-config-schema.yaml")
+
+ from xossynchronizer.mock_modelaccessor_build import (
+ build_mock_modelaccessor,
+ )
+
+ build_mock_modelaccessor(sync_lib_dir, xos_dir, services_dir=None, service_xprotos=[])
+
+        # The test config.yaml references files in `tests/`, so make sure we're in the parent directory of the
+        # tests directory.
+ os.chdir(os.path.join(test_path, ".."))
+
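+        # Reload so the synchronizer modules re-import the freshly generated
+        # mock model accessor rather than a stale one from a previous test.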
+ import xossynchronizer.event_loop
+ reload(xossynchronizer.event_loop)
+
+ import xossynchronizer.backend
+ reload(xossynchronizer.backend)
+
+ b = xossynchronizer.backend.Backend()
+ steps_dir = Config.get("steps_dir")
+ self.steps = b.load_sync_step_modules(steps_dir)
+ self.synchronizer = xossynchronizer.event_loop.XOSObserver(self.steps)
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+ os.chdir(self.cwd_save)
+
+ def test_load_steps(self):
+ step_names = [s.__name__ for s in self.steps]
+ self.assertIn("SyncControllerSlices", step_names)
+
+ def test_load_deps(self):
+ self.synchronizer.load_dependency_graph()
+ graph = self.synchronizer.model_dependency_graph
+ self.assertTrue(graph[False].has_edge("Instance", "Slice"))
+ self.assertTrue(graph[True].has_edge("Slice", "Instance"))
+ self.assertTrue(graph[False].has_edge("Slice", "ControllerSlice"))
+ self.assertTrue(graph[True].has_edge("ControllerSlice", "Slice"))
+
+ def test_load_dep_accessors(self):
+ self.synchronizer.load_dependency_graph()
+ graph = self.synchronizer.model_dependency_graph
+ self.assertDictContainsSubset(
+ {"src_accessor": "controllerslices"},
+ graph[False]["Slice"]["ControllerSlice"],
+ )
+ self.assertDictContainsSubset(
+ {"src_accessor": "slice", "dst_accessor": "controllerslices"},
+ graph[True]["Slice"]["ControllerSlice"],
+ )
+
+ def test_load_sync_steps(self):
+ self.synchronizer.load_sync_steps()
+ model_to_step = self.synchronizer.model_to_step
+ step_lookup = self.synchronizer.step_lookup
+ self.assertIn(
+ ("ControllerSlice", ["SyncControllerSlices"]), model_to_step.items()
+ )
+ self.assertIn(("SiteRole", ["SyncRoles"]), model_to_step.items())
+
+ for k, v in model_to_step.items():
+ val = v[0]
+ observes = step_lookup[val].observes
+ if not isinstance(observes, list):
+ observes = [observes]
+
+ observed_names = [o.__name__ for o in observes]
+ self.assertIn(k, observed_names)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/lib/xos-synchronizer/tests/test_model_policy_tenantwithcontainer.py b/lib/xos-synchronizer/tests/test_model_policy_tenantwithcontainer.py
new file mode 100644
index 0000000..fa3c774
--- /dev/null
+++ b/lib/xos-synchronizer/tests/test_model_policy_tenantwithcontainer.py
@@ -0,0 +1,289 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+from mock import patch
+import mock
+import pdb
+
+import os
+import sys
+from xosconfig import Config
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+sync_lib_dir = os.path.join(test_path, "..", "xossynchronizer")
+xos_dir = os.path.join(test_path, "..", "..", "..", "xos")
+
+class TestModelPolicyTenantWithContainer(unittest.TestCase):
+ def setUp(self):
+ global TenantWithContainerPolicy, LeastLoadedNodeScheduler, MockObjectList
+
+ self.sys_path_save = sys.path
+ self.cwd_save = os.getcwd()
+
+        config = os.path.join(test_path, "test_config.yaml")
+ Config.clear() # in case left unclean by a previous test case
+ Config.init(config, "synchronizer-config-schema.yaml")
+
+ from xossynchronizer.mock_modelaccessor_build import (
+ build_mock_modelaccessor,
+ )
+
+ build_mock_modelaccessor(sync_lib_dir, xos_dir, services_dir=None, service_xprotos=[])
+
+ import xossynchronizer.model_policies.model_policy_tenantwithcontainer
+ from xossynchronizer.model_policies.model_policy_tenantwithcontainer import (
+ TenantWithContainerPolicy,
+ LeastLoadedNodeScheduler,
+ )
+
+ from xossynchronizer.mock_modelaccessor import MockObjectList
+
+ # import all class names to globals
+ for (
+ k,
+ v,
+ ) in xossynchronizer.model_policies.model_policy_tenantwithcontainer.model_accessor.all_model_classes.items():
+ globals()[k] = v
+
+ # TODO: Mock_model_accessor lacks save or delete methods
+ # Instance.save = mock.Mock
+ # Instance.delete = mock.Mock
+ # TenantWithContainer.save = mock.Mock
+
+ self.policy = TenantWithContainerPolicy()
+ self.user = User(email="testadmin@test.org")
+ self.tenant = TenantWithContainer(creator=self.user)
+ self.flavor = Flavor(name="m1.small")
+
+ def tearDown(self):
+ Config.clear()
+ sys.path = self.sys_path_save
+ os.chdir(self.cwd_save)
+
+ def test_manage_container_no_slices(self):
+ with patch.object(TenantWithContainer, "owner") as owner:
+ owner.slices.count.return_value = 0
+ with self.assertRaises(Exception) as e:
+ self.policy.manage_container(self.tenant)
+ self.assertEqual(e.exception.message, "The service has no slices")
+
+ def test_manage_container(self):
+ with patch.object(TenantWithContainer, "owner") as owner, patch.object(
+ TenantWithContainer, "save"
+ ) as tenant_save, patch.object(
+ Node, "site_deployment"
+ ) as site_deployment, patch.object(
+ Instance, "save"
+ ) as instance_save, patch.object(
+ Instance, "delete"
+ ) as instance_delete, patch.object(
+ TenantWithContainerPolicy, "get_image"
+ ) as get_image, patch.object(
+ LeastLoadedNodeScheduler, "pick"
+ ) as pick:
+ # setup mocks
+ node = Node(hostname="my.node.com")
+ slice = Slice(
+ name="mysite_test1", default_flavor=self.flavor, default_isolation="vm"
+ )
+ image = Image(name="trusty-server-multi-nic")
+ deployment = Deployment(name="testdeployment")
+ owner.slices.count.return_value = 1
+ owner.slices.all.return_value = [slice]
+ owner.slices.first.return_value = slice
+ get_image.return_value = image
+ pick.return_value = (node, None)
+ site_deployment.deployment = deployment
+ # done setup mocks
+
+ # call manage_container
+ self.policy.manage_container(self.tenant)
+
+ # make sure manage_container did what it is supposed to do
+ self.assertNotEqual(self.tenant.instance, None)
+ self.assertEqual(self.tenant.instance.creator.email, "testadmin@test.org")
+ self.assertEqual(self.tenant.instance.image.name, "trusty-server-multi-nic")
+ self.assertEqual(self.tenant.instance.flavor.name, "m1.small")
+ self.assertEqual(self.tenant.instance.isolation, "vm")
+ self.assertEqual(self.tenant.instance.node.hostname, "my.node.com")
+ self.assertEqual(self.tenant.instance.slice.name, "mysite_test1")
+ self.assertEqual(self.tenant.instance.parent, None)
+ instance_save.assert_called()
+ instance_delete.assert_not_called()
+ tenant_save.assert_called()
+
+ def test_manage_container_delete(self):
+ self.tenant.deleted = True
+
+ # call manage_container
+ self.policy.manage_container(self.tenant)
+
+ # make sure manage_container did what it is supposed to do
+ self.assertEqual(self.tenant.instance, None)
+
+ def test_manage_container_no_m1_small(self):
+ with patch.object(TenantWithContainer, "owner") as owner, patch.object(
+ Node, "site_deployment"
+ ) as site_deployment, patch.object(
+ Flavor, "objects"
+ ) as flavor_objects, patch.object(
+ TenantWithContainerPolicy, "get_image"
+ ) as get_image, patch.object(
+ LeastLoadedNodeScheduler, "pick"
+ ) as pick:
+ # setup mocks
+ node = Node(hostname="my.node.com")
+ slice = Slice(
+ name="mysite_test1", default_flavor=None, default_isolation="vm"
+ )
+ image = Image(name="trusty-server-multi-nic")
+ deployment = Deployment(name="testdeployment")
+ owner.slices.count.return_value = 1
+ owner.slices.all.return_value = [slice]
+ owner.slices.first.return_value = slice
+ get_image.return_value = image
+ pick.return_value = (node, None)
+ site_deployment.deployment = deployment
+ flavor_objects.filter.return_value = []
+ # done setup mocks
+
+ with self.assertRaises(Exception) as e:
+ self.policy.manage_container(self.tenant)
+ self.assertEqual(e.exception.message, "No m1.small flavor")
+
+ def test_least_loaded_node_scheduler(self):
+ with patch.object(Node.objects, "get_items") as node_objects:
+ slice = Slice(
+ name="mysite_test1", default_flavor=None, default_isolation="vm"
+ )
+ node = Node(hostname="my.node.com", id=4567)
+ node.instances = MockObjectList(initial=[])
+ node_objects.return_value = [node]
+
+ sched = LeastLoadedNodeScheduler(slice)
+ (picked_node, parent) = sched.pick()
+
+ self.assertNotEqual(picked_node, None)
+ self.assertEqual(picked_node.id, node.id)
+
+ def test_least_loaded_node_scheduler_two_nodes(self):
+ with patch.object(Node.objects, "get_items") as node_objects:
+ slice = Slice(
+ name="mysite_test1", default_flavor=None, default_isolation="vm"
+ )
+ instance1 = Instance(id=1)
+ node1 = Node(hostname="my.node.com", id=4567)
+ node1.instances = MockObjectList(initial=[])
+ node2 = Node(hostname="my.node.com", id=8910)
+ node2.instances = MockObjectList(initial=[instance1])
+ node_objects.return_value = [node1, node2]
+
+            # should pick the node with the fewest instances (node1)
+
+ sched = LeastLoadedNodeScheduler(slice)
+ (picked_node, parent) = sched.pick()
+
+ self.assertNotEqual(picked_node, None)
+ self.assertEqual(picked_node.id, node1.id)
+
+ def test_least_loaded_node_scheduler_two_nodes_multi(self):
+ with patch.object(Node.objects, "get_items") as node_objects:
+ slice = Slice(
+ name="mysite_test1", default_flavor=None, default_isolation="vm"
+ )
+ instance1 = Instance(id=1)
+ instance2 = Instance(id=2)
+ instance3 = Instance(id=3)
+ node1 = Node(hostname="my.node.com", id=4567)
+ node1.instances = MockObjectList(initial=[instance2, instance3])
+ node2 = Node(hostname="my.node.com", id=8910)
+ node2.instances = MockObjectList(initial=[instance1])
+ node_objects.return_value = [node1, node2]
+
+            # should pick the node with the fewest instances (node2)
+
+ sched = LeastLoadedNodeScheduler(slice)
+ (picked_node, parent) = sched.pick()
+
+ self.assertNotEqual(picked_node, None)
+ self.assertEqual(picked_node.id, node2.id)
+
+ def test_least_loaded_node_scheduler_with_label(self):
+ with patch.object(Node.objects, "get_items") as node_objects:
+ slice = Slice(
+ name="mysite_test1", default_flavor=None, default_isolation="vm"
+ )
+ instance1 = Instance(id=1)
+ node1 = Node(hostname="my.node.com", id=4567)
+ node1.instances = MockObjectList(initial=[])
+ node2 = Node(hostname="my.node.com", id=8910)
+ node2.instances = MockObjectList(initial=[instance1])
+ # Fake out the existence of a NodeLabel object. TODO: Extend the mock framework to support the model__field
+ # syntax.
+ node1.nodelabels__name = None
+ node2.nodelabels__name = "foo"
+ node_objects.return_value = [node1, node2]
+
+ # should pick the node with the label, even if it has a greater number of instances
+
+ sched = LeastLoadedNodeScheduler(slice, label="foo")
+ (picked_node, parent) = sched.pick()
+
+ self.assertNotEqual(picked_node, None)
+ self.assertEqual(picked_node.id, node2.id)
+
+ def test_least_loaded_node_scheduler_create_label(self):
+ with patch.object(Node.objects, "get_items") as node_objects, patch.object(
+ NodeLabel, "save", autospec=True
+ ) as nodelabel_save, patch.object(NodeLabel, "node") as nodelabel_node_add:
+ slice = Slice(
+ name="mysite_test1", default_flavor=None, default_isolation="vm"
+ )
+ instance1 = Instance(id=1)
+ node1 = Node(hostname="my.node.com", id=4567)
+ node1.instances = MockObjectList(initial=[])
+ node2 = Node(hostname="my.node.com", id=8910)
+ node2.instances = MockObjectList(initial=[instance1])
+ # Fake out the existence of a NodeLabel object. TODO: Extend the mock framework to support the model__field
+ # syntax.
+ node1.nodelabels__name = None
+ node2.nodelabels__name = None
+ node_objects.return_value = [node1, node2]
+
+            # should pick the node with the fewest instances
+
+ sched = LeastLoadedNodeScheduler(
+ slice, label="foo", constrain_by_service_instance=True
+ )
+ (picked_node, parent) = sched.pick()
+
+ self.assertNotEqual(picked_node, None)
+ self.assertEqual(picked_node.id, node1.id)
+
+ # NodeLabel should have been created and saved
+
+ self.assertEqual(nodelabel_save.call_count, 1)
+ self.assertEqual(nodelabel_save.call_args[0][0].name, "foo")
+
+ # The NodeLabel's node field should have been added to
+
+ NodeLabel.node.add.assert_called_with(node1)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/lib/xos-synchronizer/tests/test_payload.py b/lib/xos-synchronizer/tests/test_payload.py
new file mode 100644
index 0000000..6bd1cfc
--- /dev/null
+++ b/lib/xos-synchronizer/tests/test_payload.py
@@ -0,0 +1,346 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import unittest
+from mock import patch
+import mock
+import pdb
+import networkx as nx
+
+import os
+import sys
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+sync_lib_dir = os.path.join(test_path, "..", "xossynchronizer")
+xos_dir = os.path.join(test_path, "..", "..", "..", "xos")
+
+ANSIBLE_FILE = "/tmp/payload_test"
+
+log = None
+
+
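+# Stand-ins for the synchronizer's run_template: rather than invoking Ansible,
+# they record the rendered options in ANSIBLE_FILE and report a zero (success)
+# or nonzero (failure) return code.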
+def run_fake_ansible_template(*args, **kwargs):
+ opts = args[1]
+ open(ANSIBLE_FILE, "w").write(json.dumps(opts))
+ return [{"rc": 0}]
+
+
+def run_fake_ansible_template_fail(*args, **kwargs):
+ opts = args[1]
+ open(ANSIBLE_FILE, "w").write(json.dumps(opts))
+ return [{"rc": 1}]
+
+
+def get_ansible_output():
+ ansible_str = open(ANSIBLE_FILE).read()
+ return json.loads(ansible_str)
+
+
+class TestPayload(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+
+ global log
+
+ config = os.path.join(test_path, "test_config.yaml")
+ from xosconfig import Config
+
+ Config.clear()
+ Config.init(config, "synchronizer-config-schema.yaml")
+
+ if not log:
+ from multistructlog import create_logger
+
+ log = create_logger(Config().get("logging"))
+
+ def setUp(self):
+
+        global log, steps
+
+ self.sys_path_save = sys.path
+ self.cwd_save = os.getcwd()
+
+ config = os.path.join(test_path, "test_config.yaml")
+ from xosconfig import Config
+
+ Config.clear()
+ Config.init(config, "synchronizer-config-schema.yaml")
+
+ from xossynchronizer.mock_modelaccessor_build import (
+ build_mock_modelaccessor,
+ )
+
+ build_mock_modelaccessor(sync_lib_dir, xos_dir, services_dir=None, service_xprotos=[])
+
+ os.chdir(os.path.join(test_path, "..")) # config references tests/model-deps
+
+ import xossynchronizer.event_loop
+
+ reload(xossynchronizer.event_loop)
+ import xossynchronizer.backend
+
+ reload(xossynchronizer.backend)
+ import steps.sync_instances
+ import steps.sync_controller_slices
+ from xossynchronizer.modelaccessor import model_accessor
+
+ # import all class names to globals
+ for (k, v) in model_accessor.all_model_classes.items():
+ globals()[k] = v
+ b = xossynchronizer.backend.Backend()
+ steps_dir = Config.get("steps_dir")
+ self.steps = b.load_sync_step_modules(steps_dir)
+ self.synchronizer = xossynchronizer.event_loop.XOSObserver(self.steps)
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+ os.chdir(self.cwd_save)
+
+    # Stacked mock.patch decorators inject mocks bottom-up, so the
+    # model_accessor mock is passed before the run_template mock.
+    @mock.patch(
+        "steps.sync_instances.syncstep.run_template",
+        side_effect=run_fake_ansible_template,
+    )
+    @mock.patch("xossynchronizer.event_loop.model_accessor")
+    def test_delete_record(self, mock_modelaccessor, mock_run_template):
+ with mock.patch.object(Instance, "save") as instance_save:
+ o = Instance()
+ o.name = "Sisi Pascal"
+
+ o.synchronizer_step = steps.sync_instances.SyncInstances()
+ self.synchronizer.delete_record(o, log)
+
+ a = get_ansible_output()
+ self.assertDictContainsSubset({"delete": True, "name": o.name}, a)
+ o.save.assert_called_with(update_fields=["backend_need_reap"])
+
+ @mock.patch(
+ "steps.sync_instances.syncstep.run_template",
+ side_effect=run_fake_ansible_template_fail,
+ )
+ @mock.patch("xossynchronizer.event_loop.model_accessor")
+    def test_delete_record_fail(self, mock_modelaccessor, mock_run_template):
+ with mock.patch.object(Instance, "save") as instance_save:
+ o = Instance()
+ o.name = "Sisi Pascal"
+
+ o.synchronizer_step = steps.sync_instances.SyncInstances()
+
+ with self.assertRaises(Exception) as e:
+ self.synchronizer.delete_record(o, log)
+
+ self.assertEqual(
+ e.exception.message, "Nonzero rc from Ansible during delete_record"
+ )
+
+ @mock.patch(
+ "steps.sync_instances.syncstep.run_template",
+ side_effect=run_fake_ansible_template,
+ )
+ @mock.patch("xossynchronizer.event_loop.model_accessor")
+    def test_sync_record(self, mock_modelaccessor, mock_run_template):
+ with mock.patch.object(Instance, "save") as instance_save:
+ o = Instance()
+ o.name = "Sisi Pascal"
+
+ o.synchronizer_step = steps.sync_instances.SyncInstances()
+ self.synchronizer.sync_record(o, log)
+
+ a = get_ansible_output()
+ self.assertDictContainsSubset({"delete": False, "name": o.name}, a)
+ o.save.assert_called_with(
+ update_fields=[
+ "enacted",
+ "backend_status",
+ "backend_register",
+ "backend_code",
+ ]
+ )
+
+ @mock.patch(
+ "steps.sync_instances.syncstep.run_template",
+ side_effect=run_fake_ansible_template,
+ )
+ @mock.patch("xossynchronizer.event_loop.model_accessor")
+    def test_sync_cohort(self, mock_modelaccessor, mock_run_template):
+ with mock.patch.object(Instance, "save") as instance_save, mock.patch.object(
+ ControllerSlice, "save"
+ ) as controllerslice_save:
+ cs = ControllerSlice()
+ s = Slice(name="SP SP")
+ cs.slice = s
+
+ o = Instance()
+ o.name = "Sisi Pascal"
+ o.slice = s
+
+ cohort = [cs, o]
+ o.synchronizer_step = steps.sync_instances.SyncInstances()
+ cs.synchronizer_step = steps.sync_controller_slices.SyncControllerSlices()
+
+ self.synchronizer.sync_cohort(cohort, False)
+
+ a = get_ansible_output()
+ self.assertDictContainsSubset({"delete": False, "name": o.name}, a)
+ o.save.assert_called_with(
+ update_fields=[
+ "enacted",
+ "backend_status",
+ "backend_register",
+ "backend_code",
+ ]
+ )
+ cs.save.assert_called_with(
+ update_fields=[
+ "enacted",
+ "backend_status",
+ "backend_register",
+ "backend_code",
+ ]
+ )
+
+ @mock.patch(
+ "steps.sync_instances.syncstep.run_template",
+ side_effect=run_fake_ansible_template,
+ )
+ @mock.patch("xossynchronizer.event_loop.model_accessor")
+    def test_deferred_exception(self, mock_modelaccessor, mock_run_template):
+ with mock.patch.object(Instance, "save") as instance_save:
+ cs = ControllerSlice()
+ s = Slice(name="SP SP")
+ cs.slice = s
+ cs.force_defer = True
+
+ o = Instance()
+ o.name = "Sisi Pascal"
+ o.slice = s
+
+ cohort = [cs, o]
+ o.synchronizer_step = steps.sync_instances.SyncInstances()
+ cs.synchronizer_step = steps.sync_controller_slices.SyncControllerSlices()
+
+ self.synchronizer.sync_cohort(cohort, False)
+ o.save.assert_called_with(
+ always_update_timestamp=True,
+ update_fields=["backend_status", "backend_register"],
+ )
+ self.assertEqual(cs.backend_code, 0)
+
+ self.assertIn("Force", cs.backend_status)
+ self.assertIn("Failed due to", o.backend_status)
+
+ @mock.patch(
+ "steps.sync_instances.syncstep.run_template",
+ side_effect=run_fake_ansible_template,
+ )
+ @mock.patch("xossynchronizer.event_loop.model_accessor")
+    def test_backend_status(self, mock_modelaccessor, mock_run_template):
+ with mock.patch.object(Instance, "save") as instance_save:
+ cs = ControllerSlice()
+ s = Slice(name="SP SP")
+ cs.slice = s
+ cs.force_fail = True
+
+ o = Instance()
+ o.name = "Sisi Pascal"
+ o.slice = s
+
+ cohort = [cs, o]
+ o.synchronizer_step = steps.sync_instances.SyncInstances()
+ cs.synchronizer_step = steps.sync_controller_slices.SyncControllerSlices()
+
+ self.synchronizer.sync_cohort(cohort, False)
+ o.save.assert_called_with(
+ always_update_timestamp=True,
+ update_fields=["backend_status", "backend_register"],
+ )
+ self.assertIn("Force", cs.backend_status)
+ self.assertIn("Failed due to", o.backend_status)
+
+ @mock.patch(
+ "steps.sync_instances.syncstep.run_template",
+ side_effect=run_fake_ansible_template,
+ )
+ @mock.patch("xossynchronizer.event_loop.model_accessor")
+    def test_fetch_pending(self, mock_accessor, mock_run_template, *_other_accessors):
+ pending_objects, pending_steps = self.synchronizer.fetch_pending()
+ pending_objects2 = list(pending_objects)
+
+ any_cs = next(
+ obj for obj in pending_objects if obj.leaf_model_name == "ControllerSlice"
+ )
+ any_instance = next(
+ obj for obj in pending_objects2 if obj.leaf_model_name == "Instance"
+ )
+
+ slice = Slice()
+ any_instance.slice = slice
+ any_cs.slice = slice
+
+ self.synchronizer.external_dependencies = []
+ cohorts = self.synchronizer.compute_dependent_cohorts(pending_objects, False)
+ flat_objects = [item for cohort in cohorts for item in cohort]
+
+ self.assertEqual(set(flat_objects), set(pending_objects))
+
+ @mock.patch(
+ "steps.sync_instances.syncstep.run_template",
+ side_effect=run_fake_ansible_template,
+ )
+ @mock.patch("xossynchronizer.event_loop.model_accessor")
+ def test_fetch_pending_with_external_dependencies(
+        self, mock_accessor, mock_run_template, *_other_accessors
+ ):
+ pending_objects, pending_steps = self.synchronizer.fetch_pending()
+ pending_objects2 = list(pending_objects)
+
+ any_cn = next(
+ obj for obj in pending_objects if obj.leaf_model_name == "ControllerNetwork"
+ )
+ any_user = next(
+ obj for obj in pending_objects2 if obj.leaf_model_name == "User"
+ )
+
+ cohorts = self.synchronizer.compute_dependent_cohorts(pending_objects, False)
+
+ flat_objects = [item for cohort in cohorts for item in cohort]
+ self.assertEqual(set(flat_objects), set(pending_objects))
+
+        # These should never be None; the assertions below document that expectation
+ self.assertIsNotNone(any_cn)
+ self.assertIsNotNone(any_user)
+
+ @mock.patch(
+ "steps.sync_instances.syncstep.run_template",
+ side_effect=run_fake_ansible_template,
+ )
+ @mock.patch("xossynchronizer.event_loop.model_accessor")
+    def test_external_dependency_exception(self, mock_modelaccessor, mock_run_template):
+ cs = ControllerSlice()
+ s = Slice(name="SP SP")
+ cs.slice = s
+
+ o = Instance()
+ o.name = "Sisi Pascal"
+ o.slice = s
+
+ cohort = [cs, o]
+ o.synchronizer_step = steps.sync_instances.SyncInstances()
+
+ self.synchronizer.sync_cohort(cohort, False)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/lib/xos-synchronizer/tests/test_run.py b/lib/xos-synchronizer/tests/test_run.py
new file mode 100644
index 0000000..f5815f2
--- /dev/null
+++ b/lib/xos-synchronizer/tests/test_run.py
@@ -0,0 +1,121 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import unittest
+from mock import patch
+import mock
+import pdb
+import networkx as nx
+
+import os
+import sys
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+sync_lib_dir = os.path.join(test_path, "..", "xossynchronizer")
+xos_dir = os.path.join(test_path, "..", "..", "..", "xos")
+
+ANSIBLE_FILE = "/tmp/payload_test"
+
+
+def run_fake_ansible_template(*args, **kwargs):
+ opts = args[1]
+ open(ANSIBLE_FILE, "w").write(json.dumps(opts))
+
+
+def get_ansible_output():
+ ansible_str = open(ANSIBLE_FILE).read()
+ return json.loads(ansible_str)
+
+
+class TestRun(unittest.TestCase):
+ def setUp(self):
+ self.sys_path_save = sys.path
+ self.cwd_save = os.getcwd()
+
+ config = os.path.join(test_path, "test_config.yaml")
+ from xosconfig import Config
+
+ Config.clear()
+ Config.init(config, "synchronizer-config-schema.yaml")
+
+ from xossynchronizer.mock_modelaccessor_build import (
+ build_mock_modelaccessor,
+ )
+
+ build_mock_modelaccessor(sync_lib_dir, xos_dir, services_dir=None, service_xprotos=[])
+
+ os.chdir(os.path.join(test_path, "..")) # config references tests/model-deps
+
+ import xossynchronizer.event_loop
+
+ reload(xossynchronizer.event_loop)
+ import xossynchronizer.backend
+
+ reload(xossynchronizer.backend)
+ from xossynchronizer.modelaccessor import model_accessor
+
+ # import all class names to globals
+ for (k, v) in model_accessor.all_model_classes.items():
+ globals()[k] = v
+
+ b = xossynchronizer.backend.Backend()
+ steps_dir = Config.get("steps_dir")
+ self.steps = b.load_sync_step_modules(steps_dir)
+ self.synchronizer = xossynchronizer.event_loop.XOSObserver(self.steps)
+ try:
+ os.remove("/tmp/sync_ports")
+ except OSError:
+ pass
+ try:
+ os.remove("/tmp/delete_ports")
+ except OSError:
+ pass
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+ os.chdir(self.cwd_save)
+
+ @mock.patch(
+ "steps.sync_instances.syncstep.run_template",
+ side_effect=run_fake_ansible_template,
+ )
+ @mock.patch("xossynchronizer.event_loop.model_accessor")
+    def test_run_once(self, mock_accessor, mock_run_template, *_other_accessors):
+
+ pending_objects, pending_steps = self.synchronizer.fetch_pending()
+ pending_objects2 = list(pending_objects)
+
+ any_cs = next(
+ obj for obj in pending_objects if obj.leaf_model_name == "ControllerSlice"
+ )
+ any_instance = next(
+ obj for obj in pending_objects2 if obj.leaf_model_name == "Instance"
+ )
+
+ slice = Slice()
+ any_instance.slice = slice
+ any_cs.slice = slice
+
+ self.synchronizer.run_once()
+
+ sync_ports = open("/tmp/sync_ports").read()
+ delete_ports = open("/tmp/delete_ports").read()
+
+ self.assertIn("successful", sync_ports)
+ self.assertIn("successful", delete_ports)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/lib/xos-synchronizer/tests/test_scheduler.py b/lib/xos-synchronizer/tests/test_scheduler.py
new file mode 100644
index 0000000..afbf036
--- /dev/null
+++ b/lib/xos-synchronizer/tests/test_scheduler.py
@@ -0,0 +1,272 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import patch
+import mock
+import pdb
+import networkx as nx
+
+import os
+import sys
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+sync_lib_dir = os.path.join(test_path, "..", "xossynchronizer")
+xos_dir = os.path.join(test_path, "..", "..", "..", "xos")
+
+class TestScheduling(unittest.TestCase):
+
+ __test__ = False
+
+ def setUp(self):
+ global mock_enumerator, event_loop
+
+ self.sys_path_save = sys.path
+ self.cwd_save = os.getcwd()
+
+ config = os.path.join(test_path, "test_config.yaml")
+ from xosconfig import Config
+
+ Config.clear()
+ Config.init(config, "synchronizer-config-schema.yaml")
+
+ from xossynchronizer.mock_modelaccessor_build import (
+ build_mock_modelaccessor,
+ )
+
+ build_mock_modelaccessor(sync_lib_dir, xos_dir, services_dir=None, service_xprotos=[])
+
+ os.chdir(os.path.join(test_path, "..")) # config references tests/model-deps
+
+        import xossynchronizer.event_loop
+
+        reload(xossynchronizer.event_loop)
+        # bind the reloaded module to the declared global so the tests can
+        # reference event_loop.DIRECT_EDGE / event_loop.PROXY_EDGE
+        event_loop = xossynchronizer.event_loop
+ import xossynchronizer.backend
+
+ reload(xossynchronizer.backend)
+ from xossynchronizer.mock_modelaccessor import mock_enumerator
+ from xossynchronizer.modelaccessor import model_accessor
+
+ # import all class names to globals
+ for (k, v) in model_accessor.all_model_classes.items():
+ globals()[k] = v
+
+ b = xossynchronizer.backend.Backend()
+ steps_dir = Config.get("steps_dir")
+ self.steps = b.load_sync_step_modules(steps_dir)
+ self.synchronizer = xossynchronizer.event_loop.XOSObserver(self.steps)
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+ os.chdir(self.cwd_save)
+
+ def test_same_object_trivial(self):
+ s = Slice(pk=4)
+ t = Slice(pk=4)
+ same, t = self.synchronizer.same_object(s, t)
+ self.assertTrue(same)
+ self.assertEqual(t, event_loop.DIRECT_EDGE)
+
+ def test_same_object_trivial2(self):
+ s = Slice(pk=4)
+ t = Slice(pk=5)
+ same, t = self.synchronizer.same_object(s, t)
+ self.assertFalse(same)
+
+ def test_same_object_lst(self):
+ s = Slice(pk=5)
+ t = ControllerSlice(slice=s)
+ u = ControllerSlice(slice=s)
+
+ s.controllerslices = mock_enumerator([t, u])
+
+ same, et = self.synchronizer.same_object(s.controllerslices, u)
+ self.assertTrue(same)
+ self.assertEqual(et, event_loop.PROXY_EDGE)
+
+ same, et = self.synchronizer.same_object(s.controllerslices, t)
+
+ self.assertTrue(same)
+ self.assertEqual(et, event_loop.PROXY_EDGE)
+
+ def test_same_object_lst_dc(self):
+ r = Slice(pk=4)
+ s = Slice(pk=5)
+ t = ControllerSlice(slice=r)
+ u = ControllerSlice(slice=s)
+
+ s.controllerslices = mock_enumerator([u])
+
+ same, et = self.synchronizer.same_object(s.controllerslices, t)
+ self.assertFalse(same)
+
+ same, et = self.synchronizer.same_object(s.controllerslices, u)
+ self.assertTrue(same)
+
+ def test_concrete_path_no_model_path(self):
+ p = Port()
+ n = NetworkParameter()
+ verdict, _ = self.synchronizer.concrete_path_exists(p, n)
+ self.assertFalse(verdict)
+
+ def test_concrete_no_object_path_adjacent(self):
+ p = Instance()
+ s1 = Slice()
+ s2 = Slice()
+ p.slice = s2
+ verdict, _ = self.synchronizer.concrete_path_exists(p, s1)
+
+ self.assertFalse(verdict)
+
+ def test_concrete_object_path_adjacent(self):
+ p = Instance()
+ s = Slice()
+ p.slice = s
+ verdict, edge_type = self.synchronizer.concrete_path_exists(p, s)
+
+ self.assertTrue(verdict)
+ self.assertEqual(edge_type, event_loop.DIRECT_EDGE)
+
+ def test_concrete_object_controller_path_adjacent(self):
+ p = Instance()
+ q = Instance()
+ cs = ControllerSlice()
+ cs2 = ControllerSlice()
+ s1 = Slice()
+ s2 = Slice()
+ p.slice = s1
+ q.slice = s2
+ cs.slice = s1
+ s1.controllerslices = mock_enumerator([cs])
+ s2.controllerslices = mock_enumerator([])
+
+ verdict1, edge_type1 = self.synchronizer.concrete_path_exists(p, cs)
+ verdict2, _ = self.synchronizer.concrete_path_exists(q, cs)
+ verdict3, _ = self.synchronizer.concrete_path_exists(p, cs2)
+
+ self.assertTrue(verdict1)
+ self.assertFalse(verdict2)
+ self.assertFalse(verdict3)
+
+ self.assertEqual(edge_type1, event_loop.PROXY_EDGE)
+
+ def test_concrete_object_controller_path_distant(self):
+ p = Instance()
+ s = Slice()
+ t = Site()
+ ct = ControllerSite()
+ ct.site = t
+ p.slice = s
+ s.site = t
+        verdict, _ = self.synchronizer.concrete_path_exists(p, ct)
+ self.assertTrue(verdict)
+
+ def test_concrete_object_path_distant(self):
+ p = Instance()
+ s = Slice()
+ t = Site()
+ p.slice = s
+ s.site = t
+        verdict, _ = self.synchronizer.concrete_path_exists(p, t)
+ self.assertTrue(verdict)
+
+ def test_concrete_no_object_path_distant(self):
+ p = Instance()
+ s = Slice()
+ s.controllerslice = mock_enumerator([])
+
+ t = Site()
+ t.controllersite = mock_enumerator([])
+
+ ct = ControllerSite()
+ ct.site = Site()
+ p.slice = s
+ s.site = t
+
+ verdict, _ = self.synchronizer.concrete_path_exists(p, ct)
+ self.assertFalse(verdict)
+
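+    # The cohort tests below exercise compute_dependent_cohorts: unrelated
+    # objects land in singleton cohorts, while dependent objects share a
+    # cohort ordered dependency-first (reversed when the deletion flag is True).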
+ def test_cohorting_independent(self):
+ i = Image()
+
+ p = Slice()
+ c = Instance()
+ c.slice = None
+ c.image = None
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([i, p, c], False)
+ self.assertEqual(len(cohorts), 3)
+
+ def test_cohorting_related(self):
+ i = Image()
+ p = Port()
+ c = Instance()
+ c.image = i
+ s = ControllerSlice()
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([i, p, c, s], False)
+ self.assertIn([i, c], cohorts)
+ self.assertIn([p], cohorts)
+ self.assertIn([s], cohorts)
+
+ def test_cohorting_related_multi(self):
+ i = Image()
+ p = Port()
+ c = Instance()
+ c.image = i
+ cs = ControllerSlice()
+ s = Slice()
+ cs.slice = s
+ s.controllerslices = mock_enumerator([cs])
+ c.slice = s
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([i, p, c, s, cs], False)
+
+ big_cohort = max(cohorts, key=len)
+ self.assertGreater(big_cohort.index(c), big_cohort.index(i))
+ self.assertGreater(big_cohort.index(cs), big_cohort.index(s))
+ self.assertIn([p], cohorts)
+
+ def test_cohorting_related_multi_delete(self):
+ i = Image()
+ p = Port()
+ c = Instance()
+ c.image = i
+ cs = ControllerSlice()
+ s = Slice()
+ cs.slice = s
+ c.slice = s
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([i, p, c, s, cs], True)
+
+ big_cohort = max(cohorts, key=len)
+ self.assertGreater(big_cohort.index(i), big_cohort.index(c))
+ self.assertGreater(big_cohort.index(s), big_cohort.index(cs))
+ self.assertIn([p], cohorts)
+
+ def test_cohorting_related_delete(self):
+ i = Image()
+ p = Port()
+ c = Instance()
+ c.image = i
+ s = ControllerSlice()
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([i, p, c, s], True)
+ self.assertIn([c, i], cohorts)
+ self.assertIn([p], cohorts)
+ self.assertIn([s], cohorts)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/lib/xos-synchronizer/tests/test_services.py b/lib/xos-synchronizer/tests/test_services.py
new file mode 100644
index 0000000..2456c27
--- /dev/null
+++ b/lib/xos-synchronizer/tests/test_services.py
@@ -0,0 +1,87 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import patch
+import mock
+import pdb
+import networkx as nx
+
+import os
+import sys
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+sync_lib_dir = os.path.join(test_path, "..", "xossynchronizer")
+xos_dir = os.path.join(test_path, "..", "..", "..", "xos")
+
+
+class TestServices(unittest.TestCase):
+ def setUp(self):
+ self.sys_path_save = sys.path
+ self.cwd_save = os.getcwd()
+
+ config = os.path.join(test_path, "test_config.yaml")
+ from xosconfig import Config
+
+ Config.clear()
+ Config.init(config, "synchronizer-config-schema.yaml")
+
+ from xossynchronizer.mock_modelaccessor_build import (
+ build_mock_modelaccessor,
+ )
+
+ build_mock_modelaccessor(sync_lib_dir, xos_dir, services_dir=None, service_xprotos=[])
+
+ os.chdir(os.path.join(test_path, "..")) # config references tests/model-deps
+
+ import xossynchronizer.event_loop
+
+ reload(xossynchronizer.event_loop)
+ import xossynchronizer.backend
+
+ reload(xossynchronizer.backend)
+ from xossynchronizer.modelaccessor import model_accessor
+
+ # import all class names to globals
+ for (k, v) in model_accessor.all_model_classes.items():
+ globals()[k] = v
+
+ b = xossynchronizer.backend.Backend()
+ steps_dir = Config.get("steps_dir")
+ self.steps = b.load_sync_step_modules(steps_dir)
+ self.synchronizer = xossynchronizer.event_loop.XOSObserver(self.steps)
+
+ def tearDown(self):
+ sys.path = self.sys_path_save
+ os.chdir(self.cwd_save)
+
+ def test_service_models(self):
+ s = Service()
+ a = ServiceInstance(owner=s)
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([a, s], False)
+ self.assertIn([s, a], cohorts)
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([s, a], False)
+ self.assertIn([s, a], cohorts)
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([a, s], True)
+ self.assertIn([a, s], cohorts)
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([s, a], True)
+ self.assertIn([a, s], cohorts)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/lib/xos-synchronizer/xossynchronizer/__init__.py b/lib/xos-synchronizer/xossynchronizer/__init__.py
new file mode 100644
index 0000000..18ab956
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/__init__.py
@@ -0,0 +1,17 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .synchronizer import Synchronizer
+
+__all__ = ["Synchronizer"]
diff --git a/lib/xos-synchronizer/xossynchronizer/ansible_helper.py b/lib/xos-synchronizer/xossynchronizer/ansible_helper.py
new file mode 100644
index 0000000..c607607
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/ansible_helper.py
@@ -0,0 +1,325 @@
+#!/usr/bin/env python
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import jinja2
+import tempfile
+import os
+import json
+import pickle
+import pdb
+import string
+import random
+import re
+import traceback
+import subprocess
+import threading
+
+from xosconfig import Config
+
+from multistructlog import create_logger
+
+log = create_logger(Config().get("logging"))
+
+
+step_dir = Config.get("steps_dir")
+sys_dir = Config.get("sys_dir")
+
+os_template_loader = jinja2.FileSystemLoader(
+ searchpath=[step_dir, "/opt/xos/synchronizers/shared_templates"]
+)
+os_template_env = jinja2.Environment(loader=os_template_loader)
+
+
+def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
+ return "".join(random.choice(chars) for _ in range(size))
+
+
+def shellquote(s):
+ return "'" + s.replace("'", "'\\''") + "'"
+
+
+def get_playbook_fn(opts, path):
+ if not opts.get("ansible_tag", None):
+ # if no ansible_tag is in the options, then generate a unique one
+ objname = id_generator()
+ opts = opts.copy()
+ opts["ansible_tag"] = objname
+
+ objname = opts["ansible_tag"]
+
+ pathed_sys_dir = os.path.join(sys_dir, path)
+ if not os.path.isdir(pathed_sys_dir):
+ os.makedirs(pathed_sys_dir)
+
+ # symlink steps/roles into sys/roles so that playbooks can access roles
+ roledir = os.path.join(step_dir, "roles")
+ rolelink = os.path.join(pathed_sys_dir, "roles")
+ if os.path.isdir(roledir) and not os.path.islink(rolelink):
+ os.symlink(roledir, rolelink)
+
+ return (opts, os.path.join(pathed_sys_dir, objname))
+
+
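+# Run the playbook out-of-process: arguments are pickled to a temp file,
+# ansible_main.py is invoked as a separate python process, and the pickled
+# result is read back, presumably to keep Ansible's global state out of the
+# long-running synchronizer process.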
+def run_playbook(ansible_hosts, ansible_config, fqp, opts):
+ args = {
+ "ansible_hosts": ansible_hosts,
+ "ansible_config": ansible_config,
+ "fqp": fqp,
+ "opts": opts,
+ "config_file": Config.get_config_file(),
+ }
+
+ keep_temp_files = Config.get("keep_temp_files")
+
+ dir = tempfile.mkdtemp()
+ args_fn = None
+ result_fn = None
+ try:
+ log.info("creating args file", dir=dir)
+
+ args_fn = os.path.join(dir, "args")
+ result_fn = os.path.join(dir, "result")
+
+ open(args_fn, "w").write(pickle.dumps(args))
+
+ ansible_main_fn = os.path.join(os.path.dirname(__file__), "ansible_main.py")
+
+ os.system("python %s %s %s" % (ansible_main_fn, args_fn, result_fn))
+
+ result = pickle.loads(open(result_fn).read())
+
+        if "exception" in result:
+            log.error("Exception in playbook", exception=result["exception"])
+
+ stats = result.get("stats", None)
+ aresults = result.get("aresults", None)
+ except Exception as e:
+ log.exception("Exception running ansible_main")
+ stats = None
+ aresults = None
+ finally:
+ if not keep_temp_files:
+ if args_fn and os.path.exists(args_fn):
+ os.remove(args_fn)
+ if result_fn and os.path.exists(result_fn):
+ os.remove(result_fn)
+ os.rmdir(dir)
+
+ return (stats, aresults)
+
+
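+# Render the named Jinja2 playbook template with `opts`, write it under
+# sys_dir, execute it, and scan the results: unreachable hosts and failed
+# tasks raise, with any collected Ansible error messages attached.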
+def run_template(
+ name,
+ opts,
+ path="",
+ expected_num=None,
+ ansible_config=None,
+ ansible_hosts=None,
+ run_ansible_script=None,
+ object=None,
+):
+ template = os_template_env.get_template(name)
+ buffer = template.render(opts)
+
+ (opts, fqp) = get_playbook_fn(opts, path)
+
+ f = open(fqp, "w")
+ f.write(buffer)
+ f.flush()
+
+ """
+ q = Queue()
+ p = Process(target=run_playbook, args=(ansible_hosts, ansible_config, fqp, opts, q,))
+ p.start()
+ stats,aresults = q.get()
+ p.join()
+ """
+ stats, aresults = run_playbook(ansible_hosts, ansible_config, fqp, opts)
+
+ error_msg = []
+
+ output_file = fqp + ".out"
+ try:
+ if aresults is None:
+ raise ValueError("Error executing playbook %s" % fqp)
+
+ ok_results = []
+ total_unreachable = 0
+ failed = 0
+
+ ofile = open(output_file, "w")
+
+ for x in aresults:
+ if not x.is_failed() and not x.is_unreachable() and not x.is_skipped():
+ ok_results.append(x)
+ elif x.is_unreachable():
+ failed += 1
+ total_unreachable += 1
+ try:
+ error_msg.append(x._result["msg"])
+ except BaseException:
+ pass
+ elif x.is_failed():
+ failed += 1
+ try:
+ error_msg.append(x._result["msg"])
+ except BaseException:
+ pass
+
+ # FIXME (zdw, 2017-02-19) - may not be needed with new callback logging
+
+ ofile.write("%s: %s\n" % (x._task, str(x._result)))
+
+ if object:
+ oprops = object.tologdict()
+ ansible = x._result
+ oprops["xos_type"] = "ansible"
+ oprops["ansible_result"] = json.dumps(ansible)
+
+ if failed == 0:
+ oprops["ansible_status"] = "OK"
+ else:
+ oprops["ansible_status"] = "FAILED"
+
+ log.info("Ran Ansible task", task=x._task, **oprops)
+
+ ofile.close()
+
+ if (expected_num is not None) and (len(ok_results) != expected_num):
+ raise ValueError(
+ "Unexpected num %s!=%d" % (str(expected_num), len(ok_results))
+ )
+
+ if failed:
+ raise ValueError("Ansible playbook failed.")
+
+ # NOTE(smbaker): Playbook errors are slipping through where `aresults` does not show any failed tasks, but
+ # `stats` does show them. See CORD-3169.
+ hosts = sorted(stats.processed.keys())
+ for h in hosts:
+ t = stats.summarize(h)
+ if t["unreachable"] > 0:
+ raise ValueError(
+ "Ansible playbook reported unreachable for host %s" % h
+ )
+ if t["failures"] > 0:
+ raise ValueError("Ansible playbook reported failures for host %s" % h)
+
+ except ValueError as e:
+ if error_msg:
+ try:
+ error = " // ".join(error_msg)
+ except BaseException:
+ error = "failed to join error_msg"
+ raise Exception(error)
+ else:
+ raise
+
+ processed_results = map(lambda x: x._result, ok_results)
+ return processed_results[1:] # 0 is setup
+
+
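+# Variant of run_template for playbooks that ssh into an instance: writes a
+# per-run private key, ansible.cfg (optionally with a ProxyCommand via the
+# compute node), and hosts file, then delegates to run_template.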
+def run_template_ssh(name, opts, path="", expected_num=None, object=None):
+ instance_name = opts["instance_name"]
+ hostname = opts["hostname"]
+ private_key = opts["private_key"]
+ baremetal_ssh = opts.get("baremetal_ssh", False)
+ if baremetal_ssh:
+ # no instance_id or ssh_ip for baremetal
+ # we never proxy to baremetal
+ proxy_ssh = False
+ else:
+ instance_id = opts["instance_id"]
+ ssh_ip = opts["ssh_ip"]
+ proxy_ssh = Config.get("proxy_ssh.enabled")
+
+        if not ssh_ip:
+            raise Exception("IP of ssh proxy not available. Synchronization deferred")
+
+ (opts, fqp) = get_playbook_fn(opts, path)
+ private_key_pathname = fqp + ".key"
+ config_pathname = fqp + ".cfg"
+ hosts_pathname = fqp + ".hosts"
+
+ f = open(private_key_pathname, "w")
+ f.write(private_key)
+ f.close()
+
+ f = open(config_pathname, "w")
+ f.write("[ssh_connection]\n")
+ if proxy_ssh:
+ proxy_ssh_key = Config.get("proxy_ssh.key")
+ proxy_ssh_user = Config.get("proxy_ssh.user")
+ if proxy_ssh_key:
+ # If proxy_ssh_key is known, then we can proxy into the compute
+ # node without needing to have the OpenCloud sshd machinery in
+ # place.
+ proxy_command = (
+ "ProxyCommand ssh -q -i %s -o StrictHostKeyChecking=no %s@%s nc %s 22"
+ % (proxy_ssh_key, proxy_ssh_user, hostname, ssh_ip)
+ )
+ else:
+ proxy_command = (
+ "ProxyCommand ssh -q -i %s -o StrictHostKeyChecking=no %s@%s"
+ % (private_key_pathname, instance_id, hostname)
+ )
+ f.write('ssh_args = -o "%s"\n' % proxy_command)
+ f.write("scp_if_ssh = True\n")
+ f.write("pipelining = True\n")
+ f.write("\n[defaults]\n")
+ f.write("host_key_checking = False\n")
+ f.write("timeout = 30\n")
+ f.close()
+
+ f = open(hosts_pathname, "w")
+ f.write("[%s]\n" % instance_name)
+ f.write("%s ansible_ssh_private_key_file=%s\n" % (ssh_ip, private_key_pathname))
+ f.close()
+
+ # SSH will complain if private key is world or group readable
+ os.chmod(private_key_pathname, 0o600)
+
+ print("ANSIBLE_CONFIG=%s" % config_pathname)
+ print("ANSIBLE_HOSTS=%s" % hosts_pathname)
+
+ return run_template(
+ name,
+ opts,
+ path,
+ ansible_config=config_pathname,
+ ansible_hosts=hosts_pathname,
+ run_ansible_script="/opt/xos/synchronizers/base/run_ansible_verbose",
+ object=object,
+ )
+
+
+def main():
+ run_template(
+ "ansible/sync_user_deployments.yaml",
+ {
+ "endpoint": "http://172.31.38.128:5000/v2.0/",
+ "name": "Sapan Bhatia",
+ "email": "gwsapan@gmail.com",
+ "password": "foobar",
+ "admin_user": "admin",
+ "admin_password": "6a789bf69dd647e2",
+ "admin_tenant": "admin",
+ "tenant": "demo",
+ "roles": ["user", "admin"],
+ },
+ )
diff --git a/lib/xos-synchronizer/xossynchronizer/ansible_main.py b/lib/xos-synchronizer/xossynchronizer/ansible_main.py
new file mode 100644
index 0000000..08283a4
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/ansible_main.py
@@ -0,0 +1,80 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import pickle
+import sys
+
+# import json
+import traceback
+from xosconfig import Config
+
+sys.path.append("/opt/xos")
+
+
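+# Runs in a separate process (see ansible_helper.run_playbook): configure
+# Ansible through environment variables, execute the playbook via
+# ansible_runner, and return stats/results; exceptions are captured as a
+# traceback string so the result stays picklable for the parent process.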
+def run_playbook(ansible_hosts, ansible_config, fqp, opts):
+ try:
+ if ansible_config:
+ os.environ["ANSIBLE_CONFIG"] = ansible_config
+ else:
+ try:
+ del os.environ["ANSIBLE_CONFIG"]
+ except KeyError:
+ pass
+
+ if ansible_hosts:
+ os.environ["ANSIBLE_HOSTS"] = ansible_hosts
+ else:
+ try:
+ del os.environ["ANSIBLE_HOSTS"]
+ except KeyError:
+ pass
+
+ import ansible_runner
+
+ reload(ansible_runner)
+
+ # Dropped support for observer_pretend - to be redone
+ runner = ansible_runner.Runner(
+ playbook=fqp, run_data=opts, host_file=ansible_hosts
+ )
+
+ stats, aresults = runner.run()
+ except Exception as e:
+ return {"stats": None, "aresults": None, "exception": traceback.format_exc()}
+
+ return {"stats": stats, "aresults": aresults}
+
+
+def main():
+ input_fn = sys.argv[1]
+ result_fn = sys.argv[2]
+
+ args = pickle.loads(open(input_fn).read())
+
+ Config.init(args["config_file"], "synchronizer-config-schema.yaml")
+
+ ansible_hosts = args["ansible_hosts"]
+ ansible_config = args["ansible_config"]
+ fqp = args["fqp"]
+ opts = args["opts"]
+
+ result = run_playbook(ansible_hosts, ansible_config, fqp, opts)
+
+ open(result_fn, "w").write(pickle.dumps(result))
+
+
+if __name__ == "__main__":
+ main()
diff --git a/lib/xos-synchronizer/xossynchronizer/ansible_runner.py b/lib/xos-synchronizer/xossynchronizer/ansible_runner.py
new file mode 100644
index 0000000..d20feb5
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/ansible_runner.py
@@ -0,0 +1,388 @@
+#!/usr/bin/env python
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from multistructlog import create_logger
+from xosconfig import Config
+from ansible.plugins.callback import CallbackBase
+from ansible.utils.display import Display
+from ansible.executor import playbook_executor
+from ansible.parsing.dataloader import DataLoader
+from ansible.vars.manager import VariableManager
+from ansible.inventory.manager import InventoryManager
+import os
+import json
+import uuid
+
+from ansible import constants
+
+constants = reload(constants)
+
+
+log = create_logger(Config().get("logging"))
+
+
+class ResultCallback(CallbackBase):
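+    """Ansible callback plugin that collects task results and emits structured logs.
+
+    Each run is tagged with a uuid so that all log entries from one playbook
+    invocation can be correlated.
+    """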
+
+ CALLBACK_VERSION = 2.0
+ CALLBACK_NAME = "resultcallback"
+ CALLBACK_TYPE = "programmatic"
+
+ def __init__(self):
+ super(ResultCallback, self).__init__()
+ self.results = []
+ self.uuid = str(uuid.uuid1())
+ self.playbook_status = "OK"
+
+ def v2_playbook_on_start(self, playbook):
+ self.playbook = playbook._file_name
+ log_extra = {
+ "xos_type": "ansible",
+ "ansible_uuid": self.uuid,
+ "ansible_type": "playbook start",
+ "ansible_status": "OK",
+ "ansible_playbook": self.playbook,
+ }
+ log.info("PLAYBOOK START", playbook=self.playbook, **log_extra)
+
+ def v2_playbook_on_stats(self, stats):
+ host_stats = {}
+ for host in stats.processed.keys():
+ host_stats[host] = stats.summarize(host)
+
+ log_extra = {
+ "xos_type": "ansible",
+ "ansible_uuid": self.uuid,
+ "ansible_type": "playbook stats",
+ "ansible_status": self.playbook_status,
+ "ansible_playbook": self.playbook,
+ "ansible_result": json.dumps(host_stats),
+ }
+
+ if self.playbook_status == "OK":
+ log.info("PLAYBOOK END", playbook=self.playbook, **log_extra)
+ else:
+ log.error("PLAYBOOK END", playbook=self.playbook, **log_extra)
+
+ def v2_playbook_on_play_start(self, play):
+ log_extra = {
+ "xos_type": "ansible",
+ "ansible_uuid": self.uuid,
+ "ansible_type": "play start",
+ "ansible_status": self.playbook_status,
+ "ansible_playbook": self.playbook,
+ }
+ log.debug("PLAY START", play_name=play.name, **log_extra)
+
+ def v2_runner_on_ok(self, result, **kwargs):
+ log_extra = {
+ "xos_type": "ansible",
+ "ansible_uuid": self.uuid,
+ "ansible_type": "task",
+ "ansible_status": "OK",
+ "ansible_result": json.dumps(result._result),
+ "ansible_task": result._task,
+ "ansible_playbook": self.playbook,
+ "ansible_host": result._host.get_name(),
+ }
+ log.debug("OK", task=str(result._task), **log_extra)
+ self.results.append(result)
+
+ def v2_runner_on_failed(self, result, **kwargs):
+ self.playbook_status = "FAILED"
+ log_extra = {
+ "xos_type": "ansible",
+ "ansible_uuid": self.uuid,
+ "ansible_type": "task",
+ "ansible_status": "FAILED",
+ "ansible_result": json.dumps(result._result),
+ "ansible_task": result._task,
+ "ansible_playbook": self.playbook,
+ "ansible_host": result._host.get_name(),
+ }
+ log.error("FAILED", task=str(result._task), **log_extra)
+ self.results.append(result)
+
+ def v2_runner_on_async_failed(self, result, **kwargs):
+ self.playbook_status = "FAILED"
+ log_extra = {
+ "xos_type": "ansible",
+ "ansible_uuid": self.uuid,
+ "ansible_type": "task",
+ "ansible_status": "ASYNC FAILED",
+ "ansible_result": json.dumps(result._result),
+ "ansible_task": result._task,
+ "ansible_playbook": self.playbook,
+ "ansible_host": result._host.get_name(),
+ }
+ log.error("ASYNC FAILED", task=str(result._task), **log_extra)
+
+ def v2_runner_on_skipped(self, result, **kwargs):
+ log_extra = {
+ "xos_type": "ansible",
+ "ansible_uuid": self.uuid,
+ "ansible_type": "task",
+ "ansible_status": "SKIPPED",
+ "ansible_result": json.dumps(result._result),
+ "ansible_task": result._task,
+ "ansible_playbook": self.playbook,
+ "ansible_host": result._host.get_name(),
+ }
+ log.debug("SKIPPED", task=str(result._task), **log_extra)
+ self.results.append(result)
+
+ def v2_runner_on_unreachable(self, result, **kwargs):
+ log_extra = {
+ "xos_type": "ansible",
+ "ansible_uuid": self.uuid,
+ "ansible_type": "task",
+ "ansible_status": "UNREACHABLE",
+ "ansible_result": json.dumps(result._result),
+ "ansible_task": result._task,
+ "ansible_playbook": self.playbook,
+ "ansible_host": result._host.get_name(),
+ }
+ log.error("UNREACHABLE", task=str(result._task), **log_extra)
+ self.results.append(result)
+
+ def v2_runner_retry(self, result, **kwargs):
+ log_extra = {
+ "xos_type": "ansible",
+ "ansible_uuid": self.uuid,
+ "ansible_type": "task",
+ "ansible_status": "RETRY",
+ "ansible_result": json.dumps(result._result),
+ "ansible_task": result._task,
+ "ansible_playbook": self.playbook,
+ "ansible_host": result._host.get_name(),
+ }
+ log.warning(
+ "RETRYING - attempt",
+ task=str(result._task),
+ attempt=result._result["attempts"],
+ **log_extra
+ )
+ self.results.append(result)
+
+ def v2_playbook_on_handler_task_start(self, task, **kwargs):
+ log_extra = {
+ "xos_type": "ansible",
+ "ansible_uuid": self.uuid,
+ "ansible_type": "task",
+ "ansible_status": "HANDLER",
+ "ansible_task": task.get_name().strip(),
+ "ansible_playbook": self.playbook,
+ # 'ansible_host': result._host.get_name()
+ }
+ log.debug("HANDLER", task=task.get_name().strip(), **log_extra)
+
+ def v2_playbook_on_import_for_host(self, result, imported_file):
+ log_extra = {
+ "xos_type": "ansible",
+ "ansible_uuid": self.uuid,
+ "ansible_type": "import",
+ "ansible_status": "IMPORT",
+ "ansible_result": json.dumps(result._result),
+ "ansible_playbook": self.playbook,
+ "ansible_host": result._host.get_name(),
+ }
+ log.debug("IMPORT", imported_file=imported_file, **log_extra)
+ self.results.append(result)
+
+ def v2_playbook_on_not_import_for_host(self, result, missing_file):
+ log_extra = {
+ "xos_type": "ansible",
+ "ansible_uuid": self.uuid,
+ "ansible_type": "import",
+ "ansible_status": "MISSING IMPORT",
+ "ansible_result": json.dumps(result._result),
+ "ansible_playbook": self.playbook,
+ "ansible_host": result._host.get_name(),
+ }
+ log.debug("MISSING IMPORT", missing=missing_file, **log_extra)
+ self.results.append(result)
+
+
+class Options(object):
+ """
+ Options class to replace Ansible OptParser
+ """
+
+ def __init__(
+ self,
+ ask_pass=None,
+ ask_su_pass=None,
+ ask_sudo_pass=None,
+ become=None,
+ become_ask_pass=None,
+ become_method=None,
+ become_user=None,
+ check=None,
+ connection=None,
+ diff=None,
+ flush_cache=None,
+ force_handlers=None,
+ forks=1,
+ listtags=None,
+ listtasks=None,
+ module_path=None,
+ new_vault_password_file=None,
+ one_line=None,
+ output_file=None,
+ poll_interval=None,
+ private_key_file=None,
+ remote_user=None,
+ scp_extra_args=None,
+ seconds=None,
+ sftp_extra_args=None,
+ skip_tags=None,
+ ssh_common_args=None,
+ ssh_extra_args=None,
+ sudo=None,
+ sudo_user=None,
+ syntax=None,
+ tags=None,
+ timeout=None,
+ tree=None,
+ vault_password_files=None,
+ ask_vault_pass=None,
+ extra_vars=None,
+ inventory=None,
+ listhosts=None,
+ module_paths=None,
+ subset=None,
+ verbosity=None,
+ ):
+
+ if tags:
+ self.tags = tags
+
+ if skip_tags:
+ self.skip_tags = skip_tags
+
+ self.ask_pass = ask_pass
+ self.ask_su_pass = ask_su_pass
+ self.ask_sudo_pass = ask_sudo_pass
+ self.ask_vault_pass = ask_vault_pass
+ self.become = become
+ self.become_ask_pass = become_ask_pass
+ self.become_method = become_method
+ self.become_user = become_user
+ self.check = check
+ self.connection = connection
+ self.diff = diff
+ self.extra_vars = extra_vars
+ self.flush_cache = flush_cache
+ self.force_handlers = force_handlers
+ self.forks = forks
+ self.inventory = inventory
+ self.listhosts = listhosts
+ self.listtags = listtags
+ self.listtasks = listtasks
+ self.module_path = module_path
+ self.module_paths = module_paths
+ self.new_vault_password_file = new_vault_password_file
+ self.one_line = one_line
+ self.output_file = output_file
+ self.poll_interval = poll_interval
+ self.private_key_file = private_key_file
+ self.remote_user = remote_user
+ self.scp_extra_args = scp_extra_args
+ self.seconds = seconds
+ self.sftp_extra_args = sftp_extra_args
+ self.ssh_common_args = ssh_common_args
+ self.ssh_extra_args = ssh_extra_args
+ self.subset = subset
+ self.sudo = sudo
+ self.sudo_user = sudo_user
+ self.syntax = syntax
+ self.timeout = timeout
+ self.tree = tree
+ self.vault_password_files = vault_password_files
+ self.verbosity = verbosity
+
+
+class Runner(object):
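+    """Programmatic Ansible playbook runner.
+
+    Typical use (sketch):
+        runner = Runner(playbook="/path/play.yaml", run_data={}, host_file="/path/hosts")
+        stats, results = runner.run()
+    """
+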
+ def __init__(
+ self, playbook, run_data, private_key_file=None, verbosity=0, host_file=None
+ ):
+
+ self.playbook = playbook
+ self.run_data = run_data
+
+ self.options = Options()
+ self.options.output_file = playbook + ".result"
+ self.options.private_key_file = private_key_file
+ self.options.verbosity = verbosity
+ self.options.connection = "ssh" # Need a connection type "smart" or "ssh"
+ # self.options.become = True
+ self.options.become_method = "sudo"
+ self.options.become_user = "root"
+
+ # Set global verbosity
+ self.display = Display()
+ self.display.verbosity = self.options.verbosity
+        # The executor appears to have its own
+        # verbosity object/setting as well
+ playbook_executor.verbosity = self.options.verbosity
+
+ # Become Pass Needed if not logging in as user root
+ # passwords = {'become_pass': become_pass}
+
+ # Gets data from YAML/JSON files
+ self.loader = DataLoader()
+ try:
+ self.loader.set_vault_password(os.environ["VAULT_PASS"])
+        except (AttributeError, KeyError):
+            # KeyError: VAULT_PASS is not set; AttributeError: this Ansible
+            # version's loader has no set_vault_password.
+            pass
+
+ # Set inventory, using most of above objects
+ if host_file:
+ self.inventory = InventoryManager(loader=self.loader, sources=host_file)
+ else:
+ self.inventory = InventoryManager(loader=self.loader)
+
+ # All the variables from all the various places
+ self.variable_manager = VariableManager(
+ loader=self.loader, inventory=self.inventory
+ )
+ self.variable_manager.extra_vars = {} # self.run_data
+
+ # Setup playbook executor, but don't run until run() called
+ self.pbex = playbook_executor.PlaybookExecutor(
+ playbooks=[playbook],
+ inventory=self.inventory,
+ variable_manager=self.variable_manager,
+ loader=self.loader,
+ options=self.options,
+ passwords={},
+ )
+
+ def run(self):
+ os.environ[
+ "REQUESTS_CA_BUNDLE"
+ ] = "/usr/local/share/ca-certificates/local_certs.crt"
+ callback = ResultCallback()
+ self.pbex._tqm._stdout_callback = callback
+
+ self.pbex.run()
+ stats = self.pbex._tqm._stats
+
+ # os.remove(self.hosts.name)
+
+ return stats, callback.results
diff --git a/lib/xos-synchronizer/xossynchronizer/apiaccessor.py b/lib/xos-synchronizer/xossynchronizer/apiaccessor.py
new file mode 100644
index 0000000..a56381b
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/apiaccessor.py
@@ -0,0 +1,92 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from xossynchronizer.modelaccessor import ModelAccessor
+import datetime
+
+
+class CoreApiModelAccessor(ModelAccessor):
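+    """ModelAccessor implementation backed by the XOS gRPC API (client ORM)."""
+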
+ def __init__(self, orm):
+ self.orm = orm
+ super(CoreApiModelAccessor, self).__init__()
+
+ def get_all_model_classes(self):
+ all_model_classes = {}
+ for k in self.orm.all_model_names:
+ all_model_classes[k] = getattr(self.orm, k)
+ return all_model_classes
+
+ def fetch_pending(self, main_objs, deletion=False):
+ if not isinstance(main_objs, list):
+ main_objs = [main_objs]
+
+ objs = []
+ for main_obj in main_objs:
+ if not deletion:
+ lobjs = main_obj.objects.filter_special(
+ main_obj.objects.SYNCHRONIZER_DIRTY_OBJECTS
+ )
+ else:
+ lobjs = main_obj.objects.filter_special(
+ main_obj.objects.SYNCHRONIZER_DELETED_OBJECTS
+ )
+ objs.extend(lobjs)
+
+ return objs
+
+ def fetch_policies(self, main_objs, deletion=False):
+ if not isinstance(main_objs, list):
+ main_objs = [main_objs]
+
+ objs = []
+ for main_obj in main_objs:
+ if not deletion:
+ lobjs = main_obj.objects.filter_special(
+ main_obj.objects.SYNCHRONIZER_DIRTY_POLICIES
+ )
+ else:
+ lobjs = main_obj.objects.filter_special(
+ main_obj.objects.SYNCHRONIZER_DELETED_POLICIES
+ )
+ objs.extend(lobjs)
+
+ return objs
+
+ def obj_exists(self, o):
+ # gRPC will default id to '0' for uninitialized objects
+ return (o.id is not None) and (o.id != 0)
+
+ def obj_in_list(self, o, olist):
+ ids = [x.id for x in olist]
+ return o.id in ids
+
+ def now(self):
+ """ Return the current time for timestamping purposes """
+ return (
+ datetime.datetime.utcnow() - datetime.datetime.fromtimestamp(0)
+ ).total_seconds()
+
+ def is_type(self, obj, name):
+ return obj._wrapped_class.__class__.__name__ == name
+
+ def is_instance(self, obj, name):
+ return name in obj.class_names.split(",")
+
+ def get_content_type_id(self, obj):
+ return obj.self_content_type_id
+
+ def create_obj(self, cls, **kwargs):
+ return cls.objects.new(**kwargs)
diff --git a/lib/xos-synchronizer/xossynchronizer/backend.py b/lib/xos-synchronizer/xossynchronizer/backend.py
new file mode 100644
index 0000000..b404864
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/backend.py
@@ -0,0 +1,165 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import os
+import inspect
+import imp
+import sys
+import threading
+import time
+from xossynchronizer.steps.syncstep import SyncStep
+from xossynchronizer.event_loop import XOSObserver
+from xossynchronizer.model_policy_loop import XOSPolicyEngine
+from xossynchronizer.event_engine import XOSEventEngine
+from xossynchronizer.pull_step_engine import XOSPullStepEngine
+from xossynchronizer.modelaccessor import *
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get("logging"))
+
+
+class Backend:
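+    """Driver that launches the sync, pull-step, event, and model-policy engines."""
+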
+ def __init__(self, log=log):
+ self.log = log
+
+ def load_sync_step_modules(self, step_dir):
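+        """Scan step_dir and return the SyncStep subclasses (with a provides
+        or observes attribute) defined by the modules found there."""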
+ sync_steps = []
+
+ self.log.info("Loading sync steps", step_dir=step_dir)
+
+ for fn in os.listdir(step_dir):
+ pathname = os.path.join(step_dir, fn)
+ if (
+ os.path.isfile(pathname)
+ and fn.endswith(".py")
+ and (fn != "__init__.py")
+ and (not fn.startswith("test"))
+ ):
+
+ # we need to extend the path to load modules in the step_dir
+ sys_path_save = sys.path
+ sys.path.append(step_dir)
+ module = imp.load_source(fn[:-3], pathname)
+
+ self.log.debug("Loaded file: %s", pathname)
+
+ # reset the original path
+ sys.path = sys_path_save
+
+ for classname in dir(module):
+ c = getattr(module, classname, None)
+
+ # if classname.startswith("Sync"):
+ # print classname, c, inspect.isclass(c), issubclass(c, SyncStep), hasattr(c,"provides")
+
+                    # make sure 'c' is a descendant of SyncStep and has a
+ # provides field (this eliminates the abstract base classes
+ # since they don't have a provides)
+
+ if inspect.isclass(c):
+ bases = inspect.getmro(c)
+ base_names = [b.__name__ for b in bases]
+ if (
+ ("SyncStep" in base_names)
+ and (hasattr(c, "provides") or hasattr(c, "observes"))
+ and (c not in sync_steps)
+ ):
+ sync_steps.append(c)
+
+ self.log.info("Loaded sync steps", steps=sync_steps)
+
+ return sync_steps
+
+ def run(self):
+ observer_thread = None
+ model_policy_thread = None
+ event_engine = None
+
+        steps_dir = Config.get("steps_dir")
+        if steps_dir:
+            # load sync_steps
+            sync_steps = self.load_sync_step_modules(steps_dir)
+
+ # if we have at least one sync_step
+ if len(sync_steps) > 0:
+ # start the observer
+ self.log.info("Starting XOSObserver", sync_steps=sync_steps)
+ observer = XOSObserver(sync_steps, self.log)
+ observer_thread = threading.Thread(
+ target=observer.run, name="synchronizer"
+ )
+ observer_thread.start()
+
+ else:
+ self.log.info("Skipping observer thread due to no steps dir.")
+
+ pull_steps_dir = Config.get("pull_steps_dir")
+ if pull_steps_dir:
+ self.log.info("Starting XOSPullStepEngine", pull_steps_dir=pull_steps_dir)
+ pull_steps_engine = XOSPullStepEngine()
+ pull_steps_engine.load_pull_step_modules(pull_steps_dir)
+ pull_steps_thread = threading.Thread(
+ target=pull_steps_engine.start, name="pull_step_engine"
+ )
+ pull_steps_thread.start()
+ else:
+ self.log.info("Skipping pull step engine due to no pull_steps_dir dir.")
+
+ event_steps_dir = Config.get("event_steps_dir")
+ if event_steps_dir:
+ self.log.info("Starting XOSEventEngine", event_steps_dir=event_steps_dir)
+ event_engine = XOSEventEngine(self.log)
+ event_engine.load_event_step_modules(event_steps_dir)
+ event_engine.start()
+ else:
+ self.log.info("Skipping event engine due to no event_steps dir.")
+
+ # start model policies thread
+ policies_dir = Config.get("model_policies_dir")
+ if policies_dir:
+ policy_engine = XOSPolicyEngine(policies_dir=policies_dir, log=self.log)
+ model_policy_thread = threading.Thread(
+ target=policy_engine.run, name="policy_engine"
+ )
+ model_policy_thread.is_policy_thread = True
+ model_policy_thread.start()
+ else:
+ self.log.info(
+ "Skipping model policies thread due to no model_policies dir."
+ )
+
+ if (not observer_thread) and (not model_policy_thread) and (not event_engine):
+ self.log.info(
+ "No sync steps, no policies, and no event steps. Synchronizer exiting."
+ )
+ # the caller will exit with status 0
+ return
+
+ while True:
+ try:
+ time.sleep(1000)
+ except KeyboardInterrupt:
+ print("exiting due to keyboard interrupt")
+ # TODO: See about setting the threads as daemons
+ if observer_thread:
+ observer_thread._Thread__stop()
+ if model_policy_thread:
+ model_policy_thread._Thread__stop()
+ sys.exit(1)
diff --git a/lib/xos-synchronizer/xossynchronizer/backend_modelpolicy.py b/lib/xos-synchronizer/xossynchronizer/backend_modelpolicy.py
new file mode 100644
index 0000000..a8e826b
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/backend_modelpolicy.py
@@ -0,0 +1,51 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import sys
+import threading
+import time
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get("logging"))
+
+
+class Backend:
+ def run(self):
+ # start model policies thread
+        policies_dir = Config.get("model_policies_dir")
+ if policies_dir:
+ from synchronizers.model_policy import run_policy
+
+ model_policy_thread = threading.Thread(target=run_policy)
+ model_policy_thread.start()
+ else:
+ model_policy_thread = None
+ log.info("Skipping model policies thread due to no model_policies dir.")
+
+ while True:
+ try:
+ time.sleep(1000)
+ except KeyboardInterrupt:
+ print("exiting due to keyboard interrupt")
+ if model_policy_thread:
+ model_policy_thread._Thread__stop()
+ sys.exit(1)
diff --git a/lib/xos-synchronizer/xossynchronizer/dependency_walker_new.py b/lib/xos-synchronizer/xossynchronizer/dependency_walker_new.py
new file mode 100644
index 0000000..138c26d
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/dependency_walker_new.py
@@ -0,0 +1,119 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# TODO: Moved this into the synchronizer, as it appeared to require model
+# access. Verify whether or not that's true and reconcile with
+# generate/dependency_walker.py
+
+from __future__ import print_function
+import json
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get("logging"))
+
+missing_links = {}
+
+if Config.get("dependency_graph"):
+ dep_data = open(Config.get("dependency_graph")).read()
+else:
+ dep_data = "{}"
+
+dependencies = json.loads(dep_data)
+dependencies = {k: [item[0] for item in items] for k, items in dependencies.items()}
+
+inv_dependencies = {}
+for k, lst in dependencies.items():
+ for v in lst:
+ try:
+ inv_dependencies[v].append(k)
+ except KeyError:
+ inv_dependencies[v] = [k]
+
+
+def plural(name):
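+    """Guess the plural accessor name for a link (naive pluralization)."""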
+ if name.endswith("s"):
+ return name + "es"
+ else:
+ return name + "s"
+
+
+def walk_deps(fn, object):
+ model = object.__class__.__name__
+ try:
+ deps = dependencies[model]
+    except KeyError:
+ deps = []
+ return __walk_deps(fn, object, deps)
+
+
+def walk_inv_deps(fn, object):
+ model = object.__class__.__name__
+ try:
+ deps = inv_dependencies[model]
+    except KeyError:
+ deps = []
+ return __walk_deps(fn, object, deps)
+
+
+def __walk_deps(fn, object, deps):
+ model = object.__class__.__name__
+ ret = []
+ for dep in deps:
+ # print "Checking dep %s"%dep
+ peer = None
+ link = dep.lower()
+ try:
+ peer = getattr(object, link)
+ except AttributeError:
+ link = plural(link)
+ try:
+ peer = getattr(object, link)
+ except AttributeError:
+ if model + "." + link not in missing_links:
+ print("Model %s missing link for dependency %s" % (model, link))
+ log.exception(
+ "WARNING: Model missing link for dependency.",
+ model=model,
+ link=link,
+ )
+ missing_links[model + "." + link] = True
+
+ if peer:
+ try:
+ peer_objects = peer.all()
+ except AttributeError:
+ peer_objects = [peer]
+ except BaseException:
+ peer_objects = []
+
+ for o in peer_objects:
+ if hasattr(o, "updated"):
+ fn(o, object)
+ ret.append(o)
+ # Uncomment the following line to enable recursion
+ # walk_inv_deps(fn, o)
+ return ret
diff --git a/lib/xos-synchronizer/xossynchronizer/event_engine.py b/lib/xos-synchronizer/xossynchronizer/event_engine.py
new file mode 100644
index 0000000..e5e18d1
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/event_engine.py
@@ -0,0 +1,216 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import confluent_kafka
+import imp
+import inspect
+import os
+import threading
+import time
+from xosconfig import Config
+
+
+class XOSKafkaMessage:
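+    """Thin wrapper exposing a consumed Kafka message's topic, key, value, and timestamp."""
+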
+ def __init__(self, consumer_msg):
+
+ self.topic = consumer_msg.topic()
+ self.key = consumer_msg.key()
+ self.value = consumer_msg.value()
+
+ self.timestamp = None
+ (ts_type, ts_val) = consumer_msg.timestamp()
+
+ if ts_type is not confluent_kafka.TIMESTAMP_NOT_AVAILABLE:
+ self.timestamp = ts_val
+
+
+class XOSKafkaThread(threading.Thread):
+ """ XOSKafkaThread
+
+ A Thread for servicing Kafka events. There is one event_step associated with one XOSKafkaThread. A
+ Consumer is launched to listen on the topics specified by the thread. The thread's process_event()
+ function is called for each event.
+ """
+
+ def __init__(self, step, bootstrap_servers, log, *args, **kwargs):
+ super(XOSKafkaThread, self).__init__(*args, **kwargs)
+ self.consumer = None
+ self.step = step
+ self.bootstrap_servers = bootstrap_servers
+ self.log = log
+ self.daemon = True
+
+ def create_kafka_consumer(self):
+ # use the service name as the group id
+ consumer_config = {
+ "group.id": Config().get("name"),
+ "bootstrap.servers": ",".join(self.bootstrap_servers),
+ "default.topic.config": {"auto.offset.reset": "smallest"},
+ }
+
+ return confluent_kafka.Consumer(**consumer_config)
+
+ def run(self):
+ if (not self.step.topics) and (not self.step.pattern):
+ raise Exception(
+ "Neither topics nor pattern is defined for step %s" % self.step.__name__
+ )
+
+ if self.step.topics and self.step.pattern:
+ raise Exception(
+ "Both topics and pattern are defined for step %s. Choose one."
+ % self.step.__name__
+ )
+
+ self.log.info(
+ "Waiting for events",
+ topic=self.step.topics,
+ pattern=self.step.pattern,
+ step=self.step.__name__,
+ )
+
+ while True:
+ try:
+ # setup consumer or loop on failure
+ if self.consumer is None:
+ self.consumer = self.create_kafka_consumer()
+
+ if self.step.topics:
+ self.consumer.subscribe(self.step.topics)
+
+ elif self.step.pattern:
+ self.consumer.subscribe(self.step.pattern)
+
+            except confluent_kafka.KafkaException as e:
+                # KafkaError codes such as _ALL_BROKERS_DOWN are integer
+                # constants rather than exception classes, so match on the
+                # wrapped error's code instead of trying to catch the code.
+                if e.args[0].code() == confluent_kafka.KafkaError._ALL_BROKERS_DOWN:
+                    self.log.warning(
+                        "No brokers available on %s, %s" % (self.bootstrap_servers, e)
+                    )
+                    time.sleep(20)
+                else:
+                    # Maybe Kafka has not started yet. Log the exception and try again in a second.
+                    self.log.exception("Exception in kafka loop: %s" % e)
+                    time.sleep(1)
+                continue
+
+ # wait until we get a message, if no message, loop again
+ msg = self.consumer.poll(timeout=1.0)
+
+ if msg is None:
+ continue
+
+ if msg.error():
+ if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
+ self.log.debug(
+ "Reached end of kafka topic %s, partition: %s, offset: %d"
+ % (msg.topic(), msg.partition(), msg.offset())
+ )
+ else:
+ self.log.exception("Error in kafka message: %s" % msg.error())
+
+ else:
+ # wrap parsing the event in a class
+ event_msg = XOSKafkaMessage(msg)
+
+ self.log.info(
+ "Processing event", event_msg=event_msg, step=self.step.__name__
+ )
+
+ try:
+ self.step(log=self.log).process_event(event_msg)
+
+ except BaseException:
+ self.log.exception(
+ "Exception in event step",
+ event_msg=event_msg,
+ step=self.step.__name__,
+ )
+
+
+class XOSEventEngine(object):
+ """ XOSEventEngine
+
+ Subscribe to and handle processing of events. Two methods are defined:
+
+    load_event_step_modules(dir) ... look for step modules in the given directory, and load objects that
+    descend from EventStep.
+
+    start() ... Launch threads to handle processing of the EventSteps. It's expected that
+    load_event_step_modules() will be called before start().
+ """
+
+ def __init__(self, log):
+ self.event_steps = []
+ self.threads = []
+ self.log = log
+
+ def load_event_step_modules(self, event_step_dir):
+ self.event_steps = []
+ self.log.info("Loading event steps", event_step_dir=event_step_dir)
+
+ # NOTE we'll load all the classes that inherit from EventStep
+ for fn in os.listdir(event_step_dir):
+ pathname = os.path.join(event_step_dir, fn)
+ if (
+ os.path.isfile(pathname)
+ and fn.endswith(".py")
+ and (fn != "__init__.py")
+ and ("test" not in fn)
+ ):
+ event_module = imp.load_source(fn[:-3], pathname)
+
+ for classname in dir(event_module):
+ c = getattr(event_module, classname, None)
+
+ if inspect.isclass(c):
+ base_names = [b.__name__ for b in c.__bases__]
+ if "EventStep" in base_names:
+ self.event_steps.append(c)
+ self.log.info("Loaded event steps", steps=self.event_steps)
+
+ def start(self):
+ eventbus_kind = Config.get("event_bus.kind")
+ eventbus_endpoint = Config.get("event_bus.endpoint")
+
+ if not eventbus_kind:
+ self.log.error(
+ "Eventbus kind is not configured in synchronizer config file."
+ )
+ return
+
+ if eventbus_kind not in ["kafka"]:
+ self.log.error(
+ "Eventbus kind is set to a technology we do not implement.",
+ eventbus_kind=eventbus_kind,
+ )
+ return
+
+ if not eventbus_endpoint:
+ self.log.error(
+ "Eventbus endpoint is not configured in synchronizer config file."
+ )
+ return
+
+ for step in self.event_steps:
+ if step.technology == "kafka":
+ thread = XOSKafkaThread(step, [eventbus_endpoint], self.log)
+ thread.start()
+ self.threads.append(thread)
+ else:
+ self.log.error(
+ "Unknown technology. Skipping step",
+ technology=step.technology,
+ step=step.__name__,
+ )
diff --git a/lib/xos-synchronizer/xossynchronizer/event_loop.py b/lib/xos-synchronizer/xossynchronizer/event_loop.py
new file mode 100644
index 0000000..96ce727
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/event_loop.py
@@ -0,0 +1,772 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TODO:
+# Add unit tests:
+# - 2 sets of Instance, ControllerSlice, ControllerNetworks - delete and create case
+
+import time
+import threading
+import json
+
+from collections import defaultdict
+from networkx import (
+ DiGraph,
+ weakly_connected_component_subgraphs,
+ all_shortest_paths,
+ NetworkXNoPath,
+)
+from networkx.algorithms.dag import topological_sort
+
+from xossynchronizer.steps.syncstep import InnocuousException, DeferredException, SyncStep
+from xossynchronizer.modelaccessor import *
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get("logging"))
+
+
+class StepNotReady(Exception):
+ pass
+
+
+class ExternalDependencyFailed(Exception):
+ pass
+
+
+# FIXME: Move drivers into a context shared across sync steps.
+
+
+class NoOpDriver:
+ def __init__(self):
+ self.enabled = True
+ self.dependency_graph = None
+
+
+# Everyone gets NoOpDriver by default. To use a different driver, call
+# set_driver() below.
+DRIVER = NoOpDriver()
+
+DIRECT_EDGE = 1
+PROXY_EDGE = 2
+
+
+def set_driver(x):
+ global DRIVER
+ DRIVER = x
+
+
+class XOSObserver(object):
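+    """Main synchronizer engine.
+
+    Fetches pending (dirty or deleted) objects, partitions them into cohorts
+    of mutually dependent objects, and syncs each cohort in its own thread.
+    """
+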
+ sync_steps = []
+
+ def __init__(self, sync_steps, log=log):
+ # The Condition object via which events are received
+ self.log = log
+
+ self.step_lookup = {}
+ self.sync_steps = sync_steps
+ self.load_sync_steps()
+
+ self.load_dependency_graph()
+
+ self.event_cond = threading.Condition()
+
+ self.driver = DRIVER
+ self.observer_name = Config.get("name")
+
+ def wait_for_event(self, timeout):
+ self.event_cond.acquire()
+ self.event_cond.wait(timeout)
+ self.event_cond.release()
+
+ def wake_up(self):
+ self.log.debug("Wake up routine called")
+ self.event_cond.acquire()
+ self.event_cond.notify()
+ self.event_cond.release()
+
+ def load_dependency_graph(self):
+
+ try:
+ if Config.get("dependency_graph"):
+ self.log.trace(
+ "Loading model dependency graph",
+ path=Config.get("dependency_graph"),
+ )
+ dep_graph_str = open(Config.get("dependency_graph")).read()
+ else:
+ self.log.trace("Using default model dependency graph", graph={})
+ dep_graph_str = "{}"
+
+ # joint_dependencies is of the form { Model1 -> [(Model2, src_port, dst_port), ...] }
+ # src_port is the field that accesses Model2 from Model1
+ # dst_port is the field that accesses Model1 from Model2
+ static_dependencies = json.loads(dep_graph_str)
+ dynamic_dependencies = self.compute_service_dependencies()
+
+ joint_dependencies = dict(
+ static_dependencies.items() + dynamic_dependencies
+ )
+
+ model_dependency_graph = DiGraph()
+ for src_model, deps in joint_dependencies.items():
+ for dep in deps:
+ dst_model, src_accessor, dst_accessor = dep
+ if src_model != dst_model:
+ edge_label = {
+ "src_accessor": src_accessor,
+ "dst_accessor": dst_accessor,
+ }
+ model_dependency_graph.add_edge(
+ src_model, dst_model, edge_label
+ )
+
+ model_dependency_graph_rev = model_dependency_graph.reverse(copy=True)
+ self.model_dependency_graph = {
+ # deletion
+ True: model_dependency_graph_rev,
+ False: model_dependency_graph,
+ }
+ self.log.trace("Loaded dependencies", edges=model_dependency_graph.edges())
+ except Exception as e:
+ self.log.exception("Error loading dependency graph", e=e)
+ raise e
+
+ def load_sync_steps(self):
+ model_to_step = defaultdict(list)
+ external_dependencies = []
+
+ for s in self.sync_steps:
+ if not isinstance(s.observes, list):
+ observes = [s.observes]
+ else:
+ observes = s.observes
+
+ for m in observes:
+ model_to_step[m.__name__].append(s.__name__)
+
+ try:
+ external_dependencies.extend(s.external_dependencies)
+ except AttributeError:
+ pass
+
+ self.step_lookup[s.__name__] = s
+
+ self.model_to_step = model_to_step
+ self.external_dependencies = list(set(external_dependencies))
+ self.log.info(
+ "Loaded external dependencies", external_dependencies=external_dependencies
+ )
+ self.log.info("Loaded model_map", **model_to_step)
+
+ def reset_model_accessor(self, o=None):
+ try:
+ model_accessor.reset_queries()
+ except BaseException:
+ # this shouldn't happen, but in case it does, catch it...
+ if o:
+ logdict = o.tologdict()
+ else:
+ logdict = {}
+
+ self.log.error("exception in reset_queries", **logdict)
+
+ def delete_record(self, o, dr_log=None):
+
+ if dr_log is None:
+ dr_log = self.log
+
+ if getattr(o, "backend_need_reap", False):
+ # the object has already been deleted and marked for reaping
+ model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
+ else:
+ step = getattr(o, "synchronizer_step", None)
+ if not step:
+ raise ExternalDependencyFailed
+
+ model_accessor.journal_object(o, "syncstep.call.delete_record")
+
+ dr_log.debug("Deleting object", **o.tologdict())
+
+ step.log = dr_log.new(step=step)
+ step.delete_record(o)
+ step.log = dr_log
+
+ dr_log.debug("Deleted object", **o.tologdict())
+
+ model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
+ o.backend_need_reap = True
+ o.save(update_fields=["backend_need_reap"])
+
+ def sync_record(self, o, sr_log=None):
+ try:
+ step = o.synchronizer_step
+ except AttributeError:
+ step = None
+
+ if step is None:
+ raise ExternalDependencyFailed
+
+ if sr_log is None:
+ sr_log = self.log
+
+ # Mark this as an object that will require delete. Do
+ # this now rather than after the syncstep,
+ if not (o.backend_need_delete):
+ o.backend_need_delete = True
+ o.save(update_fields=["backend_need_delete"])
+
+ model_accessor.journal_object(o, "syncstep.call.sync_record")
+
+ sr_log.debug("Syncing object", **o.tologdict())
+
+ step.log = sr_log.new(step=step)
+ step.sync_record(o)
+ step.log = sr_log
+
+ sr_log.debug("Synced object", **o.tologdict())
+
+ o.enacted = max(o.updated, o.changed_by_policy)
+ scratchpad = {"next_run": 0, "exponent": 0, "last_success": time.time()}
+ o.backend_register = json.dumps(scratchpad)
+ o.backend_status = "OK"
+ o.backend_code = 1
+ model_accessor.journal_object(o, "syncstep.call.save_update")
+ o.save(
+ update_fields=[
+ "enacted",
+ "backend_status",
+ "backend_register",
+ "backend_code",
+ ]
+ )
+
+ if hasattr(step, "after_sync_save"):
+ step.log = sr_log.new(step=step)
+ step.after_sync_save(o)
+ step.log = sr_log
+
+ sr_log.info("Saved sync object", o=o)
+
+ """ This function needs a cleanup. FIXME: Rethink backend_status, backend_register """
+
+ def handle_sync_exception(self, o, e):
+ self.log.exception("sync step failed!", e=e, **o.tologdict())
+ current_code = o.backend_code
+
+ if hasattr(e, "message"):
+ status = str(e.message)
+ else:
+ status = str(e)
+
+ if isinstance(e, InnocuousException):
+ code = 1
+ elif isinstance(e, DeferredException):
+ # NOTE if the synchronization is Deferred it means that synchronization is still in progress
+ code = 0
+ else:
+ code = 2
+
+ self.set_object_error(o, status, code)
+
+ dependency_error = "Failed due to error in model %s id %d: %s" % (
+ o.leaf_model_name,
+ o.id,
+ status,
+ )
+ return dependency_error, code
+
+ def set_object_error(self, o, status, code):
+ if o.backend_status:
+ error_list = o.backend_status.split(" // ")
+ else:
+ error_list = []
+
+ if status not in error_list:
+ error_list.append(status)
+
+ # Keep last two errors
+ error_list = error_list[-2:]
+
+ o.backend_code = code
+ o.backend_status = " // ".join(error_list)
+
+ try:
+ scratchpad = json.loads(o.backend_register)
+ scratchpad["exponent"]
+ except BaseException:
+ scratchpad = {
+ "next_run": 0,
+ "exponent": 0,
+ "last_success": time.time(),
+ "failures": 0,
+ }
+
+        # From the second failure onward, back off exponentially (capped at 8 hours)
+ if scratchpad["exponent"]:
+ if code == 1:
+ delay = scratchpad["exponent"] * 60 # 1 minute
+ else:
+ delay = scratchpad["exponent"] * 600 # 10 minutes
+
+ # cap delays at 8 hours
+ if delay > 8 * 60 * 60:
+ delay = 8 * 60 * 60
+ scratchpad["next_run"] = time.time() + delay
+
+ scratchpad["exponent"] += 1
+
+ try:
+ scratchpad["failures"] += 1
+ except KeyError:
+ scratchpad["failures"] = 1
+
+ scratchpad["last_failure"] = time.time()
+
+ o.backend_register = json.dumps(scratchpad)
+
+ # TOFIX:
+ # DatabaseError: value too long for type character varying(140)
+ if model_accessor.obj_exists(o):
+ try:
+ o.backend_status = o.backend_status[:1024]
+ o.save(
+ update_fields=["backend_status", "backend_register"],
+ always_update_timestamp=True,
+ )
+ except BaseException as e:
+ self.log.exception("Could not update backend status field!", e=e)
+ pass
+
+ def sync_cohort(self, cohort, deletion):
+ threading.current_thread().is_sync_thread = True
+
+ sc_log = self.log.new(thread_id=threading.current_thread().ident)
+
+ try:
+ start_time = time.time()
+ sc_log.debug("Starting to work on cohort", cohort=cohort, deletion=deletion)
+
+ cohort_emptied = False
+ dependency_error = None
+ dependency_error_code = None
+
+ itty = iter(cohort)
+
+ while not cohort_emptied:
+ try:
+ self.reset_model_accessor()
+ o = next(itty)
+
+ if dependency_error:
+ self.set_object_error(
+ o, dependency_error, dependency_error_code
+ )
+ continue
+
+ try:
+ if deletion:
+ self.delete_record(o, sc_log)
+ else:
+ self.sync_record(o, sc_log)
+ except ExternalDependencyFailed:
+ dependency_error = (
+ "External dependency on object %s id %d not met"
+ % (o.leaf_model_name, o.id)
+ )
+ dependency_error_code = 1
+ except (DeferredException, InnocuousException, Exception) as e:
+ dependency_error, dependency_error_code = self.handle_sync_exception(
+ o, e
+ )
+
+ except StopIteration:
+ sc_log.debug("Cohort completed", cohort=cohort, deletion=deletion)
+ cohort_emptied = True
+ finally:
+ self.reset_model_accessor()
+ model_accessor.connection_close()
+
+ def tenant_class_name_from_service(self, service_name):
+ """ This code supports legacy functionality. To be cleaned up. """
+ name1 = service_name + "Instance"
+ if hasattr(Slice().stub, name1):
+ return name1
+ else:
+ name2 = service_name.replace("Service", "Tenant")
+ if hasattr(Slice().stub, name2):
+ return name2
+ else:
+ return None
+
+ def compute_service_dependencies(self):
+ """ FIXME: Implement more cleanly via xproto """
+
+ model_names = self.model_to_step.keys()
+ ugly_tuples = [
+ (m, m.replace("Instance", "").replace("Tenant", "Service"))
+ for m in model_names
+ if m.endswith("ServiceInstance") or m.endswith("Tenant")
+ ]
+ ugly_rtuples = [(v, k) for k, v in ugly_tuples]
+
+ ugly_map = dict(ugly_tuples)
+ ugly_rmap = dict(ugly_rtuples)
+
+ s_model_names = [v for k, v in ugly_tuples]
+ s_models0 = [
+ getattr(Slice().stub, model_name, None) for model_name in s_model_names
+ ]
+        s_models1 = [
+            model.objects.first() for model in s_models0 if model is not None
+        ]
+ s_models = [m for m in s_models1 if m is not None]
+
+ dependencies = []
+ for model in s_models:
+ deps = ServiceDependency.objects.filter(subscriber_service_id=model.id)
+ if deps:
+ services = [
+ self.tenant_class_name_from_service(
+ d.provider_service.leaf_model_name
+ )
+ for d in deps
+ ]
+ dependencies.append(
+ (ugly_rmap[model.leaf_model_name], [(s, "", "") for s in services])
+ )
+
+ return dependencies
+
+ def compute_service_instance_dependencies(self, objects):
+ link_set = [
+ ServiceInstanceLink.objects.filter(subscriber_service_instance_id=o.id)
+ for o in objects
+ ]
+
+ dependencies = [
+ (l.provider_service_instance, l.subscriber_service_instance)
+ for links in link_set
+ for l in links
+ ]
+ providers = []
+
+ for p, s in dependencies:
+ if not p.enacted or p.enacted < p.updated:
+ p.dependent = s
+ providers.append(p)
+
+ return providers
+
+ def run(self):
+ # Cleanup: Move self.driver into a synchronizer context
+ # made available to every sync step.
+ if not self.driver.enabled:
+ self.log.warning("Driver is not enabled. Not running sync steps.")
+ return
+
+ while True:
+ self.log.trace("Waiting for event or timeout")
+ self.wait_for_event(timeout=5)
+ self.log.trace("Synchronizer awake")
+
+ self.run_once()
+
+ def fetch_pending(self, deletion=False):
+ pending_objects = []
+ pending_steps = []
+ step_list = self.step_lookup.values()
+
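+        # NOTE: "s = SyncStep" below rebinds the observes attribute on the
+        # shared SyncStep class itself, so every appended entry is the same
+        # class and only the last external dependency's observes value sticks.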
+ for e in self.external_dependencies:
+ s = SyncStep
+ s.observes = e
+ step_list.append(s)
+
+ for step_class in step_list:
+ step = step_class(driver=self.driver)
+ step.log = self.log.new(step=step)
+
+ if not hasattr(step, "call"):
+ pending = step.fetch_pending(deletion)
+ for obj in pending:
+ step = step_class(driver=self.driver)
+ step.log = self.log.new(step=step)
+ obj.synchronizer_step = step
+
+ pending_service_dependencies = self.compute_service_instance_dependencies(
+ pending
+ )
+
+ for obj in pending_service_dependencies:
+ obj.synchronizer_step = None
+
+ pending_objects.extend(pending)
+ pending_objects.extend(pending_service_dependencies)
+ else:
+ # Support old and broken legacy synchronizers
+ # This needs to be dropped soon.
+ pending_steps.append(step)
+
+ self.log.trace(
+ "Fetched pending data",
+ pending_objects=pending_objects,
+ legacy_steps=pending_steps,
+ )
+ return pending_objects, pending_steps
+
+ def linked_objects(self, o):
+ if o is None:
+ return [], None
+ try:
+ o_lst = [o for o in o.all()]
+ edge_type = PROXY_EDGE
+ except (AttributeError, TypeError):
+ o_lst = [o]
+ edge_type = DIRECT_EDGE
+ return o_lst, edge_type
+
+ """ Automatically test if a real dependency path exists between two objects. e.g.
+ given an Instance, and a ControllerSite, the test amounts to:
+ instance.slice.site == controller.site
+
+ Then the two objects are related, and should be put in the same cohort.
+ If the models of the two objects are not dependent, then the check trivially
+ returns False.
+ """
+
+ def same_object(self, o1, o2):
+ if not o1 or not o2:
+ return False, None
+
+ o1_lst, edge_type = self.linked_objects(o1)
+
+ try:
+ found = next(
+ obj
+ for obj in o1_lst
+ if obj.leaf_model_name == o2.leaf_model_name and obj.pk == o2.pk
+ )
+ except AttributeError as e:
+ self.log.exception("Compared objects could not be identified", e=e)
+ raise e
+ except StopIteration:
+ # This is a temporary workaround to establish dependencies between
+ # deleted proxy objects. A better solution would be for the ORM to
+ # return the set of deleted objects via foreign keys. At that point,
+ # the following line would change back to found = False
+ # - Sapan
+
+ found = getattr(o2, "deleted", False)
+
+ return found, edge_type
+
+ def concrete_path_exists(self, o1, o2):
+ try:
+ m1 = o1.leaf_model_name
+ m2 = o2.leaf_model_name
+ except AttributeError:
+ # One of the nodes is not in the dependency graph
+ # No dependency
+ return False, None
+
+ if m1.endswith("ServiceInstance") and m2.endswith("ServiceInstance"):
+ return getattr(o2, "dependent", None) == o1, DIRECT_EDGE
+
+ # FIXME: Dynamic dependency check
+ G = self.model_dependency_graph[False]
+ paths = all_shortest_paths(G, m1, m2)
+
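+        # all_shortest_paths returns a lazy generator that raises NetworkXNoPath
+        # only when first iterated; any() forces that check, after which the
+        # generator is recreated for the real traversal.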
+ try:
+ any(paths)
+ paths = all_shortest_paths(G, m1, m2)
+ except NetworkXNoPath:
+ # Easy. The two models are unrelated.
+ return False, None
+
+ for p in paths:
+ src_object = o1
+ edge_type = DIRECT_EDGE
+
+ for i in range(len(p) - 1):
+ src = p[i]
+ dst = p[i + 1]
+ edge_label = G[src][dst]
+ sa = edge_label["src_accessor"]
+ try:
+ dst_accessor = getattr(src_object, sa)
+ dst_objects, link_edge_type = self.linked_objects(dst_accessor)
+ if link_edge_type == PROXY_EDGE:
+ edge_type = link_edge_type
+
+ """
+
+ True If no linked objects and deletion
+ False If no linked objects
+ True If multiple linked objects
+ <continue traversal> If single linked object
+
+ """
+
+ if dst_objects == []:
+ # Workaround for ORM not returning linked deleted
+ # objects
+ if o2.deleted:
+ return True, edge_type
+ else:
+ dst_object = None
+ elif len(dst_objects) > 1:
+ # Multiple linked objects. Assume anything could be among those multiple objects.
+ raise AttributeError
+ else:
+ dst_object = dst_objects[0]
+ except AttributeError as e:
+ if sa != "fake_accessor":
+ self.log.debug(
+ "Could not check object dependencies, making conservative choice %s",
+ e,
+ src_object=src_object,
+ sa=sa,
+ o1=o1,
+ o2=o2,
+ )
+ return True, edge_type
+
+ src_object = dst_object
+
+ if not src_object:
+ break
+
+ verdict, edge_type = self.same_object(src_object, o2)
+ if verdict:
+ return verdict, edge_type
+
+ # Otherwise try other paths
+
+ return False, None
+
+ """
+
+ This function implements the main scheduling logic
+ of the Synchronizer. It divides incoming work (dirty objects)
+ into cohorts of dependent objects, and runs each such cohort
+ in its own thread.
+
+ Future work:
+
+ * Run event thread in parallel to the scheduling thread, and
+ add incoming objects to existing cohorts. Doing so should
+ greatly improve synchronizer performance.
+ * A single object might need to be added to multiple cohorts.
+ In this case, the last cohort handles such an object.
+ * This algorithm is horizontal-scale-ready. Multiple synchronizers
+ could run off a shared runqueue of cohorts.
+
+ """
+
+ def compute_dependent_cohorts(self, objects, deletion):
+        n = len(objects)
+        r = range(n)
+
+ oG = DiGraph()
+
+ for i in r:
+ oG.add_node(i)
+
+ try:
+ for i0 in range(n):
+ for i1 in range(n):
+ if i0 != i1:
+ if deletion:
+ path_args = (objects[i1], objects[i0])
+ else:
+ path_args = (objects[i0], objects[i1])
+
+ is_connected, edge_type = self.concrete_path_exists(*path_args)
+ if is_connected:
+ try:
+ edge_type = oG[i1][i0]["type"]
+ if edge_type == PROXY_EDGE:
+ oG.remove_edge(i1, i0)
+ oG.add_edge(i0, i1, {"type": edge_type})
+ except KeyError:
+ oG.add_edge(i0, i1, {"type": edge_type})
+ except KeyError:
+ pass
+
+ components = weakly_connected_component_subgraphs(oG)
+ cohort_indexes = [reversed(topological_sort(g)) for g in components]
+ cohorts = [
+ [objects[i] for i in cohort_index] for cohort_index in cohort_indexes
+ ]
+
+ return cohorts
+
+ def run_once(self):
+ self.load_dependency_graph()
+
+ try:
+ # Why are we checking the DB connection here?
+ model_accessor.check_db_connection_okay()
+
+ loop_start = time.time()
+
+ # Two passes. One for sync, the other for deletion.
+ for deletion in (False, True):
+ objects_to_process = []
+
+ objects_to_process, steps_to_process = self.fetch_pending(deletion)
+ dependent_cohorts = self.compute_dependent_cohorts(
+ objects_to_process, deletion
+ )
+
+ threads = []
+ self.log.trace("In run once inner loop", deletion=deletion)
+
+ for cohort in dependent_cohorts:
+ thread = threading.Thread(
+ target=self.sync_cohort,
+ name="synchronizer",
+ args=(cohort, deletion),
+ )
+
+ threads.append(thread)
+
+ # Start threads
+ for t in threads:
+ t.start()
+
+ self.reset_model_accessor()
+
+ # Wait for all threads to finish before continuing with the run
+ # loop
+ for t in threads:
+ t.join()
+
+ # Run legacy synchronizers, which do everything in call()
+ for step in steps_to_process:
+ try:
+ step.call(deletion=deletion)
+ except Exception as e:
+ self.log.exception("Legacy step failed", step=step, e=e)
+
+ loop_end = time.time()
+
+ except Exception as e:
+ self.log.exception(
+ "Core error. This seems like a misconfiguration or bug. This error will not be relayed to the user!",
+ e=e,
+ )
+ self.log.error("Exception in observer run loop")
diff --git a/lib/xos-synchronizer/xossynchronizer/event_steps/__init__.py b/lib/xos-synchronizer/xossynchronizer/event_steps/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/event_steps/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/lib/xos-synchronizer/xossynchronizer/event_steps/eventstep.py b/lib/xos-synchronizer/xossynchronizer/event_steps/eventstep.py
new file mode 100644
index 0000000..9596248
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/event_steps/eventstep.py
@@ -0,0 +1,43 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class EventStep(object):
+ """
+    Every event step defined in a synchronizer needs to inherit from this class in order to be loaded.
+
+    Each step should define a technology, and either `topics` or `pattern`. The meaning of `topics` and
+    `pattern` depends on the technology that is chosen.
+ """
+
+ technology = "kafka"
+ topics = []
+ pattern = None
+
+ def __init__(self, log, **kwargs):
+ """
+        Initialize an event step. Override this function to include any initialization. Make sure to call
+        the original __init__() from your method.
+ """
+
+ # self.log can be used to emit logging messages.
+ self.log = log
+
+ def process_event(self, event):
+ # This method must be overridden in your class. Do not call the original method.
+
+ self.log.warning(
+ "There is no default process_event, please provide a process_event method",
+ msg=event,
+ )
diff --git a/lib/xos-synchronizer/xossynchronizer/exceptions.py b/lib/xos-synchronizer/xossynchronizer/exceptions.py
new file mode 100644
index 0000000..3589777
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/exceptions.py
@@ -0,0 +1,25 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class SynchronizerException(Exception):
+ pass
+
+
+class SynchronizerProgrammingError(SynchronizerException):
+ pass
+
+
+class SynchronizerConfigurationError(SynchronizerException):
+ pass
diff --git a/lib/xos-synchronizer/xossynchronizer/loadmodels.py b/lib/xos-synchronizer/xossynchronizer/loadmodels.py
new file mode 100644
index 0000000..7e82ac9
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/loadmodels.py
@@ -0,0 +1,60 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get("logging"))
+
+
+class ModelLoadClient(object):
+ def __init__(self, api):
+ self.api = api
+
+ def upload_models(self, name, dir, version="unknown"):
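+        """Bundle a service's model artifacts into a LoadModelsRequest and send it to the core.
+
+        Collects the *.xproto files, an optional models.py, and any attic/ and
+        convenience/ files found under dir.
+        """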
+ request = self.api.dynamicload_pb2.LoadModelsRequest(name=name, version=version)
+
+ for fn in os.listdir(dir):
+ if fn.endswith(".xproto"):
+ item = request.xprotos.add()
+ item.filename = fn
+ item.contents = open(os.path.join(dir, fn)).read()
+
+ models_fn = os.path.join(dir, "models.py")
+ if os.path.exists(models_fn):
+ item = request.decls.add()
+ item.filename = "models.py"
+ item.contents = open(models_fn).read()
+
+ attic_dir = os.path.join(dir, "attic")
+ if os.path.exists(attic_dir):
+ log.warn(
+ "Attics are deprecated, please use the legacy=True option in xProto"
+ )
+ for fn in os.listdir(attic_dir):
+ if fn.endswith(".py"):
+ item = request.attics.add()
+ item.filename = fn
+ item.contents = open(os.path.join(attic_dir, fn)).read()
+
+ api_convenience_dir = os.path.join(dir, "convenience")
+ if os.path.exists(api_convenience_dir):
+ for fn in os.listdir(api_convenience_dir):
+ if fn.endswith(".py") and "test" not in fn:
+ item = request.convenience_methods.add()
+ item.filename = fn
+ item.contents = open(os.path.join(api_convenience_dir, fn)).read()
+
+        return self.api.dynamicload.LoadModels(request)
diff --git a/lib/xos-synchronizer/xossynchronizer/mock_modelaccessor_build.py b/lib/xos-synchronizer/xossynchronizer/mock_modelaccessor_build.py
new file mode 100644
index 0000000..5c389cf
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/mock_modelaccessor_build.py
@@ -0,0 +1,71 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import cPickle
+import subprocess
+
+"""
+ Support for autogenerating mock_modelaccessor.
+
+ Each unit test might have its own requirements for the set of xprotos that make
+ up its model testing framework. These should always include the core, and
+ optionally include one or more services.
+"""
+
+
+def build_mock_modelaccessor(
+ dest_dir, xos_dir, services_dir, service_xprotos, target="mock_classes.xtarget"
+):
+ dest_fn = os.path.join(dest_dir, "mock_modelaccessor.py")
+
+ args = ["xosgenx", "--target", target]
+ args.append(os.path.join(xos_dir, "core/models/core.xproto"))
+ for xproto in service_xprotos:
+ args.append(os.path.join(services_dir, xproto))
+
+ # Check to see if we've already run xosgenx. If so, don't run it again.
+ context_fn = dest_fn + ".context"
+ this_context = (xos_dir, services_dir, service_xprotos, target)
+ need_xosgenx = True
+ if os.path.exists(context_fn):
+ try:
+ context = cPickle.loads(open(context_fn).read())
+ if context == this_context:
+ return
+ except (cPickle.UnpicklingError, EOFError):
+ # Something went wrong with the file read or depickling
+ pass
+
+ if os.path.exists(context_fn):
+ os.remove(context_fn)
+
+ if os.path.exists(dest_fn):
+ os.remove(dest_fn)
+
+ p = subprocess.Popen(
+ " ".join(args) + " > " + dest_fn,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ )
+ (stdoutdata, stderrdata) = p.communicate()
+ if (p.returncode != 0) or (not os.path.exists(dest_fn)):
+ raise Exception(
+ "Failed to create mock model accessor, returncode=%d, stdout=%s"
+ % (p.returncode, stdoutdata)
+ )
+
+ # Save the context of this invocation of xosgenx
+ open(context_fn, "w").write(cPickle.dumps(this_context))
diff --git a/lib/xos-synchronizer/xossynchronizer/model_policies/__init__.py b/lib/xos-synchronizer/xossynchronizer/model_policies/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/model_policies/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/lib/xos-synchronizer/xossynchronizer/model_policies/model_policy_tenantwithcontainer.py b/lib/xos-synchronizer/xossynchronizer/model_policies/model_policy_tenantwithcontainer.py
new file mode 100644
index 0000000..66ac348
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/model_policies/model_policy_tenantwithcontainer.py
@@ -0,0 +1,320 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import json
+
+from xossynchronizer.modelaccessor import *
+from xossynchronizer.model_policies.policy import Policy
+from xossynchronizer.exceptions import *
+
+
+class Scheduler(object):
+ # XOS Scheduler Abstract Base Class
+ # Used to implement schedulers that pick which node to put instances on
+
+ def __init__(self, slice, label=None, constrain_by_service_instance=False):
+ self.slice = slice
+ self.label = label # Only pick nodes with this label
+ # Apply service-instance-based constraints
+ self.constrain_by_service_instance = constrain_by_service_instance
+
+ def pick(self):
+ # this method should return a tuple (node, parent)
+ # node is the node to instantiate on
+ # parent is for container_vm instances only, and is the VM that will
+ # hold the container
+
+ raise Exception("Abstract Base")
+
+
+class LeastLoadedNodeScheduler(Scheduler):
+ # This scheduler always return the node with the fewest number of
+ # instances.
+
+ def pick(self):
+ set_label = False
+
+ nodes = []
+ if self.label:
+ nodes = Node.objects.filter(nodelabels__name=self.label)
+ if not nodes:
+ set_label = self.constrain_by_service_instance
+
+ if not nodes:
+ if self.slice.default_node:
+ # if slice.default_node is set, then filter by default_node
+ nodes = Node.objects.filter(name=self.slice.default_node)
+ else:
+ nodes = Node.objects.all()
+
+ # convert to list
+ nodes = list(nodes)
+
+ # sort so that we pick the least-loaded node
+ nodes = sorted(nodes, key=lambda node: node.instances.count())
+
+ if not nodes:
+ raise Exception("LeastLoadedNodeScheduler: No suitable nodes to pick from")
+
+ picked_node = nodes[0]
+
+ if set_label:
+ nl = NodeLabel(name=self.label)
+ nl.node.add(picked_node)
+ nl.save()
+
+ # TODO: logic to filter nodes by which nodes are up, and which
+ # nodes the slice can instantiate on.
+ return [picked_node, None]
+
+
+class TenantWithContainerPolicy(Policy):
+ # This policy is abstract. Inherit this class into your own policy and override model_name
+ model_name = None
+
+ def handle_create(self, tenant):
+ return self.handle_update(tenant)
+
+ def handle_update(self, service_instance):
+ if (service_instance.link_deleted_count > 0) and (
+ not service_instance.provided_links.exists()
+ ):
+ model = globals()[self.model_name]
+ self.log.info(
+ "The last provided link has been deleted -- self-destructing."
+ )
+ self.handle_delete(service_instance)
+ if model.objects.filter(id=service_instance.id).exists():
+ service_instance.delete()
+ else:
+ self.log.info("Tenant %s is already deleted" % service_instance)
+ return
+ self.manage_container(service_instance)
+
+ # def handle_delete(self, tenant):
+ # if tenant.vcpe:
+ # tenant.vcpe.delete()
+
+ def save_instance(self, instance):
+ # Override this function to do custom pre-save or post-save processing,
+ # such as creating ports for containers.
+ instance.save()
+
+ def ip_to_mac(self, ip):
+ (a, b, c, d) = ip.split(".")
+ return "02:42:%02x:%02x:%02x:%02x" % (int(a), int(b), int(c), int(d))
+
+ def allocate_public_service_instance(self, **kwargs):
+ """ Get a ServiceInstance that provides public connectivity. Currently this means to use AddressPool and
+ the AddressManager Service.
+
+ Expect this to be refactored when we break hard-coded service dependencies.
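+
+        Example (a sketch; the pool name and subscriber are illustrative):
+
+            psi = self.allocate_public_service_instance(
+                address_pool_name="addresses_vsg",
+                subscriber_service_instance=my_service_instance)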
+ """
+ address_pool_name = kwargs.pop("address_pool_name")
+
+ am_service = AddressManagerService.objects.all() # TODO: Hardcoded dependency
+ if not am_service:
+ raise Exception("no addressing services")
+ am_service = am_service[0]
+
+ ap = AddressPool.objects.filter(
+ name=address_pool_name, service_id=am_service.id
+ )
+ if not ap:
+ raise Exception("Addressing service unable to find addresspool %s" % name)
+ ap = ap[0]
+
+ ip = ap.get_address()
+ if not ip:
+ raise Exception("AddressPool '%s' has run out of addresses." % ap.name)
+
+ ap.save() # save the AddressPool to account for address being removed from it
+
+ subscriber_service = None
+ if "subscriber_service" in kwargs:
+ subscriber_service = kwargs.pop("subscriber_service")
+
+ subscriber_service_instance = None
+ if "subscriber_tenant" in kwargs:
+ subscriber_service_instance = kwargs.pop("subscriber_tenant")
+ elif "subscriber_service_instance" in kwargs:
+ subscriber_service_instance = kwargs.pop("subscriber_service_instance")
+
+ # TODO: potential partial failure -- AddressPool address is allocated and saved before addressing tenant
+
+ t = None
+ try:
+ t = AddressManagerServiceInstance(
+ owner=am_service, **kwargs
+ ) # TODO: Hardcoded dependency
+ t.public_ip = ip
+ t.public_mac = self.ip_to_mac(ip)
+ t.address_pool_id = ap.id
+ t.save()
+
+ if subscriber_service:
+ link = ServiceInstanceLink(
+ subscriber_service=subscriber_service, provider_service_instance=t
+ )
+ link.save()
+
+ if subscriber_service_instance:
+ link = ServiceInstanceLink(
+ subscriber_service_instance=subscriber_service_instance,
+ provider_service_instance=t,
+ )
+ link.save()
+ except BaseException:
+ # cleanup if anything went wrong
+ ap.put_address(ip)
+ ap.save() # save the AddressPool to account for address being added to it
+ if t and t.id:
+ t.delete()
+ raise
+
+ return t
+
+ def get_image(self, tenant):
+ slice = tenant.owner.slices.all()
+ if not slice:
+ raise SynchronizerProgrammingError("provider service has no slice")
+ slice = slice[0]
+
+ # If slice has default_image set then use it
+ if slice.default_image:
+ return slice.default_image
+
+        raise SynchronizerProgrammingError(
+            "Please set a default image for %s" % slice.name
+        )
+
+ """ get_legacy_tenant_attribute
+ pick_least_loaded_instance_in_slice
+ count_of_tenants_of_an_instance
+
+ These three methods seem to be used by A-CORD. Look for ways to consolidate with existing methods and eliminate
+ these legacy ones
+ """
+
+ def get_legacy_tenant_attribute(self, tenant, name, default=None):
+ if tenant.service_specific_attribute:
+ attributes = json.loads(tenant.service_specific_attribute)
+ else:
+ attributes = {}
+ return attributes.get(name, default)
+
+ def pick_least_loaded_instance_in_slice(self, tenant, slices, image):
+ for slice in slices:
+ if slice.instances.all().count() > 0:
+ for instance in slice.instances.all():
+ if instance.image != image:
+ continue
+ # Pick the first instance that has lesser than 5 tenants
+ if self.count_of_tenants_of_an_instance(tenant, instance) < 5:
+ return instance
+ return None
+
+ # TODO: Ideally the tenant count for an instance should be maintained using a
+ # many-to-one relationship attribute, however this model being proxy, it does
+ # not permit any new attributes to be defined. Find if any better solutions
+ def count_of_tenants_of_an_instance(self, tenant, instance):
+ tenant_count = 0
+ for tenant in self.__class__.objects.all():
+ if (
+ self.get_legacy_tenant_attribute(tenant, "instance_id", None)
+ == instance.id
+ ):
+ tenant_count += 1
+ return tenant_count
+
+ def manage_container(self, tenant):
+ if tenant.deleted:
+ return
+
+ desired_image = self.get_image(tenant)
+
+ if (tenant.instance is not None) and (
+ tenant.instance.image.id != desired_image.id
+ ):
+ tenant.instance.delete()
+ tenant.instance = None
+
+ if tenant.instance is None:
+ if not tenant.owner.slices.count():
+ raise SynchronizerConfigurationError("The service has no slices")
+
+ new_instance_created = False
+ instance = None
+ if self.get_legacy_tenant_attribute(
+ tenant, "use_same_instance_for_multiple_tenants", default=False
+ ):
+ # Find if any existing instances can be used for this tenant
+ slices = tenant.owner.slices.all()
+ instance = self.pick_least_loaded_instance_in_slice(
+ tenant, slices, desired_image
+ )
+
+ if not instance:
+ slice = tenant.owner.slices.first()
+
+ flavor = slice.default_flavor
+ if not flavor:
+ flavors = Flavor.objects.filter(name="m1.small")
+ if not flavors:
+ raise SynchronizerConfigurationError("No m1.small flavor")
+ flavor = flavors[0]
+
+ if slice.default_isolation == "container_vm":
+ raise Exception("Not implemented")
+ else:
+ scheduler = getattr(self, "scheduler", LeastLoadedNodeScheduler)
+ constrain_by_service_instance = getattr(
+ self, "constrain_by_service_instance", False
+ )
+ tenant_node_label = getattr(tenant, "node_label", None)
+ (node, parent) = scheduler(
+ slice,
+ label=tenant_node_label,
+ constrain_by_service_instance=constrain_by_service_instance,
+ ).pick()
+
+ assert slice is not None
+ assert node is not None
+ assert desired_image is not None
+ assert tenant.creator is not None
+ assert node.site_deployment.deployment is not None
+ assert flavor is not None
+
+ try:
+ instance = Instance(
+ slice=slice,
+ node=node,
+ image=desired_image,
+ creator=tenant.creator,
+ deployment=node.site_deployment.deployment,
+ flavor=flavor,
+ isolation=slice.default_isolation,
+ parent=parent,
+ )
+ self.save_instance(instance)
+ new_instance_created = True
+
+ tenant.instance = instance
+ tenant.save()
+ except BaseException:
+ # NOTE: We don't have transactional support, so if the synchronizer crashes and exits after
+ # creating the instance, but before adding it to the tenant, then we will leave an
+ # orphaned instance.
+ if new_instance_created:
+ instance.delete()
+ raise
diff --git a/lib/xos-synchronizer/xossynchronizer/model_policies/policy.py b/lib/xos-synchronizer/xossynchronizer/model_policies/policy.py
new file mode 100644
index 0000000..b455c79
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/model_policies/policy.py
@@ -0,0 +1,40 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+""" policy.py
+
+ Base Classes for Model Policies
+"""
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get("logging"))
+
+
+class Policy(object):
+ """ An XOS Model Policy
+
+ Set the class member model_name to the name of the model that this policy will act on.
+
+ The following functions will be invoked if they are defined:
+
+ handle_create ... called when a model is created
+ handle_update ... called when a model is updated
+ handle_delete ... called when a model is deleted
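+
+    A minimal subclass might look like (a sketch; the model name is illustrative):
+
+        class MyServiceInstancePolicy(Policy):
+            model_name = "MyServiceInstance"
+
+            def handle_create(self, si):
+                self.log.info("handling create", object=str(si))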
+ """
+
+    def __init__(self):
+        self.log = log
+        # `logger` is kept as an alias for policies that still use it
+        self.logger = log
diff --git a/lib/xos-synchronizer/xossynchronizer/model_policies/test_config.yaml b/lib/xos-synchronizer/xossynchronizer/model_policies/test_config.yaml
new file mode 100644
index 0000000..bffe809
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/model_policies/test_config.yaml
@@ -0,0 +1,30 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+name: test-model-policies
+accessor:
+ username: xosadmin@opencord.org
+ password: "sample"
+ kind: testframework
+logging:
+ version: 1
+ handlers:
+ console:
+ class: logging.StreamHandler
+ loggers:
+ 'multistructlog':
+ handlers:
+ - console
diff --git a/lib/xos-synchronizer/xossynchronizer/model_policy_loop.py b/lib/xos-synchronizer/xossynchronizer/model_policy_loop.py
new file mode 100644
index 0000000..c23e47c
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/model_policy_loop.py
@@ -0,0 +1,223 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from __future__ import print_function
+
+import imp
+import inspect
+import os
+import time
+import traceback
+
+from xossynchronizer.modelaccessor import *
+from xossynchronizer.dependency_walker_new import *
+from xossynchronizer.model_policies.policy import Policy
+
+
+class XOSPolicyEngine(object):
+ def __init__(self, policies_dir, log):
+ self.model_policies = self.load_model_policies(policies_dir)
+ self.policies_by_name = {}
+ self.policies_by_class = {}
+ self.log = log
+
+ for policy in self.model_policies:
+ if policy.model_name not in self.policies_by_name:
+ self.policies_by_name[policy.model_name] = []
+ self.policies_by_name[policy.model_name].append(policy)
+
+ if policy.model not in self.policies_by_class:
+ self.policies_by_class[policy.model] = []
+ self.policies_by_class[policy.model].append(policy)
+
+ def update_wp(self, d, o):
+ try:
+ save_fields = []
+ if d.write_protect != o.write_protect:
+ d.write_protect = o.write_protect
+ save_fields.append("write_protect")
+ if save_fields:
+ d.save(update_fields=save_fields)
+        except AttributeError:
+            raise
+
+ def update_dep(self, d, o):
+ try:
+ print("Trying to update %s" % d)
+ save_fields = []
+ if d.updated < o.updated:
+ save_fields = ["updated"]
+
+ if save_fields:
+ d.save(update_fields=save_fields)
+ except AttributeError as e:
+ log.exception("AttributeError in update_dep", e=e)
+ raise e
+ except Exception as e:
+ log.exception("Exception in update_dep", e=e)
+
+ def delete_if_inactive(self, d, o):
+ try:
+ d.delete()
+ print("Deleted %s (%s)" % (d, d.__class__.__name__))
+ except BaseException:
+ pass
+ return
+
+ def load_model_policies(self, policies_dir):
+ policies = []
+ for fn in os.listdir(policies_dir):
+ if fn.startswith("test"):
+ # don't try to import unit tests!
+ continue
+ pathname = os.path.join(policies_dir, fn)
+ if (
+ os.path.isfile(pathname)
+ and fn.endswith(".py")
+ and (fn != "__init__.py")
+ ):
+ module = imp.load_source(fn[:-3], pathname)
+ for classname in dir(module):
+ c = getattr(module, classname, None)
+
+ # make sure 'c' is a descendent of Policy and has a
+ # provides field (this eliminates the abstract base classes
+ # since they don't have a provides)
+
+ if (
+ inspect.isclass(c)
+ and issubclass(c, Policy)
+ and hasattr(c, "model_name")
+ and (c not in policies)
+ ):
+ if not c.model_name:
+ log.info(
+ "load_model_policies: skipping model policy",
+ classname=classname,
+ )
+ continue
+                        if not model_accessor.has_model_class(c.model_name):
+                            log.error(
+                                "load_model_policies: unable to find model for model policy",
+                                classname=classname,
+                                model=c.model_name,
+                            )
+                            continue
+                        c.model = model_accessor.get_model_class(c.model_name)
+                        policies.append(c)
+
+ log.info("Loaded model policies", policies=policies)
+ return policies
+
+ def execute_model_policy(self, instance, action):
+ # These are the models whose children get deleted when they are
+ delete_policy_models = ["Slice", "Instance", "Network"]
+ sender_name = getattr(instance, "model_name", instance.__class__.__name__)
+
+ # if (action != "deleted"):
+ # walk_inv_deps(self.update_dep, instance)
+ # walk_deps(self.update_wp, instance)
+ # elif (sender_name in delete_policy_models):
+ # walk_inv_deps(self.delete_if_inactive, instance)
+
+ policies_failed = False
+        for policy in self.policies_by_name.get(sender_name, []):
+ method_name = "handle_%s" % action
+ if hasattr(policy, method_name):
+ try:
+ log.debug(
+ "MODEL POLICY: calling handler",
+ sender_name=sender_name,
+ instance=instance,
+ policy=policy.__name__,
+ method=method_name,
+ )
+ getattr(policy(), method_name)(instance)
+ log.debug(
+ "MODEL POLICY: completed handler",
+ sender_name=sender_name,
+ instance=instance,
+ policy_name=policy.__name__,
+ method=method_name,
+ )
+ except Exception as e:
+ log.exception("MODEL POLICY: Exception when running handler", e=e)
+ policies_failed = True
+
+ try:
+ instance.policy_status = "%s" % traceback.format_exc(limit=1)
+ instance.policy_code = 2
+ instance.save(update_fields=["policy_status", "policy_code"])
+ except Exception as e:
+ log.exception(
+ "MODEL_POLICY: Exception when storing policy_status", e=e
+ )
+
+ if not policies_failed:
+ try:
+ instance.policed = max(instance.updated, instance.changed_by_step)
+ instance.policy_status = "done"
+ instance.policy_code = 1
+
+ instance.save(update_fields=["policed", "policy_status", "policy_code"])
+
+ if hasattr(policy, "after_policy_save"):
+ policy().after_policy_save(instance)
+
+ log.info("MODEL_POLICY: Saved", o=instance)
+ except BaseException:
+ log.exception(
+ "MODEL POLICY: Object failed to update policed timestamp",
+ instance=instance,
+ )
+
+ def noop(self, o, p):
+ pass
+
+ def run(self):
+ while True:
+ start = time.time()
+ try:
+ self.run_policy_once()
+ except Exception as e:
+ log.exception("MODEL_POLICY: Exception in run()", e=e)
+ if time.time() - start < 5:
+ time.sleep(5)
+
+ # TODO: This loop is different from the synchronizer event_loop, but they both do mostly the same thing. Look for
+ # ways to combine them.
+
+ def run_policy_once(self):
+ models = self.policies_by_class.keys()
+
+ model_accessor.check_db_connection_okay()
+
+ objects = model_accessor.fetch_policies(models, False)
+ deleted_objects = model_accessor.fetch_policies(models, True)
+
+ for o in objects:
+ if o.deleted:
+ # This shouldn't happen, but previous code was examining o.deleted. Verify.
+ continue
+ if not o.policed:
+ self.execute_model_policy(o, "create")
+ else:
+ self.execute_model_policy(o, "update")
+
+ for o in deleted_objects:
+ self.execute_model_policy(o, "delete")
+
+ try:
+ model_accessor.reset_queries()
+ except Exception as e:
+ # this shouldn't happen, but in case it does, catch it...
+ log.exception("MODEL POLICY: exception in reset_queries", e)
diff --git a/lib/xos-synchronizer/xossynchronizer/modelaccessor.py b/lib/xos-synchronizer/xossynchronizer/modelaccessor.py
new file mode 100644
index 0000000..6084579
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/modelaccessor.py
@@ -0,0 +1,322 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+""" ModelAccessor
+
+ A class for abstracting access to models. Used to get any djangoisms out
+ of the synchronizer code base.
+
+ This module will import all models into this module's global scope, so doing
+ a "from modelaccessor import *" from a calling module ought to import all
+ models into the calling module's scope.
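+
+    For example (a sketch; assumes config_accessor() has already initialized the
+    accessor and loaded the model classes), a sync step module can do:
+
+        from xossynchronizer.modelaccessor import *
+        instances = Instance.objects.all()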
+"""
+
+import functools
+import importlib
+import os
+import signal
+import sys
+import time
+from xossynchronizer.loadmodels import ModelLoadClient
+
+from xosconfig import Config
+from multistructlog import create_logger
+from xosutil.autodiscover_version import autodiscover_version_of_main
+
+log = create_logger(Config().get("logging"))
+
+orig_sigint = None
+model_accessor = None
+
+
+class ModelAccessor(object):
+ def __init__(self):
+ self.all_model_classes = self.get_all_model_classes()
+
+ def __getattr__(self, name):
+ """ Wrapper for getattr to facilitate retrieval of classes """
+ has_model_class = self.__getattribute__("has_model_class")
+ get_model_class = self.__getattribute__("get_model_class")
+ if has_model_class(name):
+ return get_model_class(name)
+
+ # Default behaviour
+ return self.__getattribute__(name)
+
+ def get_all_model_classes(self):
+ """ Build a dictionary of all model class names """
+ raise Exception("Not Implemented")
+
+ def get_model_class(self, name):
+ """ Given a class name, return that model class """
+ return self.all_model_classes[name]
+
+    def has_model_class(self, name):
+        """ Given a class name, return True if that model class exists """
+ return name in self.all_model_classes
+
+ def fetch_pending(self, main_objs, deletion=False):
+ """ Execute the default fetch_pending query """
+ raise Exception("Not Implemented")
+
+ def fetch_policies(self, main_objs, deletion=False):
+ """ Execute the default fetch_pending query """
+ raise Exception("Not Implemented")
+
+ def reset_queries(self):
+ """ Reset any state between passes of synchronizer. For django, to
+ limit memory consumption of cached queries.
+ """
+ pass
+
+ def connection_close(self):
+ """ Close any active database connection. For django, to limit memory
+ consumption.
+ """
+ pass
+
+ def check_db_connection_okay(self):
+ """ Checks to make sure the db connection is okay """
+ pass
+
+ def obj_exists(self, o):
+ """ Return True if the object exists in the data model """
+ raise Exception("Not Implemented")
+
+ def obj_in_list(self, o, olist):
+ """ Return True if o is the same as one of the objects in olist """
+ raise Exception("Not Implemented")
+
+ def now(self):
+ """ Return the current time for timestamping purposes """
+ raise Exception("Not Implemented")
+
+ def is_type(self, obj, name):
+ """ returns True is obj is of model type "name" """
+ raise Exception("Not Implemented")
+
+ def is_instance(self, obj, name):
+ """ returns True if obj is of model type "name" or is a descendant """
+ raise Exception("Not Implemented")
+
+ def get_content_type_id(self, obj):
+ raise Exception("Not Implemented")
+
+ def journal_object(self, o, operation, msg=None, timestamp=None):
+ pass
+
+ def create_obj(self, cls, **kwargs):
+ raise Exception("Not Implemented")
+
+
+def import_models_to_globals():
+ # add all models to globals
+ for (k, v) in model_accessor.all_model_classes.items():
+ globals()[k] = v
+
+ # xosbase doesn't exist from the synchronizer's perspective, so fake out
+ # ModelLink.
+ if "ModelLink" not in globals():
+
+ class ModelLink:
+ def __init__(self, dest, via, into=None):
+ self.dest = dest
+ self.via = via
+ self.into = into
+
+ globals()["ModelLink"] = ModelLink
+
+
+def keep_trying(client, reactor):
+ # Keep checking the connection to wait for it to become unavailable.
+ # Then reconnect. The strategy is to send NoOp operations, one per second, until eventually a NoOp throws an
+ # exception. This will indicate the server has reset. When that happens, we force the client to reconnect, and
+ # it will download a new API from the server.
+
+ from xosapi.xos_grpc_client import Empty
+
+ try:
+ client.utility.NoOp(Empty())
+ except Exception as e:
+ # If we caught an exception, then the API has become unavailable.
+ # So reconnect.
+
+ log.exception("exception in NoOp", e=e)
+ log.info("restarting synchronizer")
+
+ os.execv(sys.executable, ["python"] + sys.argv)
+ return
+
+ reactor.callLater(1, functools.partial(keep_trying, client, reactor))
+
+
+def grpcapi_reconnect(client, reactor):
+ global model_accessor
+
+ # Make sure to try to load models before trying to initialize the ORM. It might be the ORM is broken because it
+ # is waiting on our models.
+
+ if Config.get("models_dir"):
+ version = autodiscover_version_of_main(max_parent_depth=0) or "unknown"
+ log.info("Service version is %s" % version)
+ try:
+ ModelLoadClient(client).upload_models(
+ Config.get("name"), Config.get("models_dir"), version=version
+ )
+ except Exception as e: # TODO: narrow exception scope
+ if (
+ hasattr(e, "code")
+ and callable(e.code)
+ and hasattr(e.code(), "name")
+ and (e.code().name) == "UNAVAILABLE"
+ ):
+ # We need to make sure we force a reconnection, as it's possible that we will end up downloading a
+ # new xos API.
+ log.info("grpc unavailable during loadmodels. Force a reconnect")
+ client.connected = False
+ client.connect()
+ return
+ log.exception("failed to onboard models")
+ # If it's some other error, then we don't need to force a reconnect. Just try the LoadModels() again.
+ reactor.callLater(10, functools.partial(grpcapi_reconnect, client, reactor))
+ return
+
+ # If the ORM is broken, then wait for the orm to become available.
+
+ if not getattr(client, "xos_orm", None):
+ log.warning("No xos_orm. Will keep trying...")
+ reactor.callLater(1, functools.partial(keep_trying, client, reactor))
+ return
+
+ # this will prevent updated timestamps from being automatically updated
+ client.xos_orm.caller_kind = "synchronizer"
+
+ client.xos_orm.restart_on_disconnect = True
+
+    from xossynchronizer.apiaccessor import CoreApiModelAccessor
+
+ model_accessor = CoreApiModelAccessor(orm=client.xos_orm)
+
+ # If required_models is set, then check to make sure the required_models
+ # are present. If not, then the synchronizer needs to go to sleep until
+ # the models show up.
+
+ required_models = Config.get("required_models")
+ if required_models:
+ required_models = [x.strip() for x in required_models]
+
+ missing = []
+ found = []
+ for model in required_models:
+ if model_accessor.has_model_class(model):
+ found.append(model)
+ else:
+ missing.append(model)
+
+ log.info("required_models, found:", models=", ".join(found))
+ if missing:
+ log.warning("required_models: missing", models=", ".join(missing))
+ # We're missing a required model. Give up and wait for the connection
+ # to reconnect, and hope our missing model has shown up.
+ reactor.callLater(1, functools.partial(keep_trying, client, reactor))
+ return
+
+ # import all models to global space
+ import_models_to_globals()
+
+ # Synchronizer framework isn't ready to embrace reactor yet...
+ reactor.stop()
+
+ # Restore the sigint handler
+ signal.signal(signal.SIGINT, orig_sigint)
+
+
+def config_accessor_grpcapi():
+ global orig_sigint
+
+ log.info("Connecting to the gRPC API")
+
+ grpcapi_endpoint = Config.get("accessor.endpoint")
+ grpcapi_username = Config.get("accessor.username")
+ grpcapi_password = Config.get("accessor.password")
+
+ # if password starts with "@", then retreive the password from a file
+ if grpcapi_password.startswith("@"):
+ fn = grpcapi_password[1:]
+ if not os.path.exists(fn):
+ raise Exception("%s does not exist" % fn)
+ grpcapi_password = open(fn).readline().strip()
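+
+    # A typical synchronizer config selects this accessor like so (a sketch;
+    # the endpoint and credential path are illustrative):
+    #
+    #   accessor:
+    #     username: xosadmin@opencord.org
+    #     password: "@/opt/credentials/xosadmin@opencord.org"
+    #     kind: grpcapi
+    #     endpoint: xos-core:50051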
+
+ from xosapi.xos_grpc_client import SecureClient
+ from twisted.internet import reactor
+
+ grpcapi_client = SecureClient(
+ endpoint=grpcapi_endpoint, username=grpcapi_username, password=grpcapi_password
+ )
+ grpcapi_client.set_reconnect_callback(
+ functools.partial(grpcapi_reconnect, grpcapi_client, reactor)
+ )
+ grpcapi_client.start()
+
+    # Reactor will take over SIGINT during reactor.run(), but does not return it when reactor.stop() is called.
+
+    orig_sigint = signal.getsignal(signal.SIGINT)
+
+    # Start reactor. This will cause the client to connect and then execute
+    # grpcapi_reconnect().
+
+    reactor.run()
+
+
+def config_accessor_mock():
+ global model_accessor
+ from mock_modelaccessor import model_accessor as mock_model_accessor
+
+ model_accessor = mock_model_accessor
+
+ # mock_model_accessor doesn't have an all_model_classes field, so make one.
+ import mock_modelaccessor as mma
+
+ all_model_classes = {}
+ for k in dir(mma):
+ v = getattr(mma, k)
+ if hasattr(v, "leaf_model_name"):
+ all_model_classes[k] = v
+
+ model_accessor.all_model_classes = all_model_classes
+
+ import_models_to_globals()
+
+
+def config_accessor():
+ accessor_kind = Config.get("accessor.kind")
+
+ if accessor_kind == "testframework":
+ config_accessor_mock()
+ elif accessor_kind == "grpcapi":
+ config_accessor_grpcapi()
+ else:
+ raise Exception("Unknown accessor kind %s" % accessor_kind)
+
+ # now import any wrappers that the synchronizer needs to add to the ORM
+ if Config.get("wrappers"):
+ for wrapper_name in Config.get("wrappers"):
+ importlib.import_module(wrapper_name)
+
+
+config_accessor()
diff --git a/lib/xos-synchronizer/xossynchronizer/pull_step_engine.py b/lib/xos-synchronizer/xossynchronizer/pull_step_engine.py
new file mode 100644
index 0000000..3f4732d
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/pull_step_engine.py
@@ -0,0 +1,102 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import imp
+import inspect
+import os
+import threading
+import time
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get("logging"))
+
+
+class XOSPullStepScheduler:
+    """ XOSPullStepScheduler
+
+        Runs pull steps. On each pass, every registered pull step has its
+        pull_records() method called in its own thread, once every five seconds.
+    """
+
+ def __init__(self, steps, *args, **kwargs):
+ self.steps = steps
+
+ def run(self):
+ while True:
+ time.sleep(5)
+ self.run_once()
+
+ def run_once(self):
+ log.trace("Starting pull steps", steps=self.steps)
+
+ threads = []
+ for step in self.steps:
+ thread = threading.Thread(target=step().pull_records, name="pull_step")
+ threads.append(thread)
+
+ for t in threads:
+ t.start()
+
+ for t in threads:
+ t.join()
+
+ log.trace("Done with pull steps", steps=self.steps)
+
+
+class XOSPullStepEngine:
+ """ XOSPullStepEngine
+
+ Load pull step modules. Two methods are defined:
+
+ load_pull_step_modules(dir) ... look for step modules in the given directory, and load objects that are
+ descendant from PullStep.
+
+ start() ... Launch threads to handle processing of the PullSteps. It's expected that load_step_modules()
+ will be called before start().
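+
+        Typical use (a sketch; the directory name is illustrative):
+
+            engine = XOSPullStepEngine()
+            engine.load_pull_step_modules("/opt/xos/synchronizers/myservice/pull_steps")
+            engine.start()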
+ """
+
+ def __init__(self):
+ self.pull_steps = []
+
+ def load_pull_step_modules(self, pull_step_dir):
+ self.pull_steps = []
+ log.info("Loading pull steps", pull_step_dir=pull_step_dir)
+
+ # NOTE we'll load all the classes that inherit from PullStep
+ for fn in os.listdir(pull_step_dir):
+ pathname = os.path.join(pull_step_dir, fn)
+ if (
+ os.path.isfile(pathname)
+ and fn.endswith(".py")
+ and (fn != "__init__.py")
+ and ("test" not in fn)
+ ):
+ event_module = imp.load_source(fn[:-3], pathname)
+
+ for classname in dir(event_module):
+ c = getattr(event_module, classname, None)
+
+ if inspect.isclass(c):
+ base_names = [b.__name__ for b in c.__bases__]
+ if "PullStep" in base_names:
+ self.pull_steps.append(c)
+ log.info("Loaded pull steps", steps=self.pull_steps)
+
+    def start(self):
+        log.info("Starting pull steps engine", steps=self.pull_steps)
+
+        # A single scheduler services all of the loaded pull steps; run() loops forever.
+        sched = XOSPullStepScheduler(steps=self.pull_steps)
+        sched.run()
diff --git a/lib/xos-synchronizer/xossynchronizer/pull_steps/__init__.py b/lib/xos-synchronizer/xossynchronizer/pull_steps/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/pull_steps/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/lib/xos-synchronizer/xossynchronizer/pull_steps/pullstep.py b/lib/xos-synchronizer/xossynchronizer/pull_steps/pullstep.py
new file mode 100644
index 0000000..adbc0b1
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/pull_steps/pullstep.py
@@ -0,0 +1,33 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get("logging"))
+
+
+class PullStep(object):
+ """
+    All the pull steps defined in a synchronizer need to inherit from this class in order to be loaded.
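+
+    A minimal subclass might look like (a sketch; the model name is illustrative):
+
+        class MyServicePullStep(PullStep):
+            def __init__(self, **kwargs):
+                super(MyServicePullStep, self).__init__(observed_model="MyService")
+
+            def pull_records(self):
+                pass  # query the backend system and update models here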
+ """
+
+ def __init__(self, **kwargs):
+ """
+ Initialize a pull step
+ :param kwargs:
+ -- observed_model: name of the model that is being polled
+ """
+ self.observed_model = kwargs.get("observed_model")
+
+ def pull_records(self):
+ self.log.debug(
+ "There is no default pull_records, please provide a pull_records method for %s"
+ % self.observed_model
+ )
diff --git a/lib/xos-synchronizer/xossynchronizer/steps/SyncInstanceUsingAnsible.py b/lib/xos-synchronizer/xossynchronizer/steps/SyncInstanceUsingAnsible.py
new file mode 100644
index 0000000..6ed656c
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/steps/SyncInstanceUsingAnsible.py
@@ -0,0 +1,320 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import base64
+import hashlib
+import json
+import os
+import socket
+import sys
+import time
+
+from xosconfig import Config
+
+from xossynchronizer.steps.syncstep import SyncStep, DeferredException, InnocuousException
+from xossynchronizer.ansible_helper import run_template_ssh
+from xossynchronizer.modelaccessor import *
+
+
+class SyncInstanceUsingAnsible(SyncStep):
+ # All of the following should be defined for classes derived from this
+ # base class. Examples below use VSGTenant.
+
+ # provides=[VSGTenant]
+ # observes=VSGTenant
+ # requested_interval=0
+ # template_name = "sync_vcpetenant.yaml"
+
+ def __init__(self, **args):
+ SyncStep.__init__(self, **args)
+
+ def skip_ansible_fields(self, o):
+ # Return True if the instance processing and get_ansible_fields stuff
+ # should be skipped. This hook is primarily for the OnosApp
+ # sync step, so it can do its external REST API sync thing.
+ return False
+
+ def defer_sync(self, o, reason):
+ # zdw, 2017-02-18 - is raising the exception here necessary? - seems like
+ # it's just logging the same thing twice
+ self.log.info("defer object", object=str(o), reason=reason, **o.tologdict())
+ raise DeferredException("defer object %s due to %s" % (str(o), reason))
+
+ def get_extra_attributes(self, o):
+ # This is a place to include extra attributes that aren't part of the
+ # object itself.
+
+ return {}
+
+ def get_instance(self, o):
+ # We need to know what instance is associated with the object. Let's
+ # assume 'o' has a field called 'instance'. If the field is called
+ # something else, or if custom logic is needed, then override this
+ # method.
+
+ return o.instance
+
+ def get_external_sync(self, o):
+ hostname = getattr(o, "external_hostname", None)
+ container = getattr(o, "external_container", None)
+ if hostname and container:
+ return (hostname, container)
+ else:
+ return None
+
+ def run_playbook(self, o, fields, template_name=None):
+ if not template_name:
+ template_name = self.template_name
+ tStart = time.time()
+ run_template_ssh(template_name, fields, object=o)
+ self.log.info(
+ "playbook execution time", time=int(time.time() - tStart), **o.tologdict()
+ )
+
+ def pre_sync_hook(self, o, fields):
+ pass
+
+ def post_sync_hook(self, o, fields):
+ pass
+
+ def sync_fields(self, o, fields):
+ self.run_playbook(o, fields)
+
+ def prepare_record(self, o):
+ pass
+
+ def get_node(self, o):
+ return o.node
+
+ def get_node_key(self, node):
+ # NOTE `node_key` is never defined, does it differ from `proxy_ssh_key`? the value looks to be the same
+ return Config.get("node_key")
+
+ def get_key_name(self, instance):
+ if instance.isolation == "vm":
+ if (
+ instance.slice
+ and instance.slice.service
+ and instance.slice.service.private_key_fn
+ ):
+ key_name = instance.slice.service.private_key_fn
+ else:
+ raise Exception("Make sure to set private_key_fn in the service")
+ elif instance.isolation == "container":
+ node = self.get_node(instance)
+ key_name = self.get_node_key(node)
+ else:
+ # container in VM
+ key_name = instance.parent.slice.service.private_key_fn
+
+ return key_name
+
+ def get_ansible_fields(self, instance):
+ # return all of the fields that tell Ansible how to talk to the context
+ # that's setting up the container.
+
+ if instance.isolation == "vm":
+ # legacy where container was configured by sync_vcpetenant.py
+
+ fields = {
+ "instance_name": instance.name,
+ "hostname": instance.node.name,
+ "instance_id": instance.instance_id,
+ "username": "ubuntu",
+ "ssh_ip": instance.get_ssh_ip(),
+ }
+
+ elif instance.isolation == "container":
+ # container on bare metal
+ node = self.get_node(instance)
+ hostname = node.name
+ fields = {
+ "hostname": hostname,
+ "baremetal_ssh": True,
+ "instance_name": "rootcontext",
+ "username": "root",
+ "container_name": "%s-%s" % (instance.slice.name, str(instance.id))
+ # ssh_ip is not used for container-on-metal
+ }
+ else:
+ # container in a VM
+ if not instance.parent:
+ raise Exception("Container-in-VM has no parent")
+ if not instance.parent.instance_id:
+ raise Exception("Container-in-VM parent is not yet instantiated")
+ if not instance.parent.slice.service:
+ raise Exception("Container-in-VM parent has no service")
+ if not instance.parent.slice.service.private_key_fn:
+ raise Exception("Container-in-VM parent service has no private_key_fn")
+ fields = {
+ "hostname": instance.parent.node.name,
+ "instance_name": instance.parent.name,
+ "instance_id": instance.parent.instance_id,
+ "username": "ubuntu",
+ "ssh_ip": instance.parent.get_ssh_ip(),
+ "container_name": "%s-%s" % (instance.slice.name, str(instance.id)),
+ }
+
+ key_name = self.get_key_name(instance)
+ if not os.path.exists(key_name):
+ raise Exception("Node key %s does not exist" % key_name)
+
+        key = open(key_name).read()
+
+ fields["private_key"] = key
+
+ # Now the ceilometer stuff
+ # Only do this if the instance is not being deleted.
+ if not instance.deleted:
+ cslice = ControllerSlice.objects.get(slice_id=instance.slice.id)
+ if not cslice:
+ raise Exception(
+ "Controller slice object for %s does not exist"
+ % instance.slice.name
+ )
+
+ cuser = ControllerUser.objects.get(user_id=instance.creator.id)
+ if not cuser:
+ raise Exception(
+ "Controller user object for %s does not exist" % instance.creator
+ )
+
+ fields.update(
+ {
+ "keystone_tenant_id": cslice.tenant_id,
+ "keystone_user_id": cuser.kuser_id,
+ "rabbit_user": getattr(instance.controller, "rabbit_user", None),
+ "rabbit_password": getattr(
+ instance.controller, "rabbit_password", None
+ ),
+ "rabbit_host": getattr(instance.controller, "rabbit_host", None),
+ }
+ )
+
+ return fields
+
+ def sync_record(self, o):
+ self.log.info("sync'ing object", object=str(o), **o.tologdict())
+
+ self.prepare_record(o)
+
+ if self.skip_ansible_fields(o):
+ fields = {}
+ else:
+ if self.get_external_sync(o):
+ # sync to some external host
+
+ # UNTESTED
+
+ (hostname, container_name) = self.get_external_sync(o)
+ fields = {
+ "hostname": hostname,
+ "baremetal_ssh": True,
+ "instance_name": "rootcontext",
+ "username": "root",
+ "container_name": container_name,
+ }
+                # NOTE: untested; assumes the node can be derived from the object
+                node = self.get_node(o)
+                key_name = self.get_node_key(node)
+ if not os.path.exists(key_name):
+ raise Exception("Node key %s does not exist" % key_name)
+
+                key = open(key_name).read()
+
+ fields["private_key"] = key
+ # TO DO: Ceilometer stuff
+ else:
+ instance = self.get_instance(o)
+ # sync to an XOS instance
+ if not instance:
+ self.defer_sync(o, "waiting on instance")
+ return
+
+ if not instance.instance_name:
+ self.defer_sync(o, "waiting on instance.instance_name")
+ return
+
+ fields = self.get_ansible_fields(instance)
+
+ fields["ansible_tag"] = getattr(
+ o, "ansible_tag", o.__class__.__name__ + "_" + str(o.id)
+ )
+
+ # If 'o' defines a 'sync_attributes' list, then we'll copy those
+ # attributes into the Ansible recipe's field list automatically.
+ if hasattr(o, "sync_attributes"):
+ for attribute_name in o.sync_attributes:
+ fields[attribute_name] = getattr(o, attribute_name)
+
+ fields.update(self.get_extra_attributes(o))
+
+ self.sync_fields(o, fields)
+
+ o.save()
+
+ def delete_record(self, o):
+ try:
+ # TODO: This may be broken, as get_controller() does not exist in convenience wrapper
+ controller = o.get_controller()
+ controller_register = json.loads(
+ o.node.site_deployment.controller.backend_register
+ )
+
+ if controller_register.get("disabled", False):
+ raise InnocuousException(
+ "Controller %s is disabled" % o.node.site_deployment.controller.name
+ )
+ except AttributeError:
+ pass
+
+ instance = self.get_instance(o)
+
+ if not instance:
+ # the instance is gone. There's nothing left for us to do.
+ return
+
+ if instance.deleted:
+ # the instance is being deleted. There's nothing left for us to do.
+ return
+
+ if isinstance(instance, basestring):
+ # sync to some external host
+
+            # XXX - this probably needs more work... `service` is undefined in
+            # this scope, so this branch will fail if it is ever exercised.
+
+            fields = {
+                "hostname": instance,
+                "instance_id": "ubuntu",  # this is the username to log into
+                "private_key": service.key,
+            }
+ else:
+ # sync to an XOS instance
+ fields = self.get_ansible_fields(instance)
+
+ fields["ansible_tag"] = getattr(
+ o, "ansible_tag", o.__class__.__name__ + "_" + str(o.id)
+ )
+
+ # If 'o' defines a 'sync_attributes' list, then we'll copy those
+ # attributes into the Ansible recipe's field list automatically.
+ if hasattr(o, "sync_attributes"):
+ for attribute_name in o.sync_attributes:
+ fields[attribute_name] = getattr(o, attribute_name)
+
+ if hasattr(self, "map_delete_inputs"):
+ fields.update(self.map_delete_inputs(o))
+
+ fields["delete"] = True
+ res = self.run_playbook(o, fields)
+
+ if hasattr(self, "map_delete_outputs"):
+ self.map_delete_outputs(o, res)
diff --git a/lib/xos-synchronizer/xossynchronizer/steps/__init__.py b/lib/xos-synchronizer/xossynchronizer/steps/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/steps/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/lib/xos-synchronizer/xossynchronizer/steps/sync_object.py b/lib/xos-synchronizer/xossynchronizer/steps/sync_object.py
new file mode 100644
index 0000000..1fb5894
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/steps/sync_object.py
@@ -0,0 +1,25 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from xossynchronizer.steps.syncstep import *
+
+
+class SyncObject(SyncStep):
+ provides = [] # Caller fills this in
+ requested_interval = 0
+ observes = [] # Caller fills this in
+
+ def sync_record(self, r):
+ raise DeferredException("Waiting for Service dependency: %r" % r)
diff --git a/lib/xos-synchronizer/xossynchronizer/steps/syncstep.py b/lib/xos-synchronizer/xossynchronizer/steps/syncstep.py
new file mode 100644
index 0000000..2f31e3e
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/steps/syncstep.py
@@ -0,0 +1,158 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+
+from xosconfig import Config
+from xossynchronizer.modelaccessor import *
+from xossynchronizer.ansible_helper import run_template
+
+# from tests.steps.mock_modelaccessor import model_accessor
+
+import json
+import time
+import pdb
+
+from functools import reduce
+
+
+def f7(seq):
+    # Remove duplicates from seq while preserving the order of first occurrence.
+    seen = set()
+    seen_add = seen.add
+    return [x for x in seq if not (x in seen or seen_add(x))]
+
+
+def elim_dups(backend_str):
+ strs = backend_str.split(" // ")
+ strs2 = f7(strs)
+ return " // ".join(strs2)
+
+
+def deepgetattr(obj, attr):
+ return reduce(getattr, attr.split("."), obj)
+
+
+def obj_class_name(obj):
+ return getattr(obj, "model_name", obj.__class__.__name__)
+
+
+class InnocuousException(Exception):
+ pass
+
+
+class DeferredException(Exception):
+ pass
+
+
+class FailedDependency(Exception):
+ pass
+
+
+class SyncStep(object):
+ """ An XOS Sync step.
+
+ Attributes:
+ psmodel Model name the step synchronizes
+ dependencies list of names of models that must be synchronized first if the current model depends on them
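+
+    A typical concrete step defines (a sketch; the names are illustrative):
+
+        class SyncMyService(SyncStep):
+            provides = [MyService]
+            observes = MyService
+            requested_interval = 0
+            playbook = "sync_myservice.yaml"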
+ """
+
+ # map_sync_outputs can return this value to cause a step to be marked
+ # successful without running ansible. Used for sync_network_controllers
+ # on nat networks.
+ SYNC_WITHOUT_RUNNING = "sync_without_running"
+
+ slow = False
+
+    def get_prop(self, prop):
+        # NOTE config_dir is never defined, is this used?
+        sync_config_dir = Config.get("config_dir")
+        prop_config_path = os.path.join(sync_config_dir, self.name, prop)
+        return open(prop_config_path).read().rstrip()
+
+ def __init__(self, **args):
+ """Initialize a sync step
+ Keyword arguments:
+ name -- Name of the step
+ provides -- XOS models sync'd by this step
+ """
+        self.dependencies = []
+ self.driver = args.get("driver")
+ self.error_map = args.get("error_map")
+
+ try:
+ self.soft_deadline = int(self.get_prop("soft_deadline_seconds"))
+ except BaseException:
+ self.soft_deadline = 5 # 5 seconds
+
+ if "log" in args:
+ self.log = args.get("log")
+
+ return
+
+ def fetch_pending(self, deletion=False):
+ # This is the most common implementation of fetch_pending
+ # Steps should override it if they have their own logic
+ # for figuring out what objects are outstanding.
+
+ return model_accessor.fetch_pending(self.observes, deletion)
+
+ def sync_record(self, o):
+ self.log.debug("In default sync record", **o.tologdict())
+
+ tenant_fields = self.map_sync_inputs(o)
+ if tenant_fields == SyncStep.SYNC_WITHOUT_RUNNING:
+ return
+
+ main_objs = self.observes
+ if isinstance(main_objs, list):
+ main_objs = main_objs[0]
+
+ path = "".join(main_objs.__name__).lower()
+ res = run_template(self.playbook, tenant_fields, path=path, object=o)
+
+ if hasattr(self, "map_sync_outputs"):
+ self.map_sync_outputs(o, res)
+
+ self.log.debug("Finished default sync record", **o.tologdict())
+
+ def delete_record(self, o):
+ self.log.debug("In default delete record", **o.tologdict())
+
+ # If there is no map_delete_inputs, then assume deleting a record is a no-op.
+ if not hasattr(self, "map_delete_inputs"):
+ return
+
+ tenant_fields = self.map_delete_inputs(o)
+
+ main_objs = self.observes
+ if isinstance(main_objs, list):
+ main_objs = main_objs[0]
+
+ path = "".join(main_objs.__name__).lower()
+
+ tenant_fields["delete"] = True
+ res = run_template(self.playbook, tenant_fields, path=path)
+
+ if hasattr(self, "map_delete_outputs"):
+ self.map_delete_outputs(o, res)
+ else:
+ # "rc" is often only returned when something bad happens, so assume that no "rc" implies a successful rc
+ # of 0.
+ if res[0].get("rc", 0) != 0:
+ raise Exception("Nonzero rc from Ansible during delete_record")
+
+ self.log.debug("Finished default delete record", **o.tologdict())
diff --git a/lib/xos-synchronizer/xossynchronizer/synchronizer.py b/lib/xos-synchronizer/xossynchronizer/synchronizer.py
new file mode 100644
index 0000000..9a530d7
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/synchronizer.py
@@ -0,0 +1,68 @@
+#!/usr/bin/env python
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+import time
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+class Synchronizer(object):
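+    """ Base class for a synchronizer process.
+
+        A synchronizer's entry point typically does (sketch):
+
+            from xossynchronizer.synchronizer import Synchronizer
+            Synchronizer().run()
+    """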
+ def __init__(self):
+ self.log = create_logger(Config().get("logging"))
+
+    def create_model_accessor(self):
+        from xossynchronizer.modelaccessor import model_accessor
+
+        self.model_accessor = model_accessor
+
+ def wait_for_ready(self):
+ models_active = False
+ wait = False
+ while not models_active:
+ try:
+ _i = self.model_accessor.Instance.objects.first()
+ _n = self.model_accessor.NetworkTemplate.objects.first()
+ models_active = True
+ except Exception as e:
+ self.log.info("Exception", e=e)
+ self.log.info("Waiting for data model to come up before starting...")
+ time.sleep(10)
+ wait = True
+
+        if wait:
+            # Safety factor, seeing that we stumbled waiting for the data model to come up.
+            time.sleep(60)
+
+ def run(self):
+ self.create_model_accessor()
+ self.wait_for_ready()
+
+ # Don't import backend until after the model accessor has been initialized. This is to support sync steps that
+ # use `from xossynchronizer.modelaccessor import ...` and require the model accessor to be initialized before
+ # their code can be imported.
+
+        from xossynchronizer.backend import Backend
+
+ log_closure = self.log.bind(synchronizer_name=Config().get("name"))
+ backend = Backend(log=log_closure)
+ backend.run()
+
+
+if __name__ == "__main__":
+ main()