CORD-1677: Refactor of Synchronizer core
CORD-1678: Better diagnostics for synchronizer framework
Change-Id: I542a3fa24f836847a5e184721150e9b51cac82cf
diff --git a/xos/core/models/core.xproto b/xos/core/models/core.xproto
index 05e9c04..bfdf4b0 100644
--- a/xos/core/models/core.xproto
+++ b/xos/core/models/core.xproto
@@ -10,14 +10,16 @@
optional string backend_register = 5 [default = "{}", max_length = 1024];
required bool backend_need_delete = 6 [default = False];
required bool backend_need_reap = 7 [default = False];
- required string backend_status = 8 [default = "0 - Provisioning in progress", max_length = 1024, null = True];
- required bool deleted = 9 [default = False];
- required bool write_protect = 11 [default = False];
- required bool lazy_blocked = 11 [default = False];
- required bool no_sync = 12 [default = False];
- required bool no_policy = 13 [default = False];
- optional string policy_status = 14 [default = "0 - Policy in process", max_length = 1024];
- required string leaf_model_name = 15 [null = False, max_length = 1024, help_text = "The most specialized model in this chain of inheritance, often defined by a service developer"];
+ required string backend_status = 8 [default = "Provisioning in progress", max_length = 1024, null = True];
+ required int32 backend_code = 9 [default = 0];
+ required bool deleted = 10 [default = False];
+ required bool write_protect = 12 [default = False];
+ required bool lazy_blocked = 13 [default = False];
+ required bool no_sync = 14 [default = False];
+ required bool no_policy = 15 [default = False];
+ optional string policy_status = 16 [default = "Policy in process", max_length = 1024];
+ optional int32 policy_code = 16 [default = 0];
+ required string leaf_model_name = 17 [null = False, max_length = 1024, help_text = "The most specialized model in this chain of inheritance, often defined by a service developer"];
}
// The calling user represents the user being accessed, or is a site admin.
diff --git a/xos/synchronizers/model_policy.py b/xos/synchronizers/model_policy.py
index aabbf83..55e8c94 100644
--- a/xos/synchronizers/model_policy.py
+++ b/xos/synchronizers/model_policy.py
@@ -171,8 +171,6 @@
objects = []
deleted_objects = []
- log.debug("MODEL POLICY: run_policy_once()")
-
check_db_connection_okay()
for m in models:
@@ -192,5 +190,3 @@
except Exception,e:
# this shouldn't happen, but in case it does, catch it...
log.exception("MODEL POLICY: exception in reset_queries", e = e)
-
- log.debug("MODEL POLICY: finished run_policy_once()")
diff --git a/xos/synchronizers/new_base/backend.py b/xos/synchronizers/new_base/backend.py
index 656c3cd..1f3034d 100644
--- a/xos/synchronizers/new_base/backend.py
+++ b/xos/synchronizers/new_base/backend.py
@@ -38,32 +38,36 @@
class Backend:
- def __init__(self):
+ def __init__(self, log = log):
+ self.log = log
pass
def load_sync_step_modules(self, step_dir):
sync_steps = []
- log.info("Loading sync steps", strp_dir = step_dir)
+ self.log.info("Loading sync steps", step_dir = step_dir)
for fn in os.listdir(step_dir):
pathname = os.path.join(step_dir,fn)
if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
module = imp.load_source(fn[:-3],pathname)
+
for classname in dir(module):
c = getattr(module, classname, None)
- if classname.startswith("Sync"):
- print classname, c, inspect.isclass(c), issubclass(c, SyncStep), hasattr(c,"provides")
+ #if classname.startswith("Sync"):
+ # print classname, c, inspect.isclass(c), issubclass(c, SyncStep), hasattr(c,"provides")
# make sure 'c' is a descendent of SyncStep and has a
# provides field (this eliminates the abstract base classes
# since they don't have a provides)
+
+ if inspect.isclass(c):
+ base_names = [b.__name__ for b in c.__bases__]
+ if ('SyncStep' in base_names or 'OpenStackSyncStep' in base_names or 'SyncInstanceUsingAnsible' in base_names) and hasattr(c,"provides") and (c not in sync_steps):
+ sync_steps.append(c)
- if inspect.isclass(c) and issubclass(c, SyncStep) and hasattr(c,"provides") and (c not in sync_steps):
- sync_steps.append(c)
-
- log.info("Loaded sync steps", count = len(sync_steps))
+ self.log.info("Loaded sync steps", steps = sync_steps)
return sync_steps
@@ -72,14 +76,14 @@
watcher_thread = None
model_policy_thread = None
- model_accessor.update_diag(sync_start=time.time(), backend_status="0 - Synchronizer Start")
+ model_accessor.update_diag(sync_start=time.time(), backend_status="Synchronizer Start")
steps_dir = Config.get("steps_dir")
if steps_dir:
sync_steps = self.load_sync_step_modules(steps_dir)
if sync_steps:
# start the observer
- observer = XOSObserver(sync_steps)
+ observer = XOSObserver(sync_steps, log = self.log)
observer_thread = threading.Thread(target=observer.run,name='synchronizer')
observer_thread.start()
@@ -89,16 +93,16 @@
watcher_thread = threading.Thread(target=watcher.run,name='watcher')
watcher_thread.start()
else:
- log.info("Skipping observer and watcher threads due to no steps dir.")
+ self.log.info("Skipping observer and watcher threads due to no steps dir.")
# start model policies thread
policies_dir = Config.get("model_policies_dir")
if policies_dir:
- policy_engine = XOSPolicyEngine(policies_dir=policies_dir)
+ policy_engine = XOSPolicyEngine(policies_dir=policies_dir, log = self.log)
model_policy_thread = threading.Thread(target=policy_engine.run, name="policy_engine")
model_policy_thread.start()
else:
- log.info("Skipping model policies thread due to no model_policies dir.")
+ self.log.info("Skipping model policies thread due to no model_policies dir.")
while True:
try:
diff --git a/xos/synchronizers/new_base/dependency_walker_new.py b/xos/synchronizers/new_base/dependency_walker_new.py
index fa7e5ed..e278484 100644
--- a/xos/synchronizers/new_base/dependency_walker_new.py
+++ b/xos/synchronizers/new_base/dependency_walker_new.py
@@ -40,6 +40,7 @@
dep_data = open(Config.get("dependency_graph")).read()
dependencies = json.loads(dep_data)
+dependencies = {k:[item[0] for item in items] for k,items in dependencies.items()}
inv_dependencies = {}
for k, lst in dependencies.items():
diff --git a/xos/synchronizers/new_base/djangoaccessor.py b/xos/synchronizers/new_base/djangoaccessor.py
index cb0de92..b76fe9e 100644
--- a/xos/synchronizers/new_base/djangoaccessor.py
+++ b/xos/synchronizers/new_base/djangoaccessor.py
@@ -90,9 +90,9 @@
# db.connection.close()
db.close_old_connections()
except Exception, e:
- logger.exception("XXX we failed to fix the failure", e = e)
+ log.exception("XXX we failed to fix the failure", e = e)
else:
- logger.exception("XXX some other error")
+ log.exception("XXX some other error")
def obj_exists(self, o):
return (o.pk is not None)
diff --git a/xos/synchronizers/new_base/error_mapper.py b/xos/synchronizers/new_base/error_mapper.py
deleted file mode 100644
index 5617d20..0000000
--- a/xos/synchronizers/new_base/error_mapper.py
+++ /dev/null
@@ -1,36 +0,0 @@
-
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from xosconfig import Config
-from multistructlog import create_logger
-
-log = create_logger(Config().get('logging'))
-
-class ErrorMapper:
- def __init__(self, error_map_file):
- self.error_map = {}
- try:
- error_map_lines = open(error_map_file).read().splitlines()
- for l in error_map_lines:
- if (not l.startswith('#')):
- splits = l.split('->')
- k, v = map(lambda i: i.rstrip(), splits)
- self.error_map[k] = v
- except:
- log.info('Could not read error map')
-
- def map(self, error):
- return self.error_map[error]
diff --git a/xos/synchronizers/new_base/event_loop.py b/xos/synchronizers/new_base/event_loop.py
index 91d82f7..924a45b 100644
--- a/xos/synchronizers/new_base/event_loop.py
+++ b/xos/synchronizers/new_base/event_loop.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,51 +12,37 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import os
-import imp
-import inspect
import time
import sys
-import traceback
-import commands
import threading
import json
import pdb
import pprint
import traceback
+from collections import defaultdict
+from networkx import DiGraph, dfs_edges, weakly_connected_component_subgraphs, all_shortest_paths, NetworkXNoPath
+from networkx.algorithms.dag import topological_sort
+
from datetime import datetime
+#from multistructlog import create_logger
+from xosconfig import Config
+from synchronizers.new_base.steps import *
+from syncstep import InnocuousException, DeferredException, SyncStep
+from synchronizers.new_base.modelaccessor import *
from xosconfig import Config
from multistructlog import create_logger
log = create_logger(Config().get('logging'))
-from synchronizers.new_base.steps import *
-from syncstep import SyncStep, NullSyncStep
-from toposort import toposort
-from synchronizers.new_base.error_mapper import *
-from synchronizers.new_base.steps.sync_object import SyncObject
-from synchronizers.new_base.modelaccessor import *
-
-debug_mode = False
-
-
-class bcolors:
- HEADER = '\033[95m'
- OKBLUE = '\033[94m'
- OKGREEN = '\033[92m'
- WARNING = '\033[93m'
- FAIL = '\033[91m'
- ENDC = '\033[0m'
- BOLD = '\033[1m'
- UNDERLINE = '\033[4m'
-
-
-
class StepNotReady(Exception):
pass
+class ExternalDependencyFailed(Exception):
+ pass
+
+# FIXME: Move drivers into a context shared across sync steps.
+
class NoOpDriver:
def __init__(self):
@@ -67,39 +52,25 @@
# Everyone gets NoOpDriver by default. To use a different driver, call
# set_driver() below.
-
DRIVER = NoOpDriver()
+
def set_driver(x):
global DRIVER
DRIVER = x
-STEP_STATUS_WORKING = 1
-STEP_STATUS_OK = 2
-STEP_STATUS_KO = 3
-
-
-def invert_graph(g):
- ig = {}
- for k, v in g.items():
- for v0 in v:
- try:
- ig[v0].append(k)
- except:
- ig[v0] = [k]
- return ig
-
-
class XOSObserver:
sync_steps = []
- def __init__(self, sync_steps):
- # The Condition object that gets signalled by Feefie events
+ def __init__(self, sync_steps, log = log):
+ # The Condition object via which events are received
+ self.log = log
self.step_lookup = {}
self.sync_steps = sync_steps
self.load_sync_steps()
+ self.load_dependency_graph()
self.event_cond = threading.Condition()
self.driver = DRIVER
@@ -111,403 +82,469 @@
self.event_cond.release()
def wake_up(self):
- log.info('Wake up routine called. Event cond %r' % self.event_cond)
+ self.log.debug('Wake up routine called')
self.event_cond.acquire()
self.event_cond.notify()
self.event_cond.release()
- def load_sync_steps(self):
+ def load_dependency_graph(self):
dep_path = Config.get("dependency_graph")
- log.info('Loading model dependency graph from %s' % dep_path)
- try:
- # This contains dependencies between records, not sync steps
- self.model_dependency_graph = json.loads(open(dep_path).read())
- for left, lst in self.model_dependency_graph.items():
- new_lst = []
- for k in lst:
- try:
- tup = (k, k.lower())
- new_lst.append(tup)
- deps = self.model_dependency_graph[k]
- except:
- self.model_dependency_graph[k] = []
+ self.log.info('Loading model dependency graph', path = dep_path)
- self.model_dependency_graph[left] = new_lst
+ try:
+ dep_graph_str = open(dep_path).read()
+
+ # joint_dependencies is of the form { Model1 -> [(Model2, src_port, dst_port), ...] }
+ # src_port is the field that accesses Model2 from Model1
+ # dst_port is the field that accesses Model1 from Model2
+ joint_dependencies = json.loads(dep_graph_str)
+
+ model_dependency_graph = DiGraph()
+ for src_model, deps in joint_dependencies.items():
+ for dep in deps:
+ dst_model, src_accessor, dst_accessor = dep
+ if src_model != dst_model:
+ edge_label = {'src_accessor': src_accessor,
+ 'dst_accessor': dst_accessor}
+ model_dependency_graph.add_edge(
+ src_model, dst_model, edge_label)
+
+ model_dependency_graph_rev = model_dependency_graph.reverse(
+ copy=True)
+ self.model_dependency_graph = {
+ # deletion
+ True: model_dependency_graph_rev,
+ False: model_dependency_graph
+ }
+ self.log.info("Loaded dependencies", edges = model_dependency_graph.edges())
except Exception as e:
+ self.log.exception("Error loading dependency graph", e = e)
raise e
- try:
- # FIXME `pl_dependency_graph` is never defined, this will always fail
- # NOTE can we remove it?
- backend_path = Config.get("pl_dependency_graph")
- log.info(
- 'Loading backend dependency graph from %s' %
- backend_path)
- # This contains dependencies between backend records
- self.backend_dependency_graph = json.loads(
- open(backend_path).read())
- for k, v in self.backend_dependency_graph.items():
- try:
- self.model_dependency_graph[k].extend(v)
- except KeyError:
- self.model_dependency_graphp[k] = v
+ def load_sync_steps(self):
+ model_to_step = defaultdict(list)
+ external_dependencies = []
- except Exception as e:
- log.info('Backend dependency graph not loaded')
- # We can work without a backend graph
- self.backend_dependency_graph = {}
-
- provides_dict = {}
for s in self.sync_steps:
- self.step_lookup[s.__name__] = s
- for m in s.provides:
- try:
- provides_dict[m.__name__].append(s.__name__)
- except KeyError:
- provides_dict[m.__name__] = [s.__name__]
+ if not isinstance(s.observes, list):
+ observes = [s.observes]
+ else:
+ observes = s.observes
- step_graph = {}
- phantom_steps = []
- for k, v in self.model_dependency_graph.items():
+ for m in observes:
+ model_to_step[m.__name__].append(s.__name__)
+
try:
- for source in provides_dict[k]:
- if (not v):
- step_graph[source] = []
-
- for m, _ in v:
- try:
- for dest in provides_dict[m]:
- # no deps, pass
- try:
- if (dest not in step_graph[source]):
- step_graph[source].append(dest)
- except:
- step_graph[source] = [dest]
- except KeyError:
- if (m not in provides_dict):
- try:
- step_graph[source] += ['#%s' % m]
- except:
- step_graph[source] = ['#%s' % m]
-
- phantom_steps += ['#%s' % m]
- pass
-
- except KeyError:
+ external_dependencies.extend(s.external_dependencies)
+ except AttributeError:
pass
- # no dependencies, pass
- self.dependency_graph = step_graph
- self.deletion_dependency_graph = invert_graph(step_graph)
+ self.step_lookup[s.__name__] = s
- pp = pprint.PrettyPrinter(indent=4)
- log.debug(pp.pformat(step_graph))
- self.ordered_steps = toposort(
- self.dependency_graph, phantom_steps + map(lambda s: s.__name__, self.sync_steps))
- self.ordered_steps = [
- i for i in self.ordered_steps if i != 'SyncObject']
+ self.model_to_step = model_to_step
+ self.external_dependencies = list(set(external_dependencies))
+ self.log.info('Loaded external dependencies', external_dependencies = external_dependencies)
+ self.log.info('Loaded model_map', **model_to_step)
- self.load_run_times()
-
- def check_duration(self, step, duration):
+ def reset_model_accessor(self, o=None):
try:
- if (duration > step.deadline):
- log.info(
- 'Sync step missed deadline',
- step_name = step.name, duration = duration)
+ model_accessor.reset_queries()
+ except BaseException:
+ # this shouldn't happen, but in case it does, catch it...
+ if (o):
+ logdict = o.tologdict()
+ else:
+ logdict = {}
+
+ log.error("exception in reset_queries", **logdict)
+
+ def delete_record(self, o, log):
+ if getattr(o, "backend_need_reap", False):
+ # the object has already been deleted and marked for reaping
+ model_accessor.journal_object(
+ o, "syncstep.call.already_marked_reap")
+ else:
+ step = getattr(o, 'synchronizer_step', None)
+ if not step:
+ raise ExternalDependencyFailed
+
+ model_accessor.journal_object(o, "syncstep.call.delete_record")
+ log.debug("Deleting object", **o.tologdict())
+
+ step.log = log.bind(step = step)
+ step.delete_record(o)
+ step.log = self.log
+
+ log.debug("Deleted object", **o.tologdict())
+
+ model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
+ o.backend_need_reap = True
+ o.save(update_fields=['backend_need_reap'])
+
+
+ def sync_record(self, o, log):
+ try:
+ step = o.synchronizer_step
except AttributeError:
- # S doesn't have a deadline
- pass
+ raise ExternalDependencyFailed
- def update_run_time(self, step, deletion):
- if (not deletion):
- self.last_run_times[step.__name__] = time.time()
+ new_enacted = model_accessor.now()
+
+ # Mark this as an object that will require delete. Do
+ # this now rather than after the syncstep,
+ if not (o.backend_need_delete):
+ o.backend_need_delete = True
+ o.save(update_fields=['backend_need_delete'])
+
+ model_accessor.journal_object(o, "syncstep.call.sync_record")
+
+ log.debug("Syncing object", **o.tologdict())
+
+ step.log = log.bind(step = step)
+ step.sync_record(o)
+ step.log = self.log
+
+ log.debug("Synced object", **o.tologdict())
+
+ model_accessor.update_diag(
+ syncrecord_start=time.time(), backend_status="Synced Record", backend_code=1)
+ o.enacted = new_enacted
+ scratchpad = {'next_run': 0, 'exponent': 0,
+ 'last_success': time.time()}
+ o.backend_register = json.dumps(scratchpad)
+ o.backend_status = "OK"
+ o.backend_code = 1
+ model_accessor.journal_object(o, "syncstep.call.save_update")
+ o.save(update_fields=['enacted', 'backend_status', 'backend_register'])
+ log.info("Saved sync object, new enacted", enacted = new_enacted)
+
+ """ This function needs a cleanup. FIXME: Rethink backend_status, backend_register """
+ def handle_sync_exception(self, o, e):
+ self.log.exception("sync step failed!", e = e, **o.tologdict())
+ current_code = o.backend_code
+
+ if hasattr(e, 'message'):
+ status = e.message
else:
- self.last_deletion_run_times[step.__name__] = time.time()
+ status = str(e)
- def check_schedule(self, step, deletion):
- last_run_times = self.last_run_times if not deletion else self.last_deletion_run_times
-
- time_since_last_run = time.time() - last_run_times.get(step.__name__, 0)
- try:
- if (time_since_last_run < step.requested_interval):
- raise StepNotReady
- except AttributeError:
- log.info(
- 'Step does not have requested_interval set',
- step_name = step.__name__)
- raise StepNotReady
-
- def load_run_times(self):
- try:
- jrun_times = open(
- '/tmp/%sobserver_run_times' %
- self.observer_name).read()
- self.last_run_times = json.loads(jrun_times)
- except:
- self.last_run_times = {}
- for e in self.ordered_steps:
- self.last_run_times[e] = 0
- try:
- jrun_times = open(
- '/tmp/%sobserver_deletion_run_times' %
- self.observer_name).read()
- self.last_deletion_run_times = json.loads(jrun_times)
- except:
- self.last_deletion_run_times = {}
- for e in self.ordered_steps:
- self.last_deletion_run_times[e] = 0
-
- def lookup_step_class(self, s):
- if ('#' in s):
- return NullSyncStep
+ if isinstance(e, InnocuousException) or isinstance(e, DeferredException):
+ code = 1
else:
- step = self.step_lookup[s]
- return step
+ code = 2
- def lookup_step(self, s):
- if ('#' in s):
- objname = s[1:]
- so = NullSyncStep()
+ self.set_object_error(o, status, code)
- obj = model_accessor.get_model_class(objname)
+ dependency_error = 'Failed due to error in model %s id %d: %s' % (
+ o.leaf_model_name, o.id, status)
+ return dependency_error, code
- so.provides = [obj]
- so.observes = [obj]
- step = so
+ def set_object_error(self, o, status, code):
+ if o.backend_status:
+ error_list = o.backend_status.split(' // ')
else:
- step_class = self.step_lookup[s]
- step = step_class(driver=self.driver, error_map=self.error_mapper)
- return step
+ error_list = []
- def save_run_times(self):
- run_times = json.dumps(self.last_run_times)
- open(
- '/tmp/%sobserver_run_times' %
- self.observer_name,
- 'w').write(run_times)
+ if status not in error_list:
+ error_list.append(status)
- deletion_run_times = json.dumps(self.last_deletion_run_times)
- open('/tmp/%sobserver_deletion_run_times' %
- self.observer_name, 'w').write(deletion_run_times)
+ # Keep last two errors
+ error_list = error_list[-2:]
- def check_class_dependency(self, step, failed_steps):
- step.dependenices = []
- for obj in step.provides:
- lst = self.model_dependency_graph.get(obj, [])
- nlst = map(lambda a_b1: a_b1[1], lst)
- step.dependenices.extend(nlst)
- for failed_step in failed_steps:
- if (failed_step in step.dependencies):
- raise StepNotReady
+ o.backend_code = code
+ o.backend_status = ' // '.join(error_list)
- def sync(self, S, deletion):
try:
- step = self.lookup_step_class(S)
- start_time = time.time()
+ scratchpad = json.loads(o.backend_register)
+ scratchpad['exponent']
+ except BaseException:
+ scratchpad = {'next_run': 0, 'exponent': 0,
+ 'last_success': time.time(), 'failures': 0}
- log.debug(
- "Starting to work on steps",
- step_name = step.__name__, deletion = str(deletion))
+ # Second failure
+ if (scratchpad['exponent']):
+ if code == 1:
+ delay = scratchpad['exponent'] * 60 # 1 minute
+ else:
+ delay = scratchpad['exponent'] * 600 # 10 minutes
- dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
- # if not deletion else self.deletion_step_conditions
- step_conditions = self.step_conditions
- step_status = self.step_status # if not deletion else self.deletion_step_status
+ # cap delays at 8 hours
+ if (delay > 8 * 60 * 60):
+ delay = 8 * 60 * 60
+ scratchpad['next_run'] = time.time() + delay
- # Wait for step dependencies to be met
+ scratchpad['exponent'] += 1
+
+ try:
+ scratchpad['failures'] += 1
+ except KeyError:
+ scratchpad['failures'] = 1
+
+ scratchpad['last_failure'] = time.time()
+
+ o.backend_register = json.dumps(scratchpad)
+
+ # TOFIX:
+ # DatabaseError: value too long for type character varying(140)
+ if (model_accessor.obj_exists(o)):
try:
- deps = dependency_graph[S]
- has_deps = True
- except KeyError:
- has_deps = False
+ o.backend_status = o.backend_status[:1024]
+ o.save(update_fields=['backend_status',
+ 'backend_register', 'updated'])
+ except BaseException,e:
+ self.log.exception("Could not update backend status field!", e = e)
+ pass
- go = True
+ def sync_cohort(self, cohort, deletion):
+ log = self.log.bind(thread_id = threading.current_thread().ident)
+ try:
+ start_time = time.time()
+ log.debug("Starting to work on cohort", cohort = cohort, deletion = deletion)
- failed_dep = None
- if (has_deps):
- for d in deps:
- if d == step.__name__:
- go = True
+ cohort_emptied = False
+ dependency_error = None
+ dependency_error_code = None
+
+ itty = iter(cohort)
+
+ while not cohort_emptied:
+ try:
+ self.reset_model_accessor()
+ o = next(itty)
+
+ if dependency_error:
+ self.set_object_error(
+ o, dependency_error, dependency_error_code)
continue
- cond = step_conditions[d]
- cond.acquire()
- if (step_status[d] is STEP_STATUS_WORKING):
- cond.wait()
- elif step_status[d] == STEP_STATUS_OK:
- go = True
- else:
- go = False
- failed_dep = d
- cond.release()
- if (not go):
- break
- else:
- go = True
-
- if (not go):
- self.failed_steps.append(step)
- my_status = STEP_STATUS_KO
- else:
- sync_step = self.lookup_step(S)
- sync_step.__name__ = step.__name__
- sync_step.dependencies = []
- try:
- mlist = sync_step.provides
-
try:
- for m in mlist:
- lst = self.model_dependency_graph[m.__name__]
- nlst = map(lambda a_b: a_b[1], lst)
- sync_step.dependencies.extend(nlst)
- except Exception as e:
- raise e
+ if (deletion):
+ self.delete_record(o, log)
+ else:
+ self.sync_record(o, log)
+ except ExternalDependencyFailed:
+ dependency_error = 'External dependency on object %s id %d not met'%(o.__class__.__name__, o.id)
+ dependency_error_code = 1
+ except (DeferredException, InnocuousException, Exception) as e:
+ dependency_error, dependency_error_code = self.handle_sync_exception(
+ o, e)
- except KeyError:
- pass
- sync_step.debug_mode = debug_mode
-
- should_run = False
- try:
- # Various checks that decide whether
- # this step runs or not
- self.check_class_dependency(
- sync_step, self.failed_steps) # dont run Slices if Sites failed
- # dont run sync_network_routes if time since last run < 1
- # hour
- self.check_schedule(sync_step, deletion)
- should_run = True
- except StepNotReady:
- self.failed_steps.append(sync_step)
- my_status = STEP_STATUS_KO
- except Exception as e:
- log.error('%r' % e)
- self.failed_steps.append(sync_step)
- my_status = STEP_STATUS_KO
-
- if (should_run):
- try:
- duration = time.time() - start_time
-
- failed_objects = sync_step(
- failed=list(
- self.failed_step_objects),
- deletion=deletion)
-
- self.check_duration(sync_step, duration)
-
- if failed_objects:
- self.failed_step_objects.update(failed_objects)
-
- my_status = STEP_STATUS_OK
- self.update_run_time(sync_step, deletion)
- except Exception as e:
- self.failed_steps.append(S)
- my_status = STEP_STATUS_KO
- else:
- my_status = STEP_STATUS_OK
-
- try:
- my_cond = step_conditions[S]
- my_cond.acquire()
- step_status[S] = my_status
- my_cond.notify_all()
- my_cond.release()
- except KeyError as e:
- pass
+ except StopIteration:
+ log.debug("Cohort completed", cohort = cohort, deletion = deletion)
+ cohort_emptied = True
finally:
- try:
- model_accessor.reset_queries()
- except:
- # this shouldn't happen, but in case it does, catch it...
- log.error("exception in reset_queries")
-
+ self.reset_model_accessor()
model_accessor.connection_close()
def run(self):
+ # Cleanup: Move self.driver into a synchronizer context
+ # made available to every sync step.
if not self.driver.enabled:
return
while True:
- log.debug('Waiting for event')
+ self.log.debug('Waiting for event or timeout')
self.wait_for_event(timeout=5)
- log.debug('Observer woke up')
+ self.log.debug('Synchronizer awake')
self.run_once()
+ def fetch_pending(self, deletion=False):
+ unique_model_list = list(set(self.model_to_step.keys()))
+ pending_objects = []
+ pending_steps = []
+ step_list = self.step_lookup.values()
+
+ for e in self.external_dependencies:
+ s = SyncStep
+ s.observes = e
+ step_list.append(s)
+
+ for step_class in step_list:
+ step = step_class(driver=self.driver)
+ step.log = self.log.bind(step = step)
+
+ if not hasattr(step, 'call'):
+ pending = step.fetch_pending(deletion)
+ for obj in pending:
+ obj.synchronizer_step = step
+ pending_objects.extend(pending)
+ else:
+ # Support old and broken legacy synchronizers
+ # This needs to be dropped soon.
+ pending_steps.append(step)
+
+ self.log.debug('Fetched pending data', pending_objects = pending_objects, legacy_steps = pending_steps)
+ return pending_objects, pending_steps
+
+ """ Automatically test if a real dependency path exists between two objects. e.g.
+ given an Instance, and a ControllerSite, the test amounts to:
+ instance.slice.site == controller.site
+
+ Then the two objects are related, and should be put in the same cohort.
+ If the models of the two objects are not dependent, then the check trivially
+ returns False.
+ """
+
+ def concrete_path_exists(self, o1, o2):
+ try:
+ m1 = o1.leaf_model_name
+ m2 = o2.leaf_model_name
+ except AttributeError:
+ # One of the nodes is not in the dependency graph
+ # No dependency
+ return False
+
+ # FIXME: Dynamic dependency check
+ G = self.model_dependency_graph[False]
+ paths = all_shortest_paths(G, m1, m2)
+
+ try:
+ any(paths)
+ paths = all_shortest_paths(G, m1, m2)
+ except NetworkXNoPath:
+ # Easy. The two models are unrelated.
+ return False
+
+ for p in paths:
+ path_verdict = True
+ src_object = o1
+ da = None
+
+ for i in range(len(p) - 1):
+ src = p[i]
+ dst = p[i + 1]
+ edge_label = G[src][dst]
+ sa = edge_label['src_accessor']
+ da = edge_label['dst_accessor']
+ try:
+ dst_object = getattr(src_object, sa)
+ if dst_object and dst_object.leaf_model_name != dst and i != len(
+ p) - 2:
+ raise AttributeError
+ except AttributeError as e:
+ self.log.debug(
+ 'Could not check object dependencies, making conservative choice', src_object = src_object, sa = sa, o1 = o1, o2 = o2)
+ return True
+ src_object = dst_object
+
+ if src_object and ((not da and src_object == o2) or (
+ da and src_object == getattr(o2, da))):
+ return True
+
+ # Otherwise try other paths
+
+ return False
+
+ """
+
+ This function implements the main scheduling logic
+ of the Synchronizer. It divides incoming work (dirty objects)
+ into cohorts of dependent objects, and runs each such cohort
+ in its own thread.
+
+ Future work:
+
+ * Run event thread in parallel to the scheduling thread, and
+ add incoming objects to existing cohorts. Doing so should
+ greatly improve synchronizer performance.
+ * A single object might need to be added to multiple cohorts.
+ In this case, the last cohort handles such an object.
+ * This algorithm is horizontal-scale-ready. Multiple synchronizers
+ could run off a shared runqueue of cohorts.
+
+ """
+
+ def compute_dependent_cohorts(self, objects, deletion):
+ model_map = defaultdict(list)
+ n = len(objects)
+ r = range(n)
+ indexed_objects = zip(r, objects)
+
+ mG = self.model_dependency_graph[deletion]
+
+ oG = DiGraph()
+
+ for i in r:
+ oG.add_node(i)
+
+ for v0, v1 in mG.edges():
+ try:
+ for i0 in range(n):
+ for i1 in range(n):
+ if i0 != i1:
+ if not deletion and self.concrete_path_exists(
+ objects[i0], objects[i1]):
+ oG.add_edge(i0, i1)
+ elif deletion and self.concrete_path_exists(objects[i1], objects[i0]):
+ oG.add_edge(i0, i1)
+ except KeyError:
+ pass
+
+ components = weakly_connected_component_subgraphs(oG)
+ cohort_indexes = [reversed(topological_sort(g)) for g in components]
+ cohorts = [[objects[i] for i in cohort_index]
+ for cohort_index in cohort_indexes]
+
+ return cohorts
+
def run_once(self):
try:
+ # Why are we checking the DB connection here?
model_accessor.check_db_connection_okay()
loop_start = time.time()
- error_map_file = Config.get('error_map_path')
- self.error_mapper = ErrorMapper(error_map_file)
# Two passes. One for sync, the other for deletion.
- for deletion in [False, True]:
- # Set of individual objects within steps that failed
- self.failed_step_objects = set()
+ for deletion in (False, True):
+ objects_to_process = []
- # Set up conditions and step status
- # This is needed for steps to run in parallel
- # while obeying dependencies.
-
- providers = set()
- dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
-
- for v in dependency_graph.values():
- if (v):
- providers.update(v)
-
- self.step_conditions = {}
- self.step_status = {}
-
- for p in list(providers):
- self.step_conditions[p] = threading.Condition()
-
- self.step_status[p] = STEP_STATUS_WORKING
-
- self.failed_steps = []
+ objects_to_process, steps_to_process = self.fetch_pending(deletion)
+ dependent_cohorts = self.compute_dependent_cohorts(
+ objects_to_process, deletion)
threads = []
- log.debug('Deletion', deletion =deletion)
- schedule = self.ordered_steps if not deletion else reversed(
- self.ordered_steps)
+ self.log.debug('In run once inner loop', deletion = deletion)
- for S in schedule:
+ for cohort in dependent_cohorts:
thread = threading.Thread(
- target=self.sync, name='synchronizer', args=(
- S, deletion))
+ target=self.sync_cohort, name='synchronizer', args=(
+ cohort, deletion))
- log.debug('Deletion', deletion =deletion)
threads.append(thread)
# Start threads
for t in threads:
t.start()
- # another spot to clean up debug state
- try:
- model_accessor.reset_queries()
- except:
- # this shouldn't happen, but in case it does, catch it...
- log.exception("exception in reset_queries")
+ self.reset_model_accessor()
# Wait for all threads to finish before continuing with the run
# loop
for t in threads:
t.join()
- self.save_run_times()
+ # Run legacy synchronizers, which do everything in call()
+ for step in steps_to_process:
+ try:
+ step.call()
+ except Exception,e:
+ self.log.exception("Legacy step failed", step = step, e = e)
loop_end = time.time()
model_accessor.update_diag(
loop_end=loop_end,
loop_start=loop_start,
- backend_status="1 - Bottom Of Loop")
+ backend_code=1,
+ backend_status="Bottom Of Loop")
except Exception as e:
- traceback.print_exc()
- model_accessor.update_diag(backend_status="2 - Exception in Event Loop")
+ self.log.exception(
+ 'Core error. This seems like a misconfiguration or bug. This error will not be relayed to the user!',
+ e = e)
+ self.log.error("Exception in observer run loop")
+
+ model_accessor.update_diag(
+ backend_code=2,
+ backend_status="Exception in Event Loop")
diff --git a/xos/synchronizers/new_base/event_manager.py b/xos/synchronizers/new_base/event_manager.py
index 77114c8..54bba5b 100644
--- a/xos/synchronizers/new_base/event_manager.py
+++ b/xos/synchronizers/new_base/event_manager.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
# FIXME Appear that a lot of unused code sits in here
import threading
diff --git a/xos/synchronizers/new_base/model_policy_loop.py b/xos/synchronizers/new_base/model_policy_loop.py
index 6feb65f..a2ad6a9 100644
--- a/xos/synchronizers/new_base/model_policy_loop.py
+++ b/xos/synchronizers/new_base/model_policy_loop.py
@@ -29,10 +29,11 @@
log = create_logger(Config().get('logging'))
class XOSPolicyEngine(object):
- def __init__(self, policies_dir):
+ def __init__(self, policies_dir, log = log):
self.model_policies = self.load_model_policies(policies_dir)
self.policies_by_name = {}
self.policies_by_class = {}
+ self.log = log
for policy in self.model_policies:
if not policy.model_name in self.policies_by_name:
@@ -103,7 +104,7 @@
c.model = model_accessor.get_model_class(c.model_name)
policies.append(c)
- log.info("Loaded model policies", count = len(policies))
+ log.info("Loaded model policies", policies = policies)
return policies
def execute_model_policy(self, instance, action):
diff --git a/xos/synchronizers/new_base/syncstep.py b/xos/synchronizers/new_base/syncstep.py
index 00f103e..8aa2afa 100644
--- a/xos/synchronizers/new_base/syncstep.py
+++ b/xos/synchronizers/new_base/syncstep.py
@@ -16,19 +16,17 @@
import os
import base64
+
from xosconfig import Config
from synchronizers.new_base.modelaccessor import *
from synchronizers.new_base.ansible_helper import run_template
+#from tests.steps.mock_modelaccessor import model_accessor
import json
import time
import pdb
from xosconfig import Config
-from multistructlog import create_logger
-
-log = create_logger(Config().get('logging'))
-
def f7(seq):
seen = set()
@@ -107,58 +105,8 @@
return model_accessor.fetch_pending(self.observes, deletion)
- def check_dependencies(self, obj, failed):
- for dep in self.dependencies:
- peer_name = dep[0].lower() + dep[1:] # django names are camelCased with the first letter lower
-
- peer_objects = []
- try:
- peer_names = plural(peer_name)
- peer_object_list = []
-
- try:
- peer_object_list.append(deepgetattr(obj, peer_name))
- except:
- pass
-
- try:
- peer_object_list.append(deepgetattr(obj, peer_names))
- except:
- pass
-
- for peer_object in peer_object_list:
- try:
- peer_objects.extend(peer_object.all())
- except AttributeError:
- peer_objects.append(peer_object)
- except:
- peer_objects = []
-
- # if (hasattr(obj,'controller')):
- # try:
- # peer_objects = filter(lambda o:o.controller==obj.controller, peer_objects)
- # except AttributeError:
- # pass
-
- if (model_accessor.obj_in_list(failed, peer_objects)):
- if (obj.backend_status != failed.backend_status):
- obj.backend_status = failed.backend_status
- obj.save(update_fields=['backend_status'])
- raise FailedDependency("Failed dependency for %s:%s peer %s:%s failed %s:%s" % (
- obj_class_name(obj), str(getattr(obj, "pk", "no_pk")), obj_class_name(peer_object),
- str(getattr(peer_object, "pk", "no_pk")), obj_class_name(failed), str(getattr(failed, "pk", "no_pk"))))
-
def sync_record(self, o):
- log.debug("Sync_record called for", class_name = obj_class_name(o), object = str(o))
-
- # try:
- # controller = o.get_controller()
- # controller_register = json.loads(controller.backend_register)
- #
- # if (controller_register.get('disabled',False)):
- # raise InnocuousException('Controller %s is disabled'%controller.name)
- # except AttributeError:
- # pass
+ self.log.debug("In default sync record", **o.tologdict())
tenant_fields = self.map_sync_inputs(o)
if tenant_fields == SyncStep.SYNC_WITHOUT_RUNNING:
@@ -174,15 +122,10 @@
if hasattr(self, "map_sync_outputs"):
self.map_sync_outputs(o, res)
+ self.log.debug("Finished default sync record", **o.tologdict())
+
def delete_record(self, o):
- # try:
- # controller = o.get_controller()
- # controller_register = json.loads(o.node.site_deployment.controller.backend_register)
- #
- # if (controller_register.get('disabled',False)):
- # raise InnocuousException('Controller %s is disabled'%sliver.node.site_deployment.controller.name)
- # except AttributeError:
- # pass
+ self.log.debug("In default delete record", **o.tologdict())
# If there is no map_delete_inputs, then assume deleting a record is a no-op.
if not hasattr(self, "map_delete_inputs"):
@@ -203,149 +146,4 @@
except AttributeError:
pass
- def call(self, failed=[], deletion=False):
- pending = self.fetch_pending(deletion)
-
- for o in pending:
- # another spot to clean up debug state
- try:
- model_accessor.reset_queries()
- except Exception,e:
- # this shouldn't happen, but in case it does, catch it...
- log.exception("exception in reset_queries", e = e,**o.tologdict())
-
- sync_failed = False
-
- backoff_disabled = Config.get("backoff_disabled")
-
- try:
- scratchpad = json.loads(o.backend_register)
- if (scratchpad):
- next_run = scratchpad['next_run']
- if (not backoff_disabled and next_run > time.time()):
- sync_failed = True
- except Exception,e:
- log.exception("Exception while loading scratchpad", e = e, **o.tologdict())
- pass
-
- if (not sync_failed):
- try:
- for f in failed:
- self.check_dependencies(o, f) # Raises exception if failed
- if (deletion):
- if getattr(o, "backend_need_reap", False):
- # the object has already been deleted and marked for reaping
- model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
- else:
- model_accessor.journal_object(o, "syncstep.call.delete_record")
- self.delete_record(o)
- model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
- o.backend_need_reap = True
- o.save(update_fields=['backend_need_reap'])
- # o.delete(purge=True)
- else:
- new_enacted = model_accessor.now()
- try:
- run_always = self.run_always
- except AttributeError:
- run_always = False
-
- # Mark this as an object that will require delete. Do
- # this now rather than after the syncstep,
- if not (o.backend_need_delete):
- o.backend_need_delete = True
- o.save(update_fields=['backend_need_delete'])
-
- model_accessor.journal_object(o, "syncstep.call.sync_record")
- self.sync_record(o)
-
- model_accessor.update_diag(syncrecord_start=time.time(), backend_status="1 - Synced Record")
- o.enacted = new_enacted
- scratchpad = {'next_run': 0, 'exponent': 0, 'last_success': time.time()}
- o.backend_register = json.dumps(scratchpad)
- o.backend_status = "1 - OK"
- model_accessor.journal_object(o, "syncstep.call.save_update")
- o.save(update_fields=['enacted', 'backend_status', 'backend_register'])
- log.info("save sync object, new enacted registered", enacted = str(new_enacted))
- except (InnocuousException, Exception, DeferredException) as e:
- log.exception("sync step failed!", **o.tologdict())
- try:
- if (o.backend_status.startswith('2 - ')):
- str_e = '%s // %r' % (o.backend_status[4:], e)
- str_e = elim_dups(str_e)
- else:
- str_e = '%r' % e
- except:
- str_e = '%r' % e
-
- try:
- error = self.error_map.map(str_e)
- except:
- error = '%s' % str_e
-
- if isinstance(e, InnocuousException):
- o.backend_status = '1 - %s' % error
- else:
- o.backend_status = '2 - %s' % error
-
- try:
- scratchpad = json.loads(o.backend_register)
- scratchpad['exponent']
- except Exception,e:
- log.exception("Exception while updating scratchpad", e = e, **o.tologdict())
- scratchpad = {'next_run': 0, 'exponent': 0, 'last_success': time.time(), 'failures': 0}
-
- # Second failure
- if (scratchpad['exponent']):
- if isinstance(e, DeferredException):
- delay = scratchpad['exponent'] * 60 # 1 minute
- else:
- delay = scratchpad['exponent'] * 600 # 10 minutes
- # cap delays at 8 hours
- if (delay > 8 * 60 * 60):
- delay = 8 * 60 * 60
- scratchpad['next_run'] = time.time() + delay
-
- try:
- scratchpad['exponent'] += 1
- except:
- scratchpad['exponent'] = 1
-
- try:
- scratchpad['failures'] += 1
- except KeyError:
- scratchpad['failures'] = 1
-
- scratchpad['last_failure'] = time.time()
-
- o.backend_register = json.dumps(scratchpad)
-
- # TOFIX:
- # DatabaseError: value too long for type character varying(140)
- if (model_accessor.obj_exists(o)):
- try:
- o.backend_status = o.backend_status[:1024]
- o.save(update_fields=['backend_status', 'backend_register', 'updated'])
- except:
- print "Could not update backend status field!"
- pass
- sync_failed = True
-
- if (sync_failed):
- failed.append(o)
-
- return failed
-
- def __call__(self, **args):
- return self.call(**args)
-
-
-# TODO: What does this do? It seems like it's just going to toss exceptions.
-
-class NullSyncStep(SyncStep): # was SyncObject
- provides = [] # Caller fills this in
- requested_interval = 0
- observes = [] # Caller fills this in
-
- def sync_record(self, r):
- raise DeferredException('Waiting for Service dependency: %r' % r)
+ self.log.debug("Finished default delete record", **o.tologdict())
diff --git a/xos/synchronizers/new_base/tests/__init__.py b/xos/synchronizers/new_base/tests/__init__.py
new file mode 100644
index 0000000..d4e8062
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/__init__.py
@@ -0,0 +1,16 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
diff --git a/xos/synchronizers/new_base/tests/model-deps-onos.yaml b/xos/synchronizers/new_base/tests/model-deps-onos.yaml
new file mode 100644
index 0000000..a459052
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/model-deps-onos.yaml
@@ -0,0 +1,21 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+{
+ "ONOSApp": [
+ ["ONOSService", "", ""]
+ ]
+ }
diff --git a/xos/synchronizers/new_base/tests/model-deps.yaml b/xos/synchronizers/new_base/tests/model-deps.yaml
new file mode 100644
index 0000000..0c92522
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/model-deps.yaml
@@ -0,0 +1,378 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+{
+ "XOSBase": [
+
+ ],
+ "User": [
+ "ControllerSite",
+
+ "Site",
+ "ControllerDashboardView",
+
+ "DashboardView"
+
+ ],
+ "Privilege": [
+
+ ],
+ "AddressPool": [
+
+ "Service"
+
+ ],
+ "Controller": [
+
+ "Deployment"
+
+ ],
+ "ControllerDashboardView": [
+
+ "Controller",
+ "ControllerDashboardView",
+
+ "DashboardView"
+
+ ],
+ "ControllerImages": [
+
+ "Image",
+
+ "Controller"
+
+ ],
+ "ControllerNetwork": [
+ "ControllerNetwork",
+
+ "Network",
+
+ "Controller"
+
+ ],
+ "ControllerRole": [
+
+ ],
+ "ControllerSite": [
+ "ControllerSite",
+
+ "Site",
+
+ "Controller"
+
+ ],
+ "ControllerPrivilege": [
+
+ "Controller",
+ "ControllerPrivilege",
+
+ "Privilege"
+
+ ],
+ "ControllerSitePrivilege": [
+
+ "Controller",
+ "ControllerSitePrivilege",
+
+ "SitePrivilege"
+
+ ],
+ "ControllerSlice": [
+
+ "Controller",
+ "ControllerSlice",
+
+ "Slice"
+
+ ],
+ "ControllerSlicePrivilege": [
+
+ "Controller",
+ "ControllerSlicePrivilege",
+
+ "SlicePrivilege"
+
+ ],
+ "ControllerUser": [
+ "ControllerUser",
+
+ "User",
+
+ "Controller"
+
+ ],
+ "DashboardView": [
+
+ "Controller",
+
+ "Deployment"
+
+ ],
+ "Deployment": [
+
+ ],
+ "DeploymentPrivilege": [
+ "ControllerUser",
+
+ "User",
+
+ "Deployment",
+
+ "DeploymentRole"
+
+ ],
+ "DeploymentRole": [
+
+ ],
+ "Diag": [
+
+ ],
+ "Flavor": [
+
+ ],
+ "Image": [
+
+ ],
+ "ImageDeployments": [
+
+ "Image",
+
+ "Deployment"
+
+ ],
+ "Instance": [
+
+ "Image",
+ "ControllerUser",
+
+ "User",
+ "ControllerSlice",
+
+ "Slice",
+
+ "Deployment",
+
+ "Node",
+
+ "Flavor",
+
+ "Instance"
+
+ ],
+ "Network": [
+
+ "NetworkTemplate",
+ "ControllerSlice",
+
+ "Slice",
+ "ControllerSlice",
+
+ "Slice",
+ "ControllerSlice",
+
+ "Slice",
+
+ "Instance"
+
+ ],
+ "NetworkParameter": [
+
+ "NetworkParameterType"
+
+ ],
+ "NetworkParameterType": [
+
+ ],
+ "NetworkSlice": [
+ "ControllerNetwork",
+
+ "Network",
+ "ControllerSlice",
+
+ "Slice"
+
+ ],
+ "NetworkTemplate": [
+
+ ],
+ "Node": [
+
+ "SiteDeployment"
+
+ ],
+ "NodeLabel": [
+
+ "Node"
+
+ ],
+ "Port": [
+ "ControllerNetwork",
+
+ "Network",
+
+ "Instance"
+
+ ],
+ "Role": [
+
+ ],
+ "Service": [
+
+ ],
+ "ServiceAttribute": [
+
+ "Service"
+
+ ],
+ "ServiceDependency": [
+
+ "Service",
+
+ "Service"
+
+ ],
+ "ServiceMonitoringAgentInfo": [
+
+ "Service"
+
+ ],
+ "ServicePrivilege": [
+ "ControllerUser",
+
+ "User",
+
+ "Service",
+
+ "ServiceRole"
+
+ ],
+ "ServiceRole": [
+
+ ],
+ "Site": [
+
+ "Deployment"
+
+ ],
+ "SiteDeployment": [
+ "ControllerSite",
+
+ "Site",
+
+ "Deployment",
+
+ "Controller"
+
+ ],
+ "SitePrivilege": [
+ "ControllerUser",
+
+ "User",
+ "ControllerSite",
+
+ "Site",
+
+ "SiteRole"
+
+ ],
+ "SiteRole": [
+
+ ],
+ "Slice": [
+ "ControllerSite",
+
+ "Site",
+
+ "Service",
+ "ControllerUser",
+
+ "User",
+
+ "Flavor",
+
+ "Image",
+
+ "Node"
+
+ ],
+ "SlicePrivilege": [
+ "ControllerUser",
+
+ "User",
+ "ControllerSlice",
+
+ "Slice",
+
+ "SliceRole"
+
+ ],
+ "SliceRole": [
+
+ ],
+ "Tag": [
+
+ "Service"
+
+ ],
+ "InterfaceType": [
+
+ ],
+ "ServiceInterface": [
+
+ "Service",
+
+ "InterfaceType"
+
+ ],
+ "ServiceInstance": [
+
+ "Service"
+
+ ],
+ "ServiceInstanceLink": [
+
+ "ServiceInstance",
+
+ "ServiceInterface",
+
+ "ServiceInstance",
+
+ "Service",
+ "ControllerNetwork",
+
+ "Network"
+
+ ],
+ "ServiceInstanceAttribute": [
+
+ "ServiceInstance"
+
+ ],
+ "TenantWithContainer": [
+
+ "Instance",
+ "ControllerUser",
+
+ "User"
+
+ ],
+ "XOS": [
+
+ ],
+ "XOSGuiExtension": [
+
+ ]
+}
diff --git a/xos/synchronizers/new_base/tests/steps/mock_modelaccessor.py b/xos/synchronizers/new_base/tests/steps/mock_modelaccessor.py
new file mode 100644
index 0000000..c390327
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/steps/mock_modelaccessor.py
@@ -0,0 +1,1321 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from mock import Mock
+import random
+
+class Object:
+ objects = Mock()
+ def __init__(self, **kwargs):
+ for (k,v) in kwargs.items():
+ setattr(self,k,v)
+
+ setattr(self, 'save', Mock())
+ setattr(self, 'delete', Mock())
+ setattr(self, 'backend_code', 0)
+ setattr(self, 'id', 98052)
+
+ def tologdict(self):
+ return {}
+
+class ONOSService(Object):
+ pass
+
+class ONOSTenant(Object):
+ pass
+
+class ModelAccessor:
+ def check_db_connection_ok(self):
+ return True
+
+ def fetch_pending(self, model, deleted = False):
+ num = random.randint(1,5)
+ object_list = []
+
+ for i in range(num):
+ if isinstance(model, list):
+ model = model[0]
+
+ try:
+ obj = model()
+ except:
+ import pdb
+ pdb.set_trace()
+
+ obj.name = "Opinionated Berry %d"%i
+ object_list.append(obj)
+
+ return object_list
+
+model_accessor = ModelAccessor()
+
+#####
+# DO NOT MODIFY THE CLASSES BELOW. THEY ARE AUTOGENERATED.
+#
+
+class XOSBase(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "Provisioning in progress"
+ backend_code = 0
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+
+ leaf_model_name = "XOSBase"
+class User(Object):
+ email = None
+ username = "Something"
+ password = "Something"
+ last_login = None
+ firstname = None
+ lastname = None
+ phone = None
+ user_url = None
+ site = None
+ public_key = None
+ is_active = True
+ is_admin = False
+ is_staff = True
+ is_readonly = False
+ is_registering = False
+ is_appuser = False
+ login_page = None
+ created = None
+ updated = None
+ enacted = None
+ policed = None
+ backend_status = "Provisioning in progress"
+ backend_need_delete = False
+ backend_need_reap = False
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ timezone = "America/New_York"
+ dashboards = None
+ policy_status = "0 - Policy in process"
+
+ leaf_model_name = "User"
+class Privilege(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ accessor_id = None
+ accessor_type = None
+ controller_id = None
+ object_id = None
+ object_type = None
+ permission = "all"
+ granted = None
+ expires = None
+
+ leaf_model_name = "Privilege"
+class AddressPool(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ addresses = None
+ gateway_ip = None
+ gateway_mac = None
+ cidr = None
+ inuse = None
+ service = None
+
+ leaf_model_name = "AddressPool"
+class Controller(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ backend_type = None
+ version = None
+ auth_url = None
+ admin_user = None
+ admin_password = None
+ admin_tenant = None
+ domain = None
+ rabbit_host = None
+ rabbit_user = None
+ rabbit_password = None
+ deployment = None
+
+ leaf_model_name = "Controller"
+class ControllerDashboardView(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ controller = None
+ dashboardView = None
+ enabled = True
+ url = None
+
+ leaf_model_name = "ControllerDashboardView"
+class ControllerImages(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ image = None
+ controller = None
+ glance_image_id = None
+
+ leaf_model_name = "ControllerImages"
+class ControllerNetwork(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ network = None
+ controller = None
+ subnet = None
+ start_ip = None
+ stop_ip = None
+ net_id = None
+ router_id = None
+ subnet_id = None
+ gateway = None
+ segmentation_id = None
+
+ leaf_model_name = "ControllerNetwork"
+class ControllerRole(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ role = None
+
+ leaf_model_name = "ControllerRole"
+class ControllerSite(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ site = None
+ controller = None
+ tenant_id = None
+
+ leaf_model_name = "ControllerSite"
+class ControllerPrivilege(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ controller = None
+ privilege = None
+ role_id = None
+
+ leaf_model_name = "ControllerPrivilege"
+class ControllerSitePrivilege(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ controller = None
+ site_privilege = None
+ role_id = None
+
+ leaf_model_name = "ControllerSitePrivilege"
+
+class ControllerSlice(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ controller = None
+ slice = None
+ tenant_id = None
+
+ leaf_model_name = "ControllerSlice"
+class ControllerSlicePrivilege(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ controller = None
+ slice_privilege = None
+ role_id = None
+
+ leaf_model_name = "ControllerSlicePrivilege"
+class ControllerUser(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ user = None
+ controller = None
+ kuser_id = None
+
+ leaf_model_name = "ControllerUser"
+class DashboardView(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ url = None
+ enabled = True
+ icon = "default-icon.png"
+ icon_active = "default-icon-active.png"
+ controllers = None
+ deployments = None
+
+ leaf_model_name = "DashboardView"
+class Deployment(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ accessControl = "allow all"
+
+ leaf_model_name = "Deployment"
+class DeploymentPrivilege(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ user = None
+ deployment = None
+ role = None
+
+ leaf_model_name = "DeploymentPrivilege"
+class DeploymentRole(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ role = None
+
+ leaf_model_name = "DeploymentRole"
+class Diag(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+
+ leaf_model_name = "Diag"
+class Flavor(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ description = None
+ flavor = None
+
+ leaf_model_name = "Flavor"
+class Image(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ kind = "vm"
+ disk_format = None
+ container_format = None
+ path = None
+ tag = None
+
+ leaf_model_name = "Image"
+class ImageDeployments(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ image = None
+ deployment = None
+
+ leaf_model_name = "ImageDeployments"
+class Instance(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ instance_id = None
+ instance_uuid = None
+ name = None
+ instance_name = None
+ ip = None
+ image = None
+ creator = None
+ slice = None
+ deployment = None
+ node = None
+ numberCores = 0
+ flavor = None
+ userData = None
+ isolation = "vm"
+ volumes = None
+ parent = None
+
+ leaf_model_name = "Instance"
+class Network(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ template = None
+ subnet = None
+ start_ip = None
+ end_ip = None
+ ports = None
+ labels = None
+ owner = None
+ permit_all_slices = False
+ autoconnect = True
+ permitted_slices = None
+ slices = None
+ instances = None
+
+ leaf_model_name = "Network"
+class NetworkParameter(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ parameter = None
+ value = None
+ content_type = None
+ object_id = None
+
+ leaf_model_name = "NetworkParameter"
+class NetworkParameterType(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ description = None
+
+ leaf_model_name = "NetworkParameterType"
+class NetworkSlice(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ network = None
+ slice = None
+
+ leaf_model_name = "NetworkSlice"
+class NetworkTemplate(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ description = None
+ visibility = "private"
+ translation = "none"
+ access = None
+ shared_network_name = None
+ shared_network_id = None
+ topology_kind = "bigswitch"
+ controller_kind = None
+ vtn_kind = "PRIVATE"
+
+ leaf_model_name = "NetworkTemplate"
+class Node(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ site_deployment = None
+
+ leaf_model_name = "Node"
+class NodeLabel(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ node = None
+
+ leaf_model_name = "NodeLabel"
+class Port(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ network = None
+ instance = None
+ ip = None
+ port_id = None
+ mac = None
+ xos_created = False
+
+ leaf_model_name = "Port"
+class Role(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ role_type = None
+ role = None
+ description = None
+
+ leaf_model_name = "Role"
+class Service(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ description = None
+ enabled = True
+ kind = "generic"
+ name = None
+ versionNumber = None
+ published = True
+ view_url = None
+ icon_url = None
+ public_key = None
+ private_key_fn = None
+ service_specific_id = None
+ service_specific_attribute = None
+
+ leaf_model_name = "Service"
+class ServiceAttribute(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ value = None
+ service = None
+
+ leaf_model_name = "ServiceAttribute"
+class ServiceDependency(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ provider_service = None
+ subscriber_service = None
+ connect_method = "none"
+
+ leaf_model_name = "ServiceDependency"
+class ServiceMonitoringAgentInfo(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ service = None
+ target_uri = None
+
+ leaf_model_name = "ServiceMonitoringAgentInfo"
+class ServicePrivilege(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ user = None
+ service = None
+ role = None
+
+ leaf_model_name = "ServicePrivilege"
+class ServiceRole(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ role = None
+
+ leaf_model_name = "ServiceRole"
+class Site(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ site_url = None
+ enabled = True
+ hosts_nodes = True
+ hosts_users = True
+ longitude = None
+ latitude = None
+ login_base = None
+ is_public = True
+ abbreviated_name = None
+ deployments = None
+
+ leaf_model_name = "Site"
+class SiteDeployment(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ site = None
+ deployment = None
+ controller = None
+ availability_zone = None
+
+ leaf_model_name = "SiteDeployment"
+class SitePrivilege(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ user = None
+ site = None
+ role = None
+
+ leaf_model_name = "SitePrivilege"
+class SiteRole(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ role = None
+
+ leaf_model_name = "SiteRole"
+class Slice(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ enabled = True
+ description = None
+ slice_url = None
+ site = None
+ max_instances = 10
+ service = None
+ network = None
+ exposed_ports = None
+ creator = None
+ default_flavor = None
+ default_image = None
+ default_node = None
+ mount_data_sets = "GenBank"
+ default_isolation = "vm"
+
+ leaf_model_name = "Slice"
+class SlicePrivilege(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ user = None
+ slice = None
+ role = None
+
+ leaf_model_name = "SlicePrivilege"
+class SliceRole(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ role = None
+
+ leaf_model_name = "SliceRole"
+class Tag(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ service = None
+ name = None
+ value = None
+ content_type = None
+ object_id = None
+
+ leaf_model_name = "Tag"
+class InterfaceType(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ direction = None
+
+ leaf_model_name = "InterfaceType"
+class ServiceInterface(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ service = None
+ interface_type = None
+
+ leaf_model_name = "ServiceInterface"
+class ServiceInstance(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ owner = None
+ service_specific_id = None
+ service_specific_attribute = None
+
+ leaf_model_name = "ServiceInstance"
+class ServiceInstanceLink(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ provider_service_instance = None
+ provider_service_interface = None
+ subscriber_service_instance = None
+ subscriber_service = None
+ subscriber_network = None
+
+ leaf_model_name = "ServiceInstanceLink"
+class ServiceInstanceAttribute(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ value = None
+ service_instance = None
+
+ leaf_model_name = "ServiceInstanceAttribute"
+class TenantWithContainer(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ owner = None
+ service_specific_id = None
+ service_specific_attribute = None
+ instance = None
+ creator = None
+ external_hostname = None
+ external_container = None
+
+ leaf_model_name = "TenantWithContainer"
+class XOS(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = "XOS"
+
+ leaf_model_name = "XOS"
+class XOSGuiExtension(Object):
+ created = None
+ updated = "now()"
+ enacted = None
+ policed = None
+ backend_register = "{}"
+ backend_need_delete = False
+ backend_need_reap = False
+ backend_status = "0 - Provisioning in progress"
+ deleted = False
+ write_protect = False
+ lazy_blocked = False
+ no_sync = False
+ no_policy = False
+ policy_status = "0 - Policy in process"
+ leaf_model_name = None
+ name = None
+ files = None
+
+ leaf_model_name = "XOSGuiExtension"
+
+
diff --git a/xos/synchronizers/new_base/tests/steps/sync_container.py b/xos/synchronizers/new_base/tests/steps/sync_container.py
new file mode 100644
index 0000000..6eeb975
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/steps/sync_container.py
@@ -0,0 +1,56 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import hashlib
+import os
+import socket
+import sys
+import base64
+import time
+from synchronizers.new_base.SyncInstanceUsingAnsible import SyncInstanceUsingAnsible
+from synchronizers.new_base.syncstep import DeferredException
+from synchronizers.new_base.ansible_helper import run_template_ssh
+from mock_modelaccessor import *
+from synchronizers.new_base.syncstep import SyncStep
+
+# hpclibrary will be in steps/..
+parentdir = os.path.join(os.path.dirname(__file__),"..")
+sys.path.insert(0, parentdir)
+
+class SyncContainer(SyncInstanceUsingAnsible):
+ provides=[Instance]
+ observes=Instance
+ template_name = "sync_container.yaml"
+
+ def __init__(self, *args, **kwargs):
+ super(SyncContainer, self).__init__(*args, **kwargs)
+
+ def fetch_pending(self, deletion=False):
+ i = Instance()
+ i.name = "Spectacular Sponge"
+ j = Instance()
+ j.name = "Spontaneous Tent"
+ k = Instance()
+ k.name = "Embarrassed Cat"
+
+ objs = [i,j,k]
+ return objs
+
+ def sync_record(self, o):
+ pass
+
+ def delete_record(self, o):
+ pass
diff --git a/xos/synchronizers/new_base/tests/steps/sync_controller_images.py b/xos/synchronizers/new_base/tests/steps/sync_controller_images.py
new file mode 100644
index 0000000..c47b389
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/steps/sync_controller_images.py
@@ -0,0 +1,53 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+from synchronizers.new_base.syncstep import *
+from synchronizers.new_base.ansible_helper import *
+from mock_modelaccessor import *
+
+class SyncControllerImages(SyncStep):
+ provides=[ControllerImages]
+ observes = ControllerImages
+ requested_interval=0
+ playbook='sync_controller_images.yaml'
+
+ def fetch_pending(self, deleted):
+ ci = ControllerImages()
+ i = Image()
+ i.name = "Lush Loss"
+ ci.i = i
+ return [ci]
+
+ def map_sync_inputs(self, controller_image):
+ image_fields = {'endpoint':controller_image.controller.auth_url,
+ 'endpoint_v3': controller_image.controller.auth_url_v3,
+ 'admin_user':controller_image.controller.admin_user,
+ 'admin_password':controller_image.controller.admin_password,
+ 'domain': controller_image.controller.domain,
+ 'name':controller_image.image.name,
+ 'filepath':controller_image.image.path,
+ 'ansible_tag': '%s@%s'%(controller_image.image.name,controller_image.controller.name), # name of ansible playbook
+ }
+
+ return image_fields
+
+ def map_sync_outputs(self, controller_image, res):
+ image_id = res[0]['id']
+ controller_image.glance_image_id = image_id
+ controller_image.backend_status = '1 - OK'
+ controller_image.save()
diff --git a/xos/synchronizers/new_base/tests/steps/sync_controller_networks.py b/xos/synchronizers/new_base/tests/steps/sync_controller_networks.py
new file mode 100644
index 0000000..f4f31f2
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/steps/sync_controller_networks.py
@@ -0,0 +1,73 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+import struct
+import socket
+from netaddr import IPAddress, IPNetwork
+from synchronizers.new_base.syncstep import *
+from synchronizers.new_base.ansible_helper import *
+from mock_modelaccessor import *
+
+class SyncControllerNetworks(SyncStep):
+ requested_interval = 0
+ provides=[Network]
+ observes=ControllerNetwork
+ external_dependencies = [User]
+ playbook='sync_controller_networks.yaml'
+
+ def fetch_pending(self, deleted):
+ ci = ControllerNetwork()
+ i = Network()
+ i.name = "Lush Loss"
+ s = Slice()
+ s.name = "Ghastly Notebook"
+ i.owner = s
+ ci.i = i
+ return [ci]
+
+ def map_sync_outputs(self, controller_network,res):
+ network_id = res[0]['network']['id']
+ subnet_id = res[1]['subnet']['id']
+ controller_network.net_id = network_id
+ controller_network.subnet = self.cidr
+ controller_network.subnet_id = subnet_id
+ controller_network.backend_status = '1 - OK'
+ if not controller_network.segmentation_id:
+ controller_network.segmentation_id = str(self.get_segmentation_id(controller_network))
+ controller_network.save()
+
+ def map_sync_inputs(self, controller_network):
+ pass
+
+ def map_delete_inputs(self, controller_network):
+ network_name = controller_network.network.name
+ subnet_name = '%s-%d'%(network_name,controller_network.pk)
+ cidr = controller_network.subnet
+ network_fields = {'endpoint':controller_network.controller.auth_url,
+ 'admin_user':slice.creator.email, # XXX: FIXME
+ 'admin_project':slice.name, # XXX: FIXME
+ 'admin_password':slice.creator.remote_password,
+ 'name':network_name,
+ 'subnet_name':subnet_name,
+ 'ansible_tag':'%s-%s@%s'%(network_name,slice.slicename,controller_network.controller.name),
+ 'cidr':cidr,
+ 'delete':True
+ }
+
+ return network_fields
+
diff --git a/xos/synchronizers/new_base/tests/steps/sync_controller_site_privileges.py b/xos/synchronizers/new_base/tests/steps/sync_controller_site_privileges.py
new file mode 100644
index 0000000..680dc79
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/steps/sync_controller_site_privileges.py
@@ -0,0 +1,92 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+import json
+from synchronizers.new_base.syncstep import *
+from synchronizers.new_base.ansible_helper import *
+from mock_modelaccessor import *
+
+class SyncControllerSitePrivileges(SyncStep):
+ provides=[SitePrivilege]
+ requested_interval=0
+ observes=ControllerSitePrivilege
+ playbook='sync_controller_users.yaml'
+
+ def map_sync_inputs(self, controller_site_privilege):
+ controller_register = json.loads(controller_site_privilege.controller.backend_register)
+ if not controller_site_privilege.controller.admin_user:
+ return
+
+ roles = [controller_site_privilege.site_privilege.role.role]
+ # setup user home site roles at controller
+ if not controller_site_privilege.site_privilege.user.site:
+ raise Exception('Siteless user %s'%controller_site_privilege.site_privilege.user.email)
+ else:
+ # look up tenant id for the user's site at the controller
+ #ctrl_site_deployments = SiteDeployment.objects.filter(
+ # site_deployment__site=controller_site_privilege.user.site,
+ # controller=controller_site_privilege.controller)
+
+ #if ctrl_site_deployments:
+ # # need the correct tenant id for site at the controller
+ # tenant_id = ctrl_site_deployments[0].tenant_id
+ # tenant_name = ctrl_site_deployments[0].site_deployment.site.login_base
+ user_fields = {
+ 'endpoint':controller_site_privilege.controller.auth_url,
+ 'endpoint_v3': controller_site_privilege.controller.auth_url_v3,
+ 'domain': controller_site_privilege.controller.domain,
+ 'name': controller_site_privilege.site_privilege.user.email,
+ 'email': controller_site_privilege.site_privilege.user.email,
+ 'password': controller_site_privilege.site_privilege.user.remote_password,
+ 'admin_user': controller_site_privilege.controller.admin_user,
+ 'admin_password': controller_site_privilege.controller.admin_password,
+ 'ansible_tag':'%s@%s'%(controller_site_privilege.site_privilege.user.email.replace('@','-at-'),controller_site_privilege.controller.name),
+ 'admin_tenant': controller_site_privilege.controller.admin_tenant,
+ 'roles':roles,
+ 'tenant':controller_site_privilege.site_privilege.site.login_base}
+
+ return user_fields
+
+ def map_sync_outputs(self, controller_site_privilege, res):
+ # results is an array in which each element corresponds to an
+ # "ok" string received per operation. If we get as many oks as
+ # the number of operations we issued, that means a grand success.
+ # Otherwise, the number of oks tell us which operation failed.
+ controller_site_privilege.role_id = res[0]['id']
+ controller_site_privilege.save()
+
+ def delete_record(self, controller_site_privilege):
+ controller_register = json.loads(controller_site_privilege.controller.backend_register)
+ if (controller_register.get('disabled',False)):
+ raise InnocuousException('Controller %s is disabled'%controller_site_privilege.controller.name)
+
+ if controller_site_privilege.role_id:
+ driver = self.driver.admin_driver(controller=controller_site_privilege.controller)
+ user = ControllerUser.objects.get(
+ controller=controller_site_privilege.controller,
+ user=controller_site_privilege.site_privilege.user
+ )
+ site = ControllerSite.objects.get(
+ controller=controller_site_privilege.controller,
+ user=controller_site_privilege.site_privilege.user
+ )
+ driver.delete_user_role(
+ user.kuser_id,
+ site.tenant_id,
+ controller_site_privilege.site_prvilege.role.role
+ )
diff --git a/xos/synchronizers/new_base/tests/steps/sync_controller_sites.py b/xos/synchronizers/new_base/tests/steps/sync_controller_sites.py
new file mode 100644
index 0000000..cc7e357
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/steps/sync_controller_sites.py
@@ -0,0 +1,84 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+from synchronizers.new_base.syncstep import *
+from synchronizers.new_base.ansible_helper import *
+import json
+from mock_modelaccessor import *
+
+class SyncControllerSites(SyncStep):
+ requested_interval=0
+ provides=[Site]
+ observes=ControllerSite
+ playbook = 'sync_controller_sites.yaml'
+
+ def fetch_pending(self, deleted=False):
+ lobjs = super(SyncControllerSites, self).fetch_pending(deleted)
+
+ if not deleted:
+ # filter out objects with null controllers
+ lobjs = [x for x in lobjs if x.controller]
+
+ return lobjs
+
+ def map_sync_inputs(self, controller_site):
+ tenant_fields = {'endpoint':controller_site.controller.auth_url,
+ 'endpoint_v3': controller_site.controller.auth_url_v3,
+ 'domain': controller_site.controller.domain,
+ 'admin_user': controller_site.controller.admin_user,
+ 'admin_password': controller_site.controller.admin_password,
+ 'admin_tenant': controller_site.controller.admin_tenant,
+ 'ansible_tag': '%s@%s'%(controller_site.site.login_base,controller_site.controller.name), # name of ansible playbook
+ 'tenant': controller_site.site.login_base,
+ 'tenant_description': controller_site.site.name}
+ return tenant_fields
+
+ def map_sync_outputs(self, controller_site, res):
+ controller_site.tenant_id = res[0]['id']
+ controller_site.backend_status = '1 - OK'
+ controller_site.save()
+
+ def delete_record(self, controller_site):
+ controller_register = json.loads(controller_site.controller.backend_register)
+ if (controller_register.get('disabled',False)):
+ raise InnocuousException('Controller %s is disabled'%controller_site.controller.name)
+
+ if controller_site.tenant_id:
+ driver = self.driver.admin_driver(controller=controller_site.controller)
+ driver.delete_tenant(controller_site.tenant_id)
+
+ """
+ Ansible does not support tenant deletion yet
+
+ import pdb
+ pdb.set_trace()
+ template = os_template_env.get_template('delete_controller_sites.yaml')
+ tenant_fields = {'endpoint':controller_site.controller.auth_url,
+ 'admin_user': controller_site.controller.admin_user,
+ 'admin_password': controller_site.controller.admin_password,
+ 'admin_tenant': 'admin',
+ 'ansible_tag': 'controller_sites/%s@%s'%(controller_site.controller_site.site.login_base,controller_site.controller_site.deployment.name), # name of ansible playbook
+ 'tenant': controller_site.controller_site.site.login_base,
+ 'delete': True}
+
+ rendered = template.render(tenant_fields)
+ res = run_template('sync_controller_sites.yaml', tenant_fields)
+
+ if (len(res)!=1):
+ raise Exception('Could not assign roles for user %s'%tenant_fields['tenant'])
+ """
diff --git a/xos/synchronizers/new_base/tests/steps/sync_controller_slice_privileges.py b/xos/synchronizers/new_base/tests/steps/sync_controller_slice_privileges.py
new file mode 100644
index 0000000..e40e5c2
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/steps/sync_controller_slice_privileges.py
@@ -0,0 +1,80 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+import json
+from synchronizers.new_base.ansible_helper import *
+from mock_modelaccessor import *
+import syncstep
+
+class SyncControllerSlicePrivileges(syncstep.SyncStep):
+ provides=[SlicePrivilege]
+ requested_interval=0
+ observes=ControllerSlicePrivilege
+ playbook = 'sync_controller_users.yaml'
+
+ def map_sync_inputs(self, controller_slice_privilege):
+ if not controller_slice_privilege.controller.admin_user:
+ return
+
+ template = os_template_env.get_template('sync_controller_users.yaml')
+ roles = [controller_slice_privilege.slice_privilege.role.role]
+ # setup user home slice roles at controller
+ if not controller_slice_privilege.slice_privilege.user.site:
+ raise Exception('Sliceless user %s'%controller_slice_privilege.slice_privilege.user.email)
+ else:
+ user_fields = {
+ 'endpoint':controller_slice_privilege.controller.auth_url,
+ 'endpoint_v3': controller_slice_privilege.controller.auth_url_v3,
+ 'domain': controller_slice_privilege.controller.domain,
+ 'name': controller_slice_privilege.slice_privilege.user.email,
+ 'email': controller_slice_privilege.slice_privilege.user.email,
+ 'password': controller_slice_privilege.slice_privilege.user.remote_password,
+ 'admin_user': controller_slice_privilege.controller.admin_user,
+ 'admin_password': controller_slice_privilege.controller.admin_password,
+ 'ansible_tag':'%s@%s@%s'%(controller_slice_privilege.slice_privilege.user.email.replace('@','-at-'),controller_slice_privilege.slice_privilege.slice.name,controller_slice_privilege.controller.name),
+ 'admin_tenant': controller_slice_privilege.controller.admin_tenant,
+ 'roles':roles,
+ 'tenant':controller_slice_privilege.slice_privilege.slice.name}
+ return user_fields
+
+ def map_sync_outputs(self, controller_slice_privilege, res):
+ controller_slice_privilege.role_id = res[0]['id']
+ controller_slice_privilege.save()
+
+ def delete_record(self, controller_slice_privilege):
+ controller_register = json.loads(controller_slice_privilege.controller.backend_register)
+ if (controller_register.get('disabled',False)):
+ raise InnocuousException('Controller %s is disabled'%controller_slice_privilege.controller.name)
+
+ if controller_slice_privilege.role_id:
+ driver = self.driver.admin_driver(controller=controller_slice_privilege.controller)
+ user = ControllerUser.objects.filter(
+ controller_id=controller_slice_privilege.controller.id,
+ user_id=controller_slice_privilege.slice_privilege.user.id
+ )
+ user = user[0]
+ slice = ControllerSlice.objects.filter(
+ controller_id=controller_slice_privilege.controller.id,
+ user_id=controller_slice_privilege.slice_privilege.user.id
+ )
+ slice = slice[0]
+ driver.delete_user_role(
+ user.kuser_id,
+ slice.tenant_id,
+ controller_slice_privilege.slice_prvilege.role.role
+ )
diff --git a/xos/synchronizers/new_base/tests/steps/sync_controller_slices.py b/xos/synchronizers/new_base/tests/steps/sync_controller_slices.py
new file mode 100644
index 0000000..31196ff
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/steps/sync_controller_slices.py
@@ -0,0 +1,49 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+import syncstep
+from synchronizers.new_base.ansible_helper import *
+from mock_modelaccessor import *
+
+class SyncControllerSlices(syncstep.SyncStep):
+ provides=[Slice]
+ requested_interval=0
+ observes=ControllerSlice
+ playbook='sync_controller_slices.yaml'
+
+ def map_sync_inputs(self, controller_slice):
+ if getattr(controller_slice, 'force_fail',None):
+ raise Exception("Forced failure")
+ elif getattr(controller_slice, 'force_defer', None):
+ raise syncstep.DeferredException("Forced defer")
+
+ tenant_fields = {'endpoint': 'endpoint',
+ 'name':'Flagrant Haircut'
+ }
+
+ return tenant_fields
+
+ def map_sync_outputs(self, controller_slice, res):
+ controller_slice.save()
+
+
+ def map_delete_inputs(self, controller_slice):
+ tenant_fields = {'endpoint': 'endpoint',
+ 'name':'Conscientious Plastic',
+ 'delete': True}
+ return tenant_fields
diff --git a/xos/synchronizers/new_base/tests/steps/sync_controller_users.py b/xos/synchronizers/new_base/tests/steps/sync_controller_users.py
new file mode 100644
index 0000000..39fcb92
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/steps/sync_controller_users.py
@@ -0,0 +1,69 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+from synchronizers.new_base.syncstep import *
+from synchronizers.new_base.ansible_helper import *
+from mock_modelaccessor import *
+
+class SyncControllerUsers(SyncStep):
+ provides=[User]
+ requested_interval=0
+ observes=ControllerUser
+ playbook='sync_controller_users.yaml'
+
+ def map_sync_inputs(self, controller_user):
+ if not controller_user.controller.admin_user:
+ return
+
+ # All users will have at least the 'user' role at their home site/tenant.
+ # We must also check if the user should have the admin role
+
+ roles = ['user']
+ if controller_user.user.is_admin:
+ driver = self.driver.admin_driver(controller=controller_user.controller)
+ roles.append(driver.get_admin_role().name)
+
+ # setup user home site roles at controller
+ if not controller_user.user.site:
+ raise Exception('Siteless user %s'%controller_user.user.email)
+ else:
+ user_fields = {
+ 'endpoint':controller_user.controller.auth_url,
+ 'endpoint_v3': controller_user.controller.auth_url_v3,
+ 'domain': controller_user.controller.domain,
+ 'name': controller_user.user.email,
+ 'email': controller_user.user.email,
+ 'password': controller_user.user.remote_password,
+ 'admin_user': controller_user.controller.admin_user,
+ 'admin_password': controller_user.controller.admin_password,
+ 'ansible_tag':'%s@%s'%(controller_user.user.email.replace('@','-at-'),controller_user.controller.name),
+ 'admin_project': controller_user.controller.admin_tenant,
+ 'roles':roles,
+ 'project':controller_user.user.site.login_base
+ }
+ return user_fields
+
+ def map_sync_outputs(self, controller_user, res):
+ controller_user.kuser_id = res[0]['user']['id']
+ controller_user.backend_status = '1 - OK'
+ controller_user.save()
+
+ def delete_record(self, controller_user):
+ if controller_user.kuser_id:
+ driver = self.driver.admin_driver(controller=controller_user.controller)
+ driver.delete_user(controller_user.kuser_id)
diff --git a/xos/synchronizers/new_base/tests/steps/sync_images.py b/xos/synchronizers/new_base/tests/steps/sync_images.py
new file mode 100644
index 0000000..ea12459
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/steps/sync_images.py
@@ -0,0 +1,29 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+from mock_modelaccessor import *
+from synchronizers.new_base.syncstep import SyncStep
+
+class SyncImages(SyncStep):
+ provides=[Image]
+ requested_interval=0
+ observes=[Image]
+
+ def sync_record(self, role):
+ # do nothing
+ pass
diff --git a/xos/synchronizers/new_base/tests/steps/sync_instances.py b/xos/synchronizers/new_base/tests/steps/sync_instances.py
new file mode 100644
index 0000000..49dccb9
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/steps/sync_instances.py
@@ -0,0 +1,66 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+import socket
+from synchronizers.new_base.ansible_helper import *
+import syncstep
+from mock_modelaccessor import *
+
+RESTAPI_HOSTNAME = socket.gethostname()
+RESTAPI_PORT = "8000"
+
+def escape(s):
+ s = s.replace('\n', r'\n').replace('"', r'\"')
+ return s
+
+class SyncInstances(syncstep.SyncStep):
+ provides = [Instance]
+ requested_interval = 0
+ observes = Instance
+ playbook = 'sync_instances.yaml'
+
+ def fetch_pending(self, deletion=False):
+ objs = super(SyncInstances, self).fetch_pending(deletion)
+ objs = [x for x in objs if x.isolation == "vm"]
+ return objs
+
+ def map_sync_inputs(self, instance):
+ inputs = {}
+ metadata_update = {}
+
+ fields = {
+ 'name': instance.name,
+ 'delete': False,
+ }
+ return fields
+
+ def map_sync_outputs(self, instance, res):
+ instance.save()
+
+ def map_delete_inputs(self, instance):
+ input = {'endpoint': 'endpoint',
+ 'admin_user': 'admin_user',
+ 'admin_password': 'admin_password',
+ 'project_name': 'project_name',
+ 'tenant': 'tenant',
+ 'tenant_description': 'tenant_description',
+ 'name': instance.name,
+ 'ansible_tag': 'ansible_tag',
+ 'delete': True}
+
+ return input
diff --git a/xos/synchronizers/new_base/tests/steps/sync_ports.py b/xos/synchronizers/new_base/tests/steps/sync_ports.py
new file mode 100644
index 0000000..3d68293
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/steps/sync_ports.py
@@ -0,0 +1,38 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+from mock_modelaccessor import *
+from synchronizers.new_base.syncstep import SyncStep
+
+class SyncPort(SyncStep):
+ requested_interval = 0 # 3600
+ provides=[Port]
+ observes=Port
+
+ def call(self, failed=[], deletion=False):
+ if deletion:
+ self.delete_ports()
+ else:
+ self.sync_ports()
+
+ def sync_ports(self):
+ open('/tmp/sync_ports','w').write('Sync successful')
+
+
+ def delete_ports(self):
+ open('/tmp/delete_ports','w').write('Delete successful')
diff --git a/xos/synchronizers/new_base/tests/steps/sync_roles.py b/xos/synchronizers/new_base/tests/steps/sync_roles.py
new file mode 100644
index 0000000..0a91b33
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/steps/sync_roles.py
@@ -0,0 +1,34 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import base64
+from mock_modelaccessor import *
+import syncstep
+
+class SyncRoles(syncstep.SyncStep):
+ provides=[Role]
+ requested_interval=0
+ observes=[SiteRole,SliceRole,ControllerRole]
+
+ def sync_record(self, role):
+ if not role.enacted:
+ controllers = Controller.objects.all()
+ for controller in controllers:
+ driver = self.driver.admin_driver(controller=controller)
+ driver.create_role(role.role)
+ role.save()
+
diff --git a/xos/synchronizers/new_base/tests/test_config.yaml b/xos/synchronizers/new_base/tests/test_config.yaml
new file mode 100644
index 0000000..e5ec0d3
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/test_config.yaml
@@ -0,0 +1,24 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+name: test-synchronizer
+accessor:
+ username: xosadmin@opencord.org
+ password: "sample"
+ kind: testframework
+logging:
+ version: 1
+dependency_graph: "model-deps.yaml"
+steps_dir: "tests/steps"
diff --git a/xos/synchronizers/new_base/tests/test_config_onos.yaml b/xos/synchronizers/new_base/tests/test_config_onos.yaml
new file mode 100644
index 0000000..8862ff6
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/test_config_onos.yaml
@@ -0,0 +1,24 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+name: test-synchronizer
+accessor:
+ username: xosadmin@opencord.org
+ password: "sample"
+ kind: testframework
+logging:
+ version: 1
+dependency_graph: "tests/model-deps-onos.yaml"
+steps_dir: "tests/steps"
diff --git a/xos/synchronizers/new_base/tests/test_load.py b/xos/synchronizers/new_base/tests/test_load.py
new file mode 100644
index 0000000..575732b
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/test_load.py
@@ -0,0 +1,85 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+from mock import patch
+import mock
+import pdb
+import networkx as nx
+
+import os, sys
+
+sys.path.append("../..")
+sys.path.append("../../new_base")
+config = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/test_config.yaml")
+from xosconfig import Config
+Config.init(config, 'synchronizer-config-schema.yaml')
+
+import synchronizers.new_base.modelaccessor
+from steps.mock_modelaccessor import *
+import event_loop
+import backend
+
+class TestScheduling(unittest.TestCase):
+ def setUp(self):
+ # self.policy = TenantWithContainerPolicy()
+ # self.user = User(email="testadmin@test.org")
+ # self.tenant = Tenant(creator=self.user)
+ # self.flavor = Flavor(name="m1.small")
+ # model_policy_tenantwithcontainer.Instance = Instance
+ # model_policy_tenantwithcontainer.Flavor = Flavor
+
+ b = backend.Backend()
+ steps_dir = Config.get("steps_dir")
+ self.steps = b.load_sync_step_modules(steps_dir)
+ self.synchronizer = event_loop.XOSObserver(self.steps)
+
+ def test_load_steps(self):
+ step_names = [s.__name__ for s in self.steps]
+ self.assertIn('SyncControllerSlices', step_names)
+
+ def test_load_deps(self):
+ self.synchronizer.load_dependency_graph()
+ graph = self.synchronizer.model_dependency_graph
+ self.assertTrue(graph[False].has_edge('Instance','Slice'))
+ self.assertTrue(graph[True].has_edge('Slice','Instance'))
+ self.assertTrue(graph[False].has_edge('Instance','ControllerSlice'))
+ self.assertTrue(graph[True].has_edge('ControllerSlice','Instance'))
+
+ def test_load_dep_accessors(self):
+ self.synchronizer.load_dependency_graph()
+ graph = self.synchronizer.model_dependency_graph
+ self.assertDictContainsSubset({'src_accessor': 'slice'}, graph[False]['Instance']['ControllerSlice'])
+ self.assertDictContainsSubset({'src_accessor': 'slice', 'dst_accessor': 'slice'}, graph[True]['ControllerSlice']['Instance'])
+
+ def test_load_sync_steps(self):
+ self.synchronizer.load_sync_steps()
+ model_to_step = self.synchronizer.model_to_step
+ step_lookup = self.synchronizer.step_lookup
+ self.assertIn(('ControllerSlice', ['SyncControllerSlices']), model_to_step.items())
+ self.assertIn(('SiteRole', ['SyncRoles']), model_to_step.items())
+
+ for k, v in model_to_step.items():
+ val = v[0]
+ observes = step_lookup[val].observes
+ if not isinstance(observes, list):
+ observes = [observes]
+
+ observed_names = [o.__name__ for o in observes]
+ self.assertIn(k, observed_names)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/xos/synchronizers/new_base/tests/test_payload.py b/xos/synchronizers/new_base/tests/test_payload.py
new file mode 100644
index 0000000..909c739
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/test_payload.py
@@ -0,0 +1,205 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+from mock import patch
+import mock
+import pdb
+import networkx as nx
+
+import os, sys
+
+sys.path.append("../..")
+sys.path.append("../../new_base")
+config = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/test_config.yaml")
+from xosconfig import Config
+Config.init(config, 'synchronizer-config-schema.yaml')
+
+import synchronizers.new_base.modelaccessor
+from steps.mock_modelaccessor import *
+import mock
+import event_loop
+import backend
+import json
+
+import steps.sync_instances
+import steps.sync_controller_slices
+
+ANSIBLE_FILE='/tmp/payload_test'
+
+def run_fake_ansible_template(*args,**kwargs):
+ opts = args[1]
+ open(ANSIBLE_FILE,'w').write(json.dumps(opts))
+
+def get_ansible_output():
+ ansible_str = open(ANSIBLE_FILE).read()
+ return json.loads(ansible_str)
+
+class TestPayload(unittest.TestCase):
+ def setUp(self):
+ b = backend.Backend()
+ steps_dir = Config.get("steps_dir")
+ self.steps = b.load_sync_step_modules(steps_dir)
+ self.synchronizer = event_loop.XOSObserver(self.steps)
+
+ @mock.patch("steps.sync_instances.syncstep.run_template",side_effect=run_fake_ansible_template)
+ @mock.patch("event_loop.model_accessor")
+ def test_delete_record(self, mock_run_template, mock_modelaccessor):
+ o = Instance()
+ o.name = "Sisi Pascal"
+
+ o.synchronizer_step = steps.sync_instances.SyncInstances()
+ self.synchronizer.delete_record(o)
+
+ a = get_ansible_output()
+ self.assertDictContainsSubset({'delete':True, 'name':o.name}, a)
+ o.save.assert_called_with(update_fields=['backend_need_reap'])
+
+ @mock.patch("steps.sync_instances.syncstep.run_template",side_effect=run_fake_ansible_template)
+ @mock.patch("event_loop.model_accessor")
+ def test_sync_record(self, mock_run_template, mock_modelaccessor):
+ o = Instance()
+ o.name = "Sisi Pascal"
+
+ o.synchronizer_step = steps.sync_instances.SyncInstances()
+ self.synchronizer.sync_record(o)
+
+ a = get_ansible_output()
+ self.assertDictContainsSubset({'delete':False, 'name':o.name}, a)
+ o.save.assert_called_with(update_fields=['enacted', 'backend_status', 'backend_register'])
+
+ @mock.patch("steps.sync_instances.syncstep.run_template",side_effect=run_fake_ansible_template)
+ @mock.patch("event_loop.model_accessor")
+ def test_sync_cohort(self, mock_run_template, mock_modelaccessor):
+ cs = ControllerSlice()
+ s = Slice(name = 'SP SP')
+ cs.slice = s
+
+ o = Instance()
+ o.name = "Sisi Pascal"
+ o.slice = s
+
+ cohort = [cs, o]
+ o.synchronizer_step = steps.sync_instances.SyncInstances()
+ cs.synchronizer_step = steps.sync_controller_slices.SyncControllerSlices()
+
+ self.synchronizer.sync_cohort(cohort, False)
+
+ a = get_ansible_output()
+ self.assertDictContainsSubset({'delete':False, 'name':o.name}, a)
+ o.save.assert_called_with(update_fields=['enacted', 'backend_status', 'backend_register'])
+ cs.save.assert_called_with(update_fields=['enacted', 'backend_status', 'backend_register'])
+
+ @mock.patch("steps.sync_instances.syncstep.run_template",side_effect=run_fake_ansible_template)
+ @mock.patch("event_loop.model_accessor")
+ def test_deferred_exception(self, mock_run_template, mock_modelaccessor):
+ cs = ControllerSlice()
+ s = Slice(name = 'SP SP')
+ cs.slice = s
+ cs.force_defer = True
+
+ o = Instance()
+ o.name = "Sisi Pascal"
+ o.slice = s
+
+ cohort = [cs, o]
+ o.synchronizer_step = steps.sync_instances.SyncInstances()
+ cs.synchronizer_step = steps.sync_controller_slices.SyncControllerSlices()
+
+ self.synchronizer.sync_cohort(cohort, False)
+ o.save.assert_called_with(update_fields=['backend_status', 'backend_register','updated'])
+ self.assertEqual(cs.backend_code, 1)
+
+ self.assertIn('Force', cs.backend_status)
+ self.assertIn('Failed due to', o.backend_status)
+
+ @mock.patch("steps.sync_instances.syncstep.run_template",side_effect=run_fake_ansible_template)
+ @mock.patch("event_loop.model_accessor")
+ def test_backend_status(self, mock_run_template, mock_modelaccessor):
+ cs = ControllerSlice()
+ s = Slice(name = 'SP SP')
+ cs.slice = s
+ cs.force_fail = True
+
+ o = Instance()
+ o.name = "Sisi Pascal"
+ o.slice = s
+
+ cohort = [cs, o]
+ o.synchronizer_step = steps.sync_instances.SyncInstances()
+ cs.synchronizer_step = steps.sync_controller_slices.SyncControllerSlices()
+
+ self.synchronizer.sync_cohort(cohort, False)
+ o.save.assert_called_with(update_fields=['backend_status', 'backend_register','updated'])
+ self.assertIn('Force', cs.backend_status)
+ self.assertIn('Failed due to', o.backend_status)
+
+ @mock.patch("steps.sync_instances.syncstep.run_template",side_effect=run_fake_ansible_template)
+ @mock.patch("event_loop.model_accessor")
+ def test_fetch_pending(self, mock_run_template, mock_accessor, *_other_accessors):
+ pending_objects, pending_steps = self.synchronizer.fetch_pending()
+ pending_objects2 = list(pending_objects)
+
+ any_cs = next(obj for obj in pending_objects if obj.leaf_model_name == 'ControllerSlice')
+ any_instance = next(obj for obj in pending_objects2 if obj.leaf_model_name == 'Instance')
+
+ slice = Slice()
+ any_instance.slice = slice
+ any_cs.slice = slice
+
+ self.synchronizer.external_dependencies = []
+ cohorts = self.synchronizer.compute_dependent_cohorts(pending_objects, False)
+
+ self.assertEqual(len(cohorts), len(pending_objects) - 1)
+
+ @mock.patch("steps.sync_instances.syncstep.run_template",side_effect=run_fake_ansible_template)
+ @mock.patch("event_loop.model_accessor")
+ def test_fetch_pending_with_external_dependencies(self, mock_run_template, mock_accessor, *_other_accessors):
+ pending_objects, pending_steps = self.synchronizer.fetch_pending()
+ pending_objects2 = list(pending_objects)
+
+ self.synchronizer = event_loop.XOSObserver(self.steps)
+
+ any_cn = next(obj for obj in pending_objects if obj.leaf_model_name == 'ControllerNetwork')
+ any_user = next(obj for obj in pending_objects2 if obj.leaf_model_name == 'User')
+
+ cohorts = self.synchronizer.compute_dependent_cohorts(pending_objects, False)
+
+ self.assertEqual(len(cohorts), len(pending_objects))
+
+ # These cannot be None, but for documentation purposes
+ self.assertIsNotNone(any_cn)
+ self.assertIsNotNone(any_user)
+
+ @mock.patch("steps.sync_instances.syncstep.run_template",side_effect=run_fake_ansible_template)
+ @mock.patch("event_loop.model_accessor")
+ def test_external_dependency_exception(self, mock_run_template, mock_modelaccessor):
+ cs = ControllerSlice()
+ s = Slice(name = 'SP SP')
+ cs.slice = s
+
+ o = Instance()
+ o.name = "Sisi Pascal"
+ o.slice = s
+
+ cohort = [cs, o]
+ o.synchronizer_step = None
+ o.synchronizer_step = steps.sync_instances.SyncInstances()
+
+ self.synchronizer.sync_cohort(cohort, False)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/xos/synchronizers/new_base/tests/test_run.py b/xos/synchronizers/new_base/tests/test_run.py
new file mode 100644
index 0000000..1ca1a2c
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/test_run.py
@@ -0,0 +1,82 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+from mock import patch
+import mock
+import pdb
+import networkx as nx
+
+import os, sys
+
+sys.path.append("../..")
+sys.path.append("../../new_base")
+config = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/test_config.yaml")
+from xosconfig import Config
+Config.init(config, 'synchronizer-config-schema.yaml')
+
+import synchronizers.new_base.modelaccessor
+from steps.mock_modelaccessor import *
+import mock
+import event_loop
+import backend
+import json
+
+import steps.sync_instances
+import steps.sync_controller_slices
+
+ANSIBLE_FILE='/tmp/payload_test'
+
+def run_fake_ansible_template(*args,**kwargs):
+ opts = args[1]
+ open(ANSIBLE_FILE,'w').write(json.dumps(opts))
+
+def get_ansible_output():
+ ansible_str = open(ANSIBLE_FILE).read()
+ return json.loads(ansible_str)
+
+class TestRun(unittest.TestCase):
+ def setUp(self):
+ b = backend.Backend()
+ steps_dir = Config.get("steps_dir")
+ self.steps = b.load_sync_step_modules(steps_dir)
+ self.synchronizer = event_loop.XOSObserver(self.steps)
+ os.remove('/tmp/sync_ports')
+ os.remove('/tmp/delete_ports')
+
+ @mock.patch("steps.sync_instances.syncstep.run_template",side_effect=run_fake_ansible_template)
+ @mock.patch("event_loop.model_accessor")
+ def test_run_once(self, mock_run_template, mock_accessor, *_other_accessors):
+ pending_objects, pending_steps = self.synchronizer.fetch_pending()
+ pending_objects2 = list(pending_objects)
+
+ any_cs = next(obj for obj in pending_objects if obj.leaf_model_name == 'ControllerSlice')
+ any_instance = next(obj for obj in pending_objects2 if obj.leaf_model_name == 'Instance')
+
+ slice = Slice()
+ any_instance.slice = slice
+ any_cs.slice = slice
+
+ self.synchronizer.run_once()
+ sync_ports = open('/tmp/sync_ports').read()
+ delete_ports = open('/tmp/delete_ports').read()
+
+ self.assertIn("successful", sync_ports)
+ self.assertIn("successful", delete_ports)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/xos/synchronizers/new_base/tests/test_scheduler.py b/xos/synchronizers/new_base/tests/test_scheduler.py
new file mode 100644
index 0000000..8bf5652
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/test_scheduler.py
@@ -0,0 +1,172 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+from mock import patch
+import mock
+import pdb
+import networkx as nx
+
+import os, sys
+
+sys.path.append("../..")
+sys.path.append("../../new_base")
+config = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/test_config.yaml")
+from xosconfig import Config
+Config.init(config, 'synchronizer-config-schema.yaml')
+
+import synchronizers.new_base.modelaccessor
+from steps.mock_modelaccessor import *
+import event_loop
+import backend
+
+class TestScheduling(unittest.TestCase):
+ def setUp(self):
+ # self.policy = TenantWithContainerPolicy()
+ # self.user = User(email="testadmin@test.org")
+ # self.tenant = Tenant(creator=self.user)
+ # self.flavor = Flavor(name="m1.small")
+ # model_policy_tenantwithcontainer.Instance = Instance
+ # model_policy_tenantwithcontainer.Flavor = Flavor
+
+ b = backend.Backend()
+ steps_dir = Config.get("steps_dir")
+ self.steps = b.load_sync_step_modules(steps_dir)
+ self.synchronizer = event_loop.XOSObserver(self.steps)
+
+ def test_concrete_path_no_model_path(self):
+ p = Port()
+ n = NetworkParameter()
+ verdict = self.synchronizer.concrete_path_exists(p, n)
+ self.assertFalse(verdict)
+
+ def test_concrete_no_object_path_adjacent(self):
+ p = Instance()
+ s1 = Slice()
+ s2 = Slice()
+ p.slice = s2
+ verdict = self.synchronizer.concrete_path_exists(p, s1)
+
+ self.assertFalse(verdict)
+
+ def test_concrete_object_path_adjacent(self):
+ p = Instance()
+ s = Slice()
+ p.slice = s
+ verdict = self.synchronizer.concrete_path_exists(p, s)
+
+ self.assertTrue(verdict)
+
+ def test_concrete_object_controller_path_adjacent(self):
+ p = Instance()
+ q = Instance()
+ cs = ControllerSlice()
+ cs2 = ControllerSlice()
+ s1 = Slice()
+ s2 = Slice()
+ p.slice = s1
+ q.slice = s2
+ cs.slice = s1
+
+ verdict1 = self.synchronizer.concrete_path_exists(p, cs)
+ verdict2 = self.synchronizer.concrete_path_exists(q, cs)
+ verdict3 = self.synchronizer.concrete_path_exists(p, cs2)
+
+ self.assertTrue(verdict1)
+ self.assertFalse(verdict2)
+ self.assertFalse(verdict3)
+
+ def test_concrete_object_controller_path_distant(self):
+ p = Instance()
+ s = Slice()
+ t = Site()
+ ct = ControllerSite()
+ ct.site = t
+ p.slice = s
+ s.site = t
+ verdict = self.synchronizer.concrete_path_exists(p, ct)
+ self.assertTrue(verdict)
+
+ def test_concrete_object_path_distant(self):
+ p = Instance()
+ s = Slice()
+ t = Site()
+ p.slice = s
+ s.site = t
+ verdict = self.synchronizer.concrete_path_exists(p, t)
+ self.assertTrue(verdict)
+
+ def test_concrete_no_object_path_distant(self):
+ p = Instance()
+ s = Slice()
+ t = Site()
+ ct = ControllerSite()
+ ct.site = Site()
+ p.slice = s
+ s.site = t
+ verdict = self.synchronizer.concrete_path_exists(p, ct)
+ self.assertTrue(verdict)
+
+ def test_cohorting_independent(self):
+ i = Image()
+ p = Slice()
+ c = Instance()
+ cohorts = self.synchronizer.compute_dependent_cohorts([i,p,c], False)
+ self.assertEqual(len(cohorts), 3)
+
+ def test_cohorting_related(self):
+ i = Image()
+ p = Port()
+ c = Instance()
+ c.image = i
+ s = ControllerSlice()
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([i,p,c,s], False)
+ self.assertIn([i,c], cohorts)
+ self.assertIn([p], cohorts)
+ self.assertIn([s], cohorts)
+
+ def test_cohorting_related_multi(self):
+ i = Image()
+ p = Port()
+ c = Instance()
+ c.image = i
+ cs = ControllerSlice()
+ s = Slice()
+ cs.slice = s
+ c.slice = s
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([i,p,c,s,cs], False)
+
+ big_cohort = max(cohorts, key=len)
+ self.assertGreater(big_cohort.index(c), big_cohort.index(i))
+ self.assertGreater(big_cohort.index(cs), big_cohort.index(s))
+ self.assertIn([p], cohorts)
+
+ def test_cohorting_related_delete(self):
+ i = Image()
+ p = Port()
+ c = Instance()
+ c.image = i
+ s = ControllerSlice()
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([i,p,c,s], True)
+ self.assertIn([c,i], cohorts)
+ self.assertIn([p], cohorts)
+ self.assertIn([s], cohorts)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/xos/synchronizers/new_base/tests/test_services.py b/xos/synchronizers/new_base/tests/test_services.py
new file mode 100644
index 0000000..d3b67c7
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/test_services.py
@@ -0,0 +1,55 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+from mock import patch
+import mock
+import pdb
+import networkx as nx
+
+import os, sys
+
+sys.path.append("../..")
+sys.path.append("../../new_base")
+
+config = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/test_config_onos.yaml")
+from xosconfig import Config
+Config.init(config, 'synchronizer-config-schema.yaml')
+
+import synchronizers.new_base.modelaccessor
+from steps.mock_modelaccessor import *
+import event_loop
+import backend
+
+class TestServices(unittest.TestCase):
+ def setUp(self):
+ b = backend.Backend()
+ steps_dir = Config.get("steps_dir")
+ self.steps = b.load_sync_step_modules(steps_dir)
+ self.synchronizer = event_loop.XOSObserver(self.steps)
+
+ def test_service_models(self):
+ o = ONOSApp()
+ t = ONOSService()
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([o,t], False)
+ self.assertIn([t,o], cohorts)
+
+ cohorts = self.synchronizer.compute_dependent_cohorts([t,o], False)
+ self.assertIn([t,o], cohorts)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/xos/synchronizers/new_base/toposort.py b/xos/synchronizers/new_base/toposort.py
deleted file mode 100644
index c8d58b5..0000000
--- a/xos/synchronizers/new_base/toposort.py
+++ /dev/null
@@ -1,88 +0,0 @@
-
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-#!/usr/bin/env python
-
-import time
-import traceback
-import commands
-import threading
-import json
-import pdb
-
-from datetime import datetime
-from collections import defaultdict
-
-# Topological sort
-# Notes:
-# - Uses a stack instead of recursion
-# - Forfeits optimization involving tracking currently visited nodes
-def toposort(g, steps=None):
- # Get set of all nodes, including those without outgoing edges
- keys = set(g.keys())
- values = set({})
- for v in g.values():
- values=values | set(v)
-
- all_nodes=list(keys|values)
- if (not steps):
- steps = all_nodes
-
- # Final order
- order = []
-
- # DFS stack, not using recursion
- stack = []
-
- # Unmarked set
- unmarked = all_nodes
-
- # visiting = [] - skip, don't expect 1000s of nodes, |E|/|V| is small
-
- while unmarked:
- stack.insert(0,unmarked[0]) # push first unmarked
-
- while (stack):
- n = stack[0]
- add = True
- try:
- for m in g[n]:
- if (m in unmarked):
- add = False
- stack.insert(0,m)
- except KeyError:
- pass
- if (add):
- if (n in steps and n not in order):
- order.append(n)
- item = stack.pop(0)
- try:
- unmarked.remove(item)
- except ValueError:
- pass
-
- noorder = list(set(steps) - set(order))
- return order + noorder
-
-def main():
- graph_file=open('xos.deps').read()
- g = json.loads(graph_file)
- print toposort(g)
-
-if (__name__=='__main__'):
- main()
-
-#print toposort({'a':'b','b':'c','c':'d','d':'c'},['d','c','b','a'])
diff --git a/xos/synchronizers/new_base/watchers.py b/xos/synchronizers/new_base/watchers.py
index 377ac3b..5794f83 100644
--- a/xos/synchronizers/new_base/watchers.py
+++ b/xos/synchronizers/new_base/watchers.py
@@ -38,7 +38,6 @@
log = create_logger(Config().get('logging'))
-
class XOSWatcher:
def load_sync_step_modules(self, step_dir=None):
if step_dir is None:
diff --git a/xos/synchronizers/new_base/xos-synchronizer.py b/xos/synchronizers/new_base/xos-synchronizer.py
index fd824e1..447954c 100644
--- a/xos/synchronizers/new_base/xos-synchronizer.py
+++ b/xos/synchronizers/new_base/xos-synchronizer.py
@@ -48,7 +48,9 @@
if (wait):
time.sleep(60) # Safety factor, seeing that we stumbled waiting for the data model to come up.
- backend = Backend()
+
+ log_closure = log.bind(synchronizer_name = Config().get('name'))
+ backend = Backend(log = log_closure)
backend.run()
if __name__ == '__main__':
diff --git a/xos/xos/logger.py b/xos/xos/logger.py
index d69062b..e7cf2cf 100644
--- a/xos/xos/logger.py
+++ b/xos/xos/logger.py
@@ -58,7 +58,6 @@
class Logger:
-
def __init__(self, logfile=None, loggername=None, level=logging.INFO):
# Logstash config
@@ -68,6 +67,7 @@
logstash_host, int(logstash_port), version=1)
# always log at DEBUG level to logstash
logstash_handler.setLevel(logging.DEBUG)
+ raise Exception("Disabled")
except:
# if connection fails (eg: logstash is not there) just move on
logstash_handler = None