Realign logging levels; run autopep8 on event_loop.py, which was an indentation mess.
Add a per-run UUID to Ansible task logging, make log records more verbose, and remove duplicate logging.
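
The heart of the UUID change: ResultCallback now generates one uuid1() at
construction and stamps it into the log_extra of every event it emits, so all
records from a single playbook run share a correlation id. A minimal sketch of
the pattern (stdlib only; RunLogger and log_event are illustrative names, not
XOS's):

    import json
    import logging
    import uuid

    logger = logging.getLogger("ansible_runner")

    class RunLogger(object):
        # illustrative stand-in for ResultCallback's logging behaviour
        def __init__(self):
            # one uuid per playbook run; every event carries it
            self.uuid = str(uuid.uuid1())

        def log_event(self, ansible_type, status, result=None):
            log_extra = {
                'xos_type': 'ansible',
                'ansible_uuid': self.uuid,
                'ansible_type': ansible_type,
                'ansible_status': status,
            }
            if result is not None:
                # json.dumps rather than _dump_results, as in the diff below
                log_extra['ansible_result'] = json.dumps(result)
            logger.debug(ansible_type, extra=log_extra)
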
Change-Id: I4729ebc352d1a5f3b7262105cf084773c5e58f6f
diff --git a/xos/synchronizers/base/SyncInstanceUsingAnsible.py b/xos/synchronizers/base/SyncInstanceUsingAnsible.py
index bb7eee5..b7f11d2 100644
--- a/xos/synchronizers/base/SyncInstanceUsingAnsible.py
+++ b/xos/synchronizers/base/SyncInstanceUsingAnsible.py
@@ -32,6 +32,8 @@
return False
def defer_sync(self, o, reason):
+ # zdw, 2017-02-18 - is raising the exception here necessary? - seems like
+ # it's just logging the same thing twice
logger.info("defer object %s due to %s" % (str(o), reason),extra=o.tologdict())
raise Exception("defer object %s due to %s" % (str(o), reason))
@@ -272,7 +274,7 @@
if hasattr(self, "map_delete_outputs"):
self.map_delete_outputs(o,res)
- #In order to enable the XOS watcher functionality for a synchronizer, define the 'watches' attribute
+ #In order to enable the XOS watcher functionality for a synchronizer, define the 'watches' attribute
#in the derived class: eg. watches = [ModelLink(CoarseTenant,via='coarsetenant')]
#This base class implements the notification handler for handling CoarseTenant model notifications
#If a synchronizer need to watch on multiple objects, the additional handlers need to be implemented
@@ -345,7 +347,7 @@
target_network = target_networks[0]
src_ip = instance.get_network_ip(src_network.name)
target_subnet = target_network.controllernetworks.all()[0].subnet
-
+
#Run ansible playbook to update the routing table entries in the instance
fields = self.get_ansible_fields(instance)
fields["ansible_tag"] = obj.__class__.__name__ + "_" + str(obj.id) + "_service_composition"
diff --git a/xos/synchronizers/base/ansible_helper.py b/xos/synchronizers/base/ansible_helper.py
index 524afff..6f40f37 100644
--- a/xos/synchronizers/base/ansible_helper.py
+++ b/xos/synchronizers/base/ansible_helper.py
@@ -95,14 +95,19 @@
except:
pass
+ # FIXME (zdw, 2017-02-19) - may not be needed with new callback logging
if (object):
oprops = object.tologdict()
ansible = x._result
- oprops['ansible']=1
- oprops['failed']=failed
- oprops['ansible_results']=json.dumps(ansible)
+ oprops['xos_type']='ansible'
+ oprops['ansible_result']=json.dumps(ansible)
- logger.info(x._task, extra=oprops)
+ if failed == 0:
+ oprops['ansible_status']='OK'
+ else:
+ oprops['ansible_status']='FAILED'
+
+ # logger.info(x._task, extra=oprops)
if (expected_num is not None) and (len(ok_results) != expected_num):
diff --git a/xos/synchronizers/base/ansible_runner.py b/xos/synchronizers/base/ansible_runner.py
index d2c3a9d..34823ae 100644
--- a/xos/synchronizers/base/ansible_runner.py
+++ b/xos/synchronizers/base/ansible_runner.py
@@ -4,6 +4,7 @@
import sys
import pdb
import json
+import uuid
from tempfile import NamedTemporaryFile
from ansible.inventory import Inventory
@@ -23,12 +24,14 @@
def __init__(self):
super(ResultCallback, self).__init__()
self.results = []
+ self.uuid = str(uuid.uuid1())
self.playbook_status = 'OK'
def v2_playbook_on_start(self, playbook):
self.playbook = playbook._file_name
log_extra = {
'xos_type': "ansible",
+ 'ansible_uuid': self.uuid,
'ansible_type': "playbook start",
'ansible_status': "OK",
'ansible_playbook': self.playbook
@@ -42,6 +45,7 @@
log_extra = {
'xos_type': "ansible",
+ 'ansible_uuid': self.uuid,
'ansible_type': "playbook stats",
'ansible_status': self.playbook_status,
'ansible_playbook': self.playbook,
@@ -56,17 +60,20 @@
def v2_playbook_on_play_start(self, play):
log_extra = {
'xos_type': "ansible",
+ 'ansible_uuid': self.uuid,
'ansible_type': "play start",
+ 'ansible_status': self.playbook_status,
'ansible_playbook': self.playbook
}
- logger.debug("PLAY [%s]" % play.name, extra=log_extra)
+ logger.debug("PLAY START [%s]" % play.name, extra=log_extra)
def v2_runner_on_ok(self, result, **kwargs):
log_extra = {
'xos_type': "ansible",
+ 'ansible_uuid': self.uuid,
'ansible_type': "task",
'ansible_status': "OK",
- 'ansible_result': self._dump_results(result._result),
+ 'ansible_result': json.dumps(result._result),
'ansible_task': result._task,
'ansible_playbook': self.playbook,
'ansible_host': result._host.get_name()
@@ -78,9 +85,10 @@
self.playbook_status = "FAILED"
log_extra = {
'xos_type': "ansible",
+ 'ansible_uuid': self.uuid,
'ansible_type': "task",
'ansible_status': "FAILED",
- 'ansible_result': self._dump_results(result._result),
+ 'ansible_result': json.dumps(result._result),
'ansible_task': result._task,
'ansible_playbook': self.playbook,
'ansible_host': result._host.get_name()
@@ -92,9 +100,10 @@
self.playbook_status = "FAILED"
log_extra = {
'xos_type': "ansible",
+ 'ansible_uuid': self.uuid,
'ansible_type': "task",
'ansible_status': "ASYNC FAILED",
- 'ansible_result': self._dump_results(result._result),
+ 'ansible_result': json.dumps(result._result),
'ansible_task': result._task,
'ansible_playbook': self.playbook,
'ansible_host': result._host.get_name()
@@ -104,8 +113,10 @@
def v2_runner_on_skipped(self, result, **kwargs):
log_extra = {
'xos_type': "ansible",
+ 'ansible_uuid': self.uuid,
'ansible_type': "task",
'ansible_status': "SKIPPED",
+ 'ansible_result': json.dumps(result._result),
'ansible_task': result._task,
'ansible_playbook': self.playbook,
'ansible_host': result._host.get_name()
@@ -116,9 +127,10 @@
def v2_runner_on_unreachable(self, result, **kwargs):
log_extra = {
'xos_type': "ansible",
+ 'ansible_uuid': self.uuid,
'ansible_type': "task",
'ansible_status': "UNREACHABLE",
- 'ansible_result': self._dump_results(result._result),
+ 'ansible_result': json.dumps(result._result),
'ansible_task': result._task,
'ansible_playbook': self.playbook,
'ansible_host': result._host.get_name()
@@ -129,9 +141,10 @@
def v2_runner_retry(self, result, **kwargs):
log_extra = {
'xos_type': "ansible",
+ 'ansible_uuid': self.uuid,
'ansible_type': "task",
'ansible_status': "RETRY",
- 'ansible_result': self._dump_results(result._result),
+ 'ansible_result': json.dumps(result._result),
'ansible_task': result._task,
'ansible_playbook': self.playbook,
'ansible_host': result._host.get_name()
@@ -142,6 +155,7 @@
def v2_playbook_on_handler_task_start(self, task, **kwargs):
log_extra = {
'xos_type': "ansible",
+ 'ansible_uuid': self.uuid,
'ansible_type': "task",
'ansible_status': "HANDLER",
'ansible_task': task.get_name().strip(),
@@ -153,8 +167,10 @@
def v2_playbook_on_import_for_host(self, result, imported_file):
log_extra = {
'xos_type': "ansible",
+ 'ansible_uuid': self.uuid,
'ansible_type': "import",
'ansible_status': "IMPORT",
+ 'ansible_result': json.dumps(result._result),
'ansible_playbook': self.playbook,
'ansible_host': result._host.get_name()
}
@@ -164,8 +180,10 @@
def v2_playbook_on_not_import_for_host(self, result, missing_file):
log_extra = {
'xos_type': "ansible",
+ 'ansible_uuid': self.uuid,
'ansible_type': "import",
'ansible_status': "MISSING IMPORT",
+ 'ansible_result': json.dumps(result._result),
'ansible_playbook': self.playbook,
'ansible_host': result._host.get_name()
}
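
Every callback event now carries the run's ansible_uuid, so downstream log
processing can stitch one playbook run back together even when several
synchronizers log concurrently. A self-contained example of that grouping over
already-parsed records (the dicts mirror the log_extra payloads above; the
sample data is made up):

    from collections import defaultdict

    def group_by_run(records):
        # records: iterable of dicts shaped like the log_extra payloads above
        runs = defaultdict(list)
        for rec in records:
            runs[rec.get('ansible_uuid', 'unknown')].append(rec)
        return runs

    records = [
        {'ansible_uuid': 'aaa', 'ansible_type': 'playbook start', 'ansible_status': 'OK'},
        {'ansible_uuid': 'aaa', 'ansible_type': 'task', 'ansible_status': 'FAILED'},
        {'ansible_uuid': 'bbb', 'ansible_type': 'task', 'ansible_status': 'OK'},
    ]
    for run_id, events in group_by_run(records).items():
        # Python 2 print, as in the codebase
        print run_id, [e['ansible_status'] for e in events]
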
diff --git a/xos/synchronizers/base/event_loop.py b/xos/synchronizers/base/event_loop.py
index 4c7ab1b..6e61051 100644
--- a/xos/synchronizers/base/event_loop.py
+++ b/xos/synchronizers/base/event_loop.py
@@ -36,19 +36,20 @@
except AttributeError:
app_module_names = []
-if (type(app_module_names)!=list):
- app_module_names=[app_module_names]
+if (not isinstance(app_module_names, list)):
+ app_module_names = [app_module_names]
app_modules = []
for m in app_module_names:
- model_path = m+'.models'
- module = __import__(model_path,fromlist=[m])
+ model_path = m + '.models'
+ module = __import__(model_path, fromlist=[m])
app_modules.append(module)
debug_mode = False
+
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
@@ -61,492 +62,563 @@
logger = Logger(level=logging.INFO)
+
class StepNotReady(Exception):
- pass
+ pass
+
class NoOpDriver:
- def __init__(self):
- self.enabled = True
- self.dependency_graph = None
+
+ def __init__(self):
+ self.enabled = True
+ self.dependency_graph = None
# Everyone gets NoOpDriver by default. To use a different driver, call
# set_driver() below.
DRIVER = NoOpDriver()
+
def set_driver(x):
global DRIVER
DRIVER = x
-STEP_STATUS_WORKING=1
-STEP_STATUS_OK=2
-STEP_STATUS_KO=3
+STEP_STATUS_WORKING = 1
+STEP_STATUS_OK = 2
+STEP_STATUS_KO = 3
+
def invert_graph(g):
- ig = {}
- for k,v in g.items():
- for v0 in v:
- try:
- ig[v0].append(k)
- except:
- ig[v0]=[k]
- return ig
+ ig = {}
+ for k, v in g.items():
+ for v0 in v:
+ try:
+ ig[v0].append(k)
+ except:
+ ig[v0] = [k]
+ return ig
+
class XOSObserver:
- sync_steps = []
+ sync_steps = []
+ def __init__(self, sync_steps):
+ # The Condition object that gets signalled by Feefie events
+ self.step_lookup = {}
+ # self.load_sync_step_modules()
+ self.sync_steps = sync_steps
+ self.load_sync_steps()
+ self.event_cond = threading.Condition()
- def __init__(self,sync_steps):
- # The Condition object that gets signalled by Feefie events
- self.step_lookup = {}
- #self.load_sync_step_modules()
- self.sync_steps = sync_steps
- self.load_sync_steps()
- self.event_cond = threading.Condition()
+ self.driver = DRIVER
+ self.observer_name = getattr(Config(), "observer_name", "")
- self.driver = DRIVER
- self.observer_name = getattr(Config(), "observer_name", "")
+ def consolePrint(self, what):
+ if getattr(Config(), "observer_console_print", True):
+ print what
- def consolePrint(self, what):
- if getattr(Config(), "observer_console_print", True):
- print what
+ def wait_for_event(self, timeout):
+ self.event_cond.acquire()
+ self.event_cond.wait(timeout)
+ self.event_cond.release()
- def wait_for_event(self, timeout):
- self.event_cond.acquire()
- self.event_cond.wait(timeout)
- self.event_cond.release()
+ def wake_up(self):
+ logger.info('Wake up routine called. Event cond %r' % self.event_cond)
+ self.event_cond.acquire()
+ self.event_cond.notify()
+ self.event_cond.release()
- def wake_up(self):
- logger.info('Wake up routine called. Event cond %r'%self.event_cond)
- self.event_cond.acquire()
- self.event_cond.notify()
- self.event_cond.release()
+ def load_sync_step_modules(self, step_dir=None):
+ if step_dir is None:
+ step_dir = Config().observer_steps_dir
- def load_sync_step_modules(self, step_dir=None):
- if step_dir is None:
- step_dir = Config().observer_steps_dir
+ for fn in os.listdir(step_dir):
+ pathname = os.path.join(step_dir, fn)
+ if os.path.isfile(pathname) and fn.endswith(
+ ".py") and (fn != "__init__.py"):
+ module = imp.load_source(fn[:-3], pathname)
+ for classname in dir(module):
+ c = getattr(module, classname, None)
- for fn in os.listdir(step_dir):
- pathname = os.path.join(step_dir,fn)
- if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
- module = imp.load_source(fn[:-3],pathname)
- for classname in dir(module):
- c = getattr(module, classname, None)
+ # make sure 'c' is a descendent of SyncStep and has a
+ # provides field (this eliminates the abstract base classes
+ # since they don't have a provides)
- # make sure 'c' is a descendent of SyncStep and has a
- # provides field (this eliminates the abstract base classes
- # since they don't have a provides)
+ if inspect.isclass(c) and issubclass(
+ c, SyncStep) and hasattr(
+ c, "provides") and (
+ c not in self.sync_steps):
+ self.sync_steps.append(c)
+ logger.info('loaded sync steps: %s' %
+ ",".join([x.__name__ for x in self.sync_steps]))
- if inspect.isclass(c) and issubclass(c, SyncStep) and hasattr(c,"provides") and (c not in self.sync_steps):
- self.sync_steps.append(c)
- logger.info('loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps]))
+ def load_sync_steps(self):
+ dep_path = Config().observer_dependency_graph
+ logger.info('Loading model dependency graph from %s' % dep_path)
+ try:
+ # This contains dependencies between records, not sync steps
+ self.model_dependency_graph = json.loads(open(dep_path).read())
+ for left, lst in self.model_dependency_graph.items():
+ new_lst = []
+ for k in lst:
+ try:
+ tup = (k, k.lower())
+ new_lst.append(tup)
+ deps = self.model_dependency_graph[k]
+ except:
+ self.model_dependency_graph[k] = []
- def load_sync_steps(self):
- dep_path = Config().observer_dependency_graph
- logger.info('Loading model dependency graph from %s' % dep_path)
- try:
- # This contains dependencies between records, not sync steps
- self.model_dependency_graph = json.loads(open(dep_path).read())
- for left,lst in self.model_dependency_graph.items():
- new_lst = []
- for k in lst:
- try:
- tup = (k,k.lower())
- new_lst.append(tup)
- deps = self.model_dependency_graph[k]
- except:
- self.model_dependency_graph[k] = []
+ self.model_dependency_graph[left] = new_lst
+ except Exception as e:
+ raise e
- self.model_dependency_graph[left] = new_lst
- except Exception,e:
- raise e
+ try:
+ backend_path = Config().observer_pl_dependency_graph
+ logger.info(
+ 'Loading backend dependency graph from %s' %
+ backend_path)
+ # This contains dependencies between backend records
+ self.backend_dependency_graph = json.loads(
+ open(backend_path).read())
+ for k, v in self.backend_dependency_graph.items():
+ try:
+ self.model_dependency_graph[k].extend(v)
+ except KeyError:
+                    self.model_dependency_graph[k] = v
- try:
- backend_path = Config().observer_pl_dependency_graph
- logger.info('Loading backend dependency graph from %s' % backend_path)
- # This contains dependencies between backend records
- self.backend_dependency_graph = json.loads(open(backend_path).read())
- for k,v in self.backend_dependency_graph.items():
- try:
- self.model_dependency_graph[k].extend(v)
- except KeyError:
- self.model_dependency_graphp[k] = v
+ except Exception as e:
+ logger.info('Backend dependency graph not loaded')
+ # We can work without a backend graph
+ self.backend_dependency_graph = {}
- except Exception,e:
- logger.info('Backend dependency graph not loaded')
- # We can work without a backend graph
- self.backend_dependency_graph = {}
+ provides_dict = {}
+ for s in self.sync_steps:
+ self.step_lookup[s.__name__] = s
+ for m in s.provides:
+ try:
+ provides_dict[m.__name__].append(s.__name__)
+ except KeyError:
+ provides_dict[m.__name__] = [s.__name__]
- provides_dict = {}
- for s in self.sync_steps:
- self.step_lookup[s.__name__] = s
- for m in s.provides:
- try:
- provides_dict[m.__name__].append(s.__name__)
- except KeyError:
- provides_dict[m.__name__]=[s.__name__]
-
- step_graph = {}
- phantom_steps = []
- for k,v in self.model_dependency_graph.items():
- try:
- for source in provides_dict[k]:
- if (not v):
- step_graph[source] = []
-
- for m,_ in v:
- try:
- for dest in provides_dict[m]:
- # no deps, pass
- try:
- if (dest not in step_graph[source]):
- step_graph[source].append(dest)
- except:
- step_graph[source]=[dest]
- except KeyError:
- if (not provides_dict.has_key(m)):
- try:
- step_graph[source]+=['#%s'%m]
- except:
- step_graph[source]=['#%s'%m]
-
- phantom_steps+=['#%s'%m]
- pass
-
- except KeyError:
- pass
- # no dependencies, pass
-
-
- self.dependency_graph = step_graph
- self.deletion_dependency_graph = invert_graph(step_graph)
-
- pp = pprint.PrettyPrinter(indent=4)
- logger.info(pp.pformat(step_graph))
- self.ordered_steps = toposort(self.dependency_graph, phantom_steps+map(lambda s:s.__name__,self.sync_steps))
- self.ordered_steps = [i for i in self.ordered_steps if i!='SyncObject']
-
- logger.info("Order of steps=%s" % self.ordered_steps)
-
- self.load_run_times()
-
-
- def check_duration(self, step, duration):
- try:
- if (duration > step.deadline):
- logger.info('Sync step %s missed deadline, took %.2f seconds'%(step.name,duration))
- except AttributeError:
- # S doesn't have a deadline
- pass
-
- def update_run_time(self, step, deletion):
- if (not deletion):
- self.last_run_times[step.__name__]=time.time()
- else:
- self.last_deletion_run_times[step.__name__]=time.time()
-
-
- def check_schedule(self, step, deletion):
- last_run_times = self.last_run_times if not deletion else self.last_deletion_run_times
-
- time_since_last_run = time.time() - last_run_times.get(step.__name__, 0)
- try:
- if (time_since_last_run < step.requested_interval):
- raise StepNotReady
- except AttributeError:
- logger.info('Step %s does not have requested_interval set'%step.__name__)
- raise StepNotReady
-
- def load_run_times(self):
- try:
- jrun_times = open('/tmp/%sobserver_run_times'%self.observer_name).read()
- self.last_run_times = json.loads(jrun_times)
- except:
- self.last_run_times={}
- for e in self.ordered_steps:
- self.last_run_times[e]=0
- try:
- jrun_times = open('/tmp/%sobserver_deletion_run_times'%self.observer_name).read()
- self.last_deletion_run_times = json.loads(jrun_times)
- except:
- self.last_deletion_run_times={}
- for e in self.ordered_steps:
- self.last_deletion_run_times[e]=0
-
- def lookup_step_class(self,s):
- if ('#' in s):
- return SyncObject
- else:
- step = self.step_lookup[s]
- return step
-
- def lookup_step(self,s):
- if ('#' in s):
- objname = s[1:]
- so = SyncObject()
-
- try:
- obj = globals()[objname]
- except:
- for m in app_modules:
- if (hasattr(m,objname)):
- obj = getattr(m,objname)
-
- so.provides=[obj]
- so.observes=[obj]
- step = so
- else:
- step_class = self.step_lookup[s]
- step = step_class(driver=self.driver,error_map=self.error_mapper)
- return step
-
- def save_run_times(self):
- run_times = json.dumps(self.last_run_times)
- open('/tmp/%sobserver_run_times'%self.observer_name,'w').write(run_times)
-
- deletion_run_times = json.dumps(self.last_deletion_run_times)
- open('/tmp/%sobserver_deletion_run_times'%self.observer_name,'w').write(deletion_run_times)
-
- def check_class_dependency(self, step, failed_steps):
- step.dependenices = []
- for obj in step.provides:
- lst = self.model_dependency_graph.get(obj.__name__, [])
- nlst = map(lambda(a,b):b,lst)
- step.dependenices.extend(nlst)
- for failed_step in failed_steps:
- if (failed_step in step.dependencies):
- raise StepNotReady
-
- def sync(self, S, deletion):
+ step_graph = {}
+ phantom_steps = []
+ for k, v in self.model_dependency_graph.items():
try:
- step = self.lookup_step_class(S)
- start_time=time.time()
+ for source in provides_dict[k]:
+ if (not v):
+ step_graph[source] = []
- logger.info("Starting to work on step %s, deletion=%s" % (step.__name__, str(deletion)))
-
- dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
- step_conditions = self.step_conditions# if not deletion else self.deletion_step_conditions
- step_status = self.step_status# if not deletion else self.deletion_step_status
-
- # Wait for step dependencies to be met
- try:
- deps = dependency_graph[S]
- has_deps = True
- except KeyError:
- has_deps = False
-
- go = True
-
- failed_dep = None
- if (has_deps):
- for d in deps:
- if d==step.__name__:
- logger.info(" step %s self-wait skipped" % step.__name__)
- go = True
- continue
-
- cond = step_conditions[d]
- cond.acquire()
- if (step_status[d] is STEP_STATUS_WORKING):
- logger.info(" step %s wait on dep %s" % (step.__name__, d))
- cond.wait()
- logger.info(" step %s wait on dep %s cond returned" % (step.__name__, d))
- elif step_status[d] == STEP_STATUS_OK:
- go = True
- else:
- logger.info(" step %s has failed dep %s" % (step.__name__, d))
- go = False
- failed_dep = d
- cond.release()
- if (not go):
- break
- else:
- go = True
-
- if (not go):
- logger.info("Step %s skipped" % step.__name__)
- self.consolePrint(bcolors.FAIL + "Step %r skipped on %r" % (step,failed_dep) + bcolors.ENDC)
- # SMBAKER: sync_step was not defined here, so I changed
- # this from 'sync_step' to 'step'. Verify.
- self.failed_steps.append(step)
- my_status = STEP_STATUS_KO
- else:
- sync_step = self.lookup_step(S)
- sync_step. __name__= step.__name__
- sync_step.dependencies = []
- try:
- mlist = sync_step.provides
-
+ for m, _ in v:
+ try:
+ for dest in provides_dict[m]:
+ # no deps, pass
try:
- for m in mlist:
- lst = self.model_dependency_graph[m.__name__]
- nlst = map(lambda(a,b):b,lst)
- sync_step.dependencies.extend(nlst)
- except Exception,e:
- raise e
+ if (dest not in step_graph[source]):
+ step_graph[source].append(dest)
+ except:
+ step_graph[source] = [dest]
+ except KeyError:
+ if (m not in provides_dict):
+ try:
+ step_graph[source] += ['#%s' % m]
+ except:
+ step_graph[source] = ['#%s' % m]
- except KeyError:
- pass
- sync_step.debug_mode = debug_mode
+ phantom_steps += ['#%s' % m]
+ pass
- should_run = False
- try:
- # Various checks that decide whether
- # this step runs or not
- self.check_class_dependency(sync_step, self.failed_steps) # dont run Slices if Sites failed
- self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour
- should_run = True
- except StepNotReady:
- logger.info('Step not ready: %s'%sync_step.__name__)
- self.failed_steps.append(sync_step)
- my_status = STEP_STATUS_KO
- except Exception,e:
- logger.error('%r' % e)
- logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
- self.failed_steps.append(sync_step)
- my_status = STEP_STATUS_KO
+ except KeyError:
+ pass
+ # no dependencies, pass
- if (should_run):
- try:
- duration=time.time() - start_time
+ self.dependency_graph = step_graph
+ self.deletion_dependency_graph = invert_graph(step_graph)
- logger.info('Executing step %s, deletion=%s' % (sync_step.__name__, deletion))
+ pp = pprint.PrettyPrinter(indent=4)
+ logger.debug(pp.pformat(step_graph))
+ self.ordered_steps = toposort(
+ self.dependency_graph, phantom_steps + map(lambda s: s.__name__, self.sync_steps))
+ self.ordered_steps = [
+ i for i in self.ordered_steps if i != 'SyncObject']
- self.consolePrint(bcolors.OKBLUE + "Executing step %s" % sync_step.__name__ + bcolors.ENDC)
- failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion)
+ logger.info("Order of steps=%s" % self.ordered_steps)
- self.check_duration(sync_step, duration)
+ self.load_run_times()
- if failed_objects:
- self.failed_step_objects.update(failed_objects)
+ def check_duration(self, step, duration):
+ try:
+ if (duration > step.deadline):
+ logger.info(
+ 'Sync step %s missed deadline, took %.2f seconds' %
+ (step.name, duration))
+ except AttributeError:
+            # step doesn't have a deadline
+ pass
- logger.info("Step %r succeeded, deletion=%s" % (sync_step.__name__, deletion))
- self.consolePrint(bcolors.OKGREEN + "Step %r succeeded" % sync_step.__name__ + bcolors.ENDC)
- my_status = STEP_STATUS_OK
- self.update_run_time(sync_step,deletion)
- except Exception,e:
- self.consolePrint(bcolors.FAIL + "Model step %r failed" % (sync_step.__name__) + bcolors.ENDC)
- logger.error('Model step %r failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' % (sync_step.__name__, e))
- logger.log_exc("Exception in sync step")
- self.failed_steps.append(S)
- my_status = STEP_STATUS_KO
- else:
- logger.info("Step %r succeeded due to non-run" % step)
- my_status = STEP_STATUS_OK
+ def update_run_time(self, step, deletion):
+ if (not deletion):
+ self.last_run_times[step.__name__] = time.time()
+ else:
+ self.last_deletion_run_times[step.__name__] = time.time()
- try:
- my_cond = step_conditions[S]
- my_cond.acquire()
- step_status[S]=my_status
- my_cond.notify_all()
- my_cond.release()
- except KeyError,e:
- logger.info('Step %r is a leaf' % step)
- pass
- finally:
+ def check_schedule(self, step, deletion):
+ last_run_times = self.last_run_times if not deletion else self.last_deletion_run_times
+
+ time_since_last_run = time.time() - last_run_times.get(step.__name__, 0)
+ try:
+ if (time_since_last_run < step.requested_interval):
+ raise StepNotReady
+ except AttributeError:
+ logger.info(
+ 'Step %s does not have requested_interval set' %
+ step.__name__)
+ raise StepNotReady
+
+ def load_run_times(self):
+ try:
+ jrun_times = open(
+ '/tmp/%sobserver_run_times' %
+ self.observer_name).read()
+ self.last_run_times = json.loads(jrun_times)
+ except:
+ self.last_run_times = {}
+ for e in self.ordered_steps:
+ self.last_run_times[e] = 0
+ try:
+ jrun_times = open(
+ '/tmp/%sobserver_deletion_run_times' %
+ self.observer_name).read()
+ self.last_deletion_run_times = json.loads(jrun_times)
+ except:
+ self.last_deletion_run_times = {}
+ for e in self.ordered_steps:
+ self.last_deletion_run_times[e] = 0
+
+ def lookup_step_class(self, s):
+ if ('#' in s):
+ return SyncObject
+ else:
+ step = self.step_lookup[s]
+ return step
+
+ def lookup_step(self, s):
+ if ('#' in s):
+ objname = s[1:]
+ so = SyncObject()
+
+ try:
+ obj = globals()[objname]
+ except:
+ for m in app_modules:
+ if (hasattr(m, objname)):
+ obj = getattr(m, objname)
+
+ so.provides = [obj]
+ so.observes = [obj]
+ step = so
+ else:
+ step_class = self.step_lookup[s]
+ step = step_class(driver=self.driver, error_map=self.error_mapper)
+ return step
+
+ def save_run_times(self):
+ run_times = json.dumps(self.last_run_times)
+ open(
+ '/tmp/%sobserver_run_times' %
+ self.observer_name,
+ 'w').write(run_times)
+
+ deletion_run_times = json.dumps(self.last_deletion_run_times)
+ open('/tmp/%sobserver_deletion_run_times' %
+ self.observer_name, 'w').write(deletion_run_times)
+
+ def check_class_dependency(self, step, failed_steps):
+        step.dependencies = []
+ for obj in step.provides:
+ lst = self.model_dependency_graph.get(obj.__name__, [])
+ nlst = map(lambda a_b1: a_b1[1], lst)
+            step.dependencies.extend(nlst)
+ for failed_step in failed_steps:
+ if (failed_step in step.dependencies):
+ raise StepNotReady
+
+ def sync(self, S, deletion):
+ try:
+ step = self.lookup_step_class(S)
+ start_time = time.time()
+
+ logger.debug(
+ "Starting to work on step %s, deletion=%s" %
+ (step.__name__, str(deletion)))
+
+ dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
+ # if not deletion else self.deletion_step_conditions
+ step_conditions = self.step_conditions
+ step_status = self.step_status # if not deletion else self.deletion_step_status
+
+ # Wait for step dependencies to be met
+ try:
+ deps = dependency_graph[S]
+ has_deps = True
+ except KeyError:
+ has_deps = False
+
+ go = True
+
+ failed_dep = None
+ if (has_deps):
+ for d in deps:
+ if d == step.__name__:
+ logger.debug(
+ " step %s self-wait skipped" %
+ step.__name__)
+ go = True
+ continue
+
+ cond = step_conditions[d]
+ cond.acquire()
+ if (step_status[d] is STEP_STATUS_WORKING):
+ logger.debug(
+ " step %s wait on dep %s" %
+ (step.__name__, d))
+ cond.wait()
+ logger.debug(
+ " step %s wait on dep %s cond returned" %
+ (step.__name__, d))
+ elif step_status[d] == STEP_STATUS_OK:
+ go = True
+ else:
+ logger.debug(
+ " step %s has failed dep %s" %
+ (step.__name__, d))
+ go = False
+ failed_dep = d
+ cond.release()
+ if (not go):
+ break
+ else:
+ go = True
+
+ if (not go):
+ logger.debug("Step %s skipped" % step.__name__)
+ self.consolePrint(
+ bcolors.FAIL + "Step %r skipped on %r" %
+ (step, failed_dep) + bcolors.ENDC)
+ # SMBAKER: sync_step was not defined here, so I changed
+ # this from 'sync_step' to 'step'. Verify.
+ self.failed_steps.append(step)
+ my_status = STEP_STATUS_KO
+ else:
+ sync_step = self.lookup_step(S)
+                sync_step.__name__ = step.__name__
+ sync_step.dependencies = []
+ try:
+ mlist = sync_step.provides
+
+ try:
+ for m in mlist:
+ lst = self.model_dependency_graph[m.__name__]
+ nlst = map(lambda a_b: a_b[1], lst)
+ sync_step.dependencies.extend(nlst)
+ except Exception as e:
+ raise e
+
+ except KeyError:
+ pass
+ sync_step.debug_mode = debug_mode
+
+ should_run = False
+ try:
+ # Various checks that decide whether
+ # this step runs or not
+                    # don't run Slices if Sites failed
+                    self.check_class_dependency(sync_step, self.failed_steps)
+                    # don't run sync_network_routes if time since last run < 1 hour
+                    self.check_schedule(sync_step, deletion)
+ should_run = True
+ except StepNotReady:
+ logger.info('Step not ready: %s' % sync_step.__name__)
+ self.failed_steps.append(sync_step)
+ my_status = STEP_STATUS_KO
+ except Exception as e:
+ logger.error('%r' % e)
+ logger.log_exc(
+ "sync step failed: %r. Deletion: %r" %
+ (sync_step, deletion))
+ self.failed_steps.append(sync_step)
+ my_status = STEP_STATUS_KO
+
+ if (should_run):
+ try:
+ duration = time.time() - start_time
+
+ logger.debug(
+ 'Executing step %s, deletion=%s' %
+ (sync_step.__name__, deletion))
+
+ self.consolePrint(
+ bcolors.OKBLUE + "Executing step %s" %
+ sync_step.__name__ + bcolors.ENDC)
+ failed_objects = sync_step(
+ failed=list(
+ self.failed_step_objects),
+ deletion=deletion)
+
+ self.check_duration(sync_step, duration)
+
+ if failed_objects:
+ self.failed_step_objects.update(failed_objects)
+
+ logger.debug(
+ "Step %r succeeded, deletion=%s" %
+ (sync_step.__name__, deletion))
+ self.consolePrint(
+ bcolors.OKGREEN + "Step %r succeeded" %
+ sync_step.__name__ + bcolors.ENDC)
+ my_status = STEP_STATUS_OK
+ self.update_run_time(sync_step, deletion)
+ except Exception as e:
+ self.consolePrint(
+ bcolors.FAIL + "Model step %r failed" %
+ (sync_step.__name__) + bcolors.ENDC)
+ logger.error(
+ 'Model step %r failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' %
+ (sync_step.__name__, e))
+ logger.log_exc("Exception in sync step")
+ self.failed_steps.append(S)
+ my_status = STEP_STATUS_KO
+ else:
+ logger.info("Step %r succeeded due to non-run" % step)
+ my_status = STEP_STATUS_OK
+
+ try:
+ my_cond = step_conditions[S]
+ my_cond.acquire()
+ step_status[S] = my_status
+ my_cond.notify_all()
+ my_cond.release()
+ except KeyError as e:
+ logger.debug('Step %r is a leaf' % step)
+ pass
+ finally:
+ try:
+ reset_queries()
+ except:
+ # this shouldn't happen, but in case it does, catch it...
+ logger.log_exc("exception in reset_queries")
+
+ connection.close()
+
+ def run(self):
+ if not self.driver.enabled:
+ return
+
+ while True:
+ logger.debug('Waiting for event')
+ self.wait_for_event(timeout=5)
+ logger.debug('Observer woke up')
+
+ self.run_once()
+
+ def check_db_connection_okay(self):
+ # django implodes if the database connection is closed by
+ # docker-compose
+ try:
+ diag = Diag.objects.filter(name="foo").first()
+ except Exception as e:
+ from django import db
+ if "connection already closed" in traceback.format_exc():
+ logger.error("XXX connection already closed")
+ try:
+ # if db.connection:
+ # db.connection.close()
+ db.close_old_connections()
+ except:
+ logger.log_exc("XXX we failed to fix the failure")
+ else:
+ logger.log_exc("XXX some other error")
+
+ def run_once(self):
+ try:
+ self.check_db_connection_okay()
+
+ loop_start = time.time()
+ error_map_file = getattr(
+ Config(),
+ "error_map_path",
+ XOS_DIR +
+ "/error_map.txt")
+ self.error_mapper = ErrorMapper(error_map_file)
+
+ # Two passes. One for sync, the other for deletion.
+ for deletion in [False, True]:
+ # Set of individual objects within steps that failed
+ self.failed_step_objects = set()
+
+ # Set up conditions and step status
+ # This is needed for steps to run in parallel
+ # while obeying dependencies.
+
+ providers = set()
+ dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
+
+ for v in dependency_graph.values():
+ if (v):
+ providers.update(v)
+
+ self.step_conditions = {}
+ self.step_status = {}
+
+ for p in list(providers):
+ self.step_conditions[p] = threading.Condition()
+
+ self.step_status[p] = STEP_STATUS_WORKING
+
+ self.failed_steps = []
+
+ threads = []
+ logger.debug('Deletion=%r...' % deletion)
+ schedule = self.ordered_steps if not deletion else reversed(
+ self.ordered_steps)
+
+ for S in schedule:
+ thread = threading.Thread(
+ target=self.sync, name='synchronizer', args=(
+ S, deletion))
+
+ logger.debug('Deletion=%r...' % deletion)
+ threads.append(thread)
+
+ # Start threads
+ for t in threads:
+ t.start()
+
+ # another spot to clean up debug state
try:
reset_queries()
except:
# this shouldn't happen, but in case it does, catch it...
logger.log_exc("exception in reset_queries")
- connection.close()
+ # Wait for all threads to finish before continuing with the run
+ # loop
+ for t in threads:
+ t.join()
- def run(self):
- if not self.driver.enabled:
- return
+ self.save_run_times()
- while True:
- logger.info('Waiting for event')
- self.wait_for_event(timeout=5)
- logger.info('Observer woke up')
+ loop_end = time.time()
- self.run_once()
+ update_diag(
+ loop_end=loop_end,
+ loop_start=loop_start,
+ backend_status="1 - Bottom Of Loop")
- def check_db_connection_okay(self):
- # django implodes if the database connection is closed by docker-compose
- try:
- diag = Diag.objects.filter(name="foo").first()
- except Exception, e:
- from django import db
- if "connection already closed" in traceback.format_exc():
- logger.error("XXX connection already closed")
- try:
-# if db.connection:
-# db.connection.close()
- db.close_old_connections()
- except:
- logger.log_exc("XXX we failed to fix the failure")
- else:
- logger.log_exc("XXX some other error")
-
- def run_once(self):
- try:
- self.check_db_connection_okay()
-
- loop_start = time.time()
- error_map_file = getattr(Config(), "error_map_path", XOS_DIR + "/error_map.txt")
- self.error_mapper = ErrorMapper(error_map_file)
-
- # Two passes. One for sync, the other for deletion.
- for deletion in [False,True]:
- # Set of individual objects within steps that failed
- self.failed_step_objects = set()
-
- # Set up conditions and step status
- # This is needed for steps to run in parallel
- # while obeying dependencies.
-
- providers = set()
- dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
-
- for v in dependency_graph.values():
- if (v):
- providers.update(v)
-
- self.step_conditions = {}
- self.step_status = {}
-
- for p in list(providers):
- self.step_conditions[p] = threading.Condition()
-
- self.step_status[p] = STEP_STATUS_WORKING
-
- self.failed_steps = []
-
- threads = []
- logger.info('Deletion=%r...'%deletion)
- schedule = self.ordered_steps if not deletion else reversed(self.ordered_steps)
-
- for S in schedule:
- thread = threading.Thread(target=self.sync, name='synchronizer', args=(S, deletion))
-
- logger.info('Deletion=%r...'%deletion)
- threads.append(thread)
-
- # Start threads
- for t in threads:
- t.start()
-
- # another spot to clean up debug state
- try:
- reset_queries()
- except:
- # this shouldn't happen, but in case it does, catch it...
- logger.log_exc("exception in reset_queries")
-
- # Wait for all threads to finish before continuing with the run loop
- for t in threads:
- t.join()
-
- self.save_run_times()
-
- loop_end = time.time()
-
- update_diag(loop_end=loop_end, loop_start=loop_start, backend_status="1 - Bottom Of Loop")
-
- except Exception, e:
- logger.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' % e)
- logger.log_exc("Exception in observer run loop")
- traceback.print_exc()
- update_diag(backend_status="2 - Exception in Event Loop")
+ except Exception as e:
+ logger.error(
+ 'Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' %
+ e)
+ logger.log_exc("Exception in observer run loop")
+ traceback.print_exc()
+ update_diag(backend_status="2 - Exception in Event Loop")
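
For reference, the dependency gating that sync() implements above distills to:
one thread per step, a Condition and a status slot per step, and each step
waiting on the Conditions of its dependencies before running. A runnable toy
version of that mechanism (the graph and names are illustrative, not XOS's):

    import threading

    STATUS_WORKING, STATUS_OK = 1, 2

    # toy graph distilled from the event loop: B and C both depend on A
    graph = {'A': [], 'B': ['A'], 'C': ['A']}
    conds = dict((name, threading.Condition()) for name in graph)
    status = dict((name, STATUS_WORKING) for name in graph)

    def run_step(name):
        # wait for every dependency to leave the WORKING state, as sync() does
        for dep in graph[name]:
            cond = conds[dep]
            cond.acquire()
            # a while loop guards against spurious wakeups (the original
            # uses a single wait)
            while status[dep] == STATUS_WORKING:
                cond.wait()
            cond.release()
        # ... the step's actual work would run here ...
        cond = conds[name]
        cond.acquire()
        status[name] = STATUS_OK
        cond.notify_all()  # unblock anyone waiting on this step
        cond.release()

    threads = [threading.Thread(target=run_step, args=(n,)) for n in graph]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print status  # all steps end OK
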
diff --git a/xos/synchronizers/base/syncstep.py b/xos/synchronizers/base/syncstep.py
index 6bd4109..2c22db4 100644
--- a/xos/synchronizers/base/syncstep.py
+++ b/xos/synchronizers/base/syncstep.py
@@ -16,7 +16,7 @@
import time
import pdb
-logger = Logger(level=logging.INFO)
+logger = Logger(level=logging.DEBUG)
def f7(seq):
seen = set()
@@ -141,7 +141,7 @@
def sync_record(self, o):
- logger.info("Sync_record called for %s %s" % (o.__class__.__name__, str(o)))
+ logger.debug("Sync_record called for %s %s" % (o.__class__.__name__, str(o)))
try:
controller = o.get_controller()
diff --git a/xos/synchronizers/model_policy.py b/xos/synchronizers/model_policy.py
index 4b4ae24..5d60dda 100644
--- a/xos/synchronizers/model_policy.py
+++ b/xos/synchronizers/model_policy.py
@@ -19,7 +19,7 @@
model_policies = {}
-logger = Logger(level=logging.INFO)
+logger = Logger(level=logging.DEBUG)
def EnableModelPolicy(x):
global modelPolicyEnabled
@@ -70,7 +70,7 @@
if os.path.isfile(pathname) and fn.startswith("model_policy_") and fn.endswith(".py") and (fn!="__init__.py"):
model_policies[fn[:-3]] = imp.load_source(fn[:-3],pathname)
- logger.info("Loaded model polices %s from %s" % (",".join(model_policies.keys()), policies_dir))
+    logger.debug("Loaded model policies %s from %s" % (",".join(model_policies.keys()), policies_dir))
#@atomic
def execute_model_policy(instance, deleted):
@@ -92,7 +92,7 @@
try:
policy_handler = model_policies.get(policy_name, None) # getattr(model_policies, policy_name, None)
- logger.info("MODEL POLICY: handler %s %s" % (policy_name, policy_handler))
+ logger.debug("MODEL POLICY: handler %s %s" % (policy_name, policy_handler))
if policy_handler is not None:
if (deleted):
try:
@@ -101,7 +101,7 @@
pass
else:
policy_handler.handle(instance)
- logger.info("MODEL POLICY: completed handler %s %s" % (policy_name, policy_handler))
+ logger.debug("MODEL POLICY: completed handler %s %s" % (policy_name, policy_handler))
except:
logger.log_exc("MODEL POLICY: Exception when running handler")
@@ -175,7 +175,7 @@
objects = []
deleted_objects = []
- logger.info("MODEL POLICY: run_policy_once()")
+ logger.debug("MODEL POLICY: run_policy_once()")
check_db_connection_okay()
@@ -225,4 +225,4 @@
# this shouldn't happen, but in case it does, catch it...
logger.log_exc("MODEL POLICY: exception in reset_queries")
- logger.info("MODEL POLICY: finished run_policy_once()")
+ logger.debug("MODEL POLICY: finished run_policy_once()")
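
The level realignment in syncstep.py and model_policy.py follows one pattern:
module Loggers are constructed at DEBUG so per-record chatter is still
captured, while routine progress messages are demoted from info to debug and
only notable events stay at info. A stdlib sketch of the resulting split
(XOS's Logger wrapper is not part of this diff, so plain logging stands in):

    import logging

    # operator-facing output: show INFO and above only
    handler = logging.StreamHandler()
    handler.setLevel(logging.INFO)
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.DEBUG)  # let all records reach the handlers

    logger = logging.getLogger("model_policy")

    logger.debug("MODEL POLICY: run_policy_once()")  # captured, filtered out by the handler
    logger.info("Loaded model policies foo, bar")    # shown to the operator
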