blob: 6cfc9f68a719a9cb66c7942a83f2dec2020563b3 [file] [log] [blame]
Scott Baker45fb7a12013-12-31 00:56:19 -08001import os
2import imp
3import inspect
Sapan Bhatia24836f12013-08-27 10:16:05 -04004import time
Sapan Bhatia6b6c2182015-01-27 03:58:11 +00005import sys
Sapan Bhatia24836f12013-08-27 10:16:05 -04006import traceback
7import commands
8import threading
9import json
Sapan Bhatiaab202a62014-09-03 11:30:21 -040010import pdb
Sapan Bhatia6b6c2182015-01-27 03:58:11 +000011import pprint
12
Sapan Bhatia24836f12013-08-27 10:16:05 -040013
14from datetime import datetime
15from collections import defaultdict
16from core.models import *
17from django.db.models import F, Q
Scott Bakerc7ca6552014-09-05 14:48:38 -070018from django.db import connection
Scott Baker8bbc77c2015-06-22 10:56:16 -070019from django.db import reset_queries
Tony Mack387a73f2013-09-18 07:59:14 -040020#from openstack.manager import OpenStackManager
21from openstack.driver import OpenStackDriver
Scott Bakerf154cc22016-01-14 16:07:32 -080022from xos.logger import Logger, logging, logger
Sapan Bhatia24836f12013-08-27 10:16:05 -040023#from timeout import timeout
Scott Baker76a840e2015-02-11 21:38:09 -080024from xos.config import Config, XOS_DIR
Sapan Bhatia003e84c2016-01-15 11:05:52 -050025from synchronizers.base.steps import *
Scott Baker45fb7a12013-12-31 00:56:19 -080026from syncstep import SyncStep
Sapan Bhatia45cbbc32014-03-11 17:48:30 -040027from toposort import toposort
Sapan Bhatia003e84c2016-01-15 11:05:52 -050028from synchronizers.base.error_mapper import *
Sapan Bhatiab570e0e2016-01-15 11:32:25 -050029from synchronizers.openstack.openstacksyncstep import OpenStackSyncStep
Sapan Bhatia67ea0d72016-01-14 11:41:38 -050030from synchronizers.base.steps.sync_object import SyncObject
Sapan Bhatiaf51edea2015-11-03 13:53:11 -050031
32# Load app models
33
try:
    app_module_names = Config().observer_applist.split(',')
except AttributeError:
    # observer_applist is not configured (or is not a string):
    # there are no extra application models to load.
    app_module_names = []

# Ignore surrounding whitespace and blank entries, so that an empty
# observer_applist does not end up importing the bogus module '.models'.
app_module_names = [m.strip() for m in app_module_names if m.strip()]

# Imported '<app>.models' modules; XOSObserver.lookup_step searches them
# for model classes that are not found in this module's globals.
app_modules = []

for m in app_module_names:
    model_path = m + '.models'
    module = __import__(model_path, fromlist=[m])
    app_modules.append(module)
Sapan Bhatiacb6f8d62015-01-17 01:03:52 +000048
Sapan Bhatia24836f12013-08-27 10:16:05 -040049
# Module-wide debug flag; XOSObserver.sync copies it onto every sync step
# instance (as step.debug_mode) before the step is checked and run.
debug_mode = False
Sapan Bhatia24836f12013-08-27 10:16:05 -040051
class bcolors(object):
    """ANSI escape sequences used to colour console output.

    Wrap a message as ``bcolors.<COLOUR> + message + bcolors.ENDC``.
    """
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'       # resets colour/attributes
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
61
# NOTE: this rebinds the `logger` name imported from xos.logger above to a
# fresh Logger instance configured at INFO level.
logger = Logger(level=logging.INFO)
Sapan Bhatia24836f12013-08-27 10:16:05 -040063
class StepNotReady(Exception):
    """Raised by the XOSObserver checks when a sync step must not run now."""
    pass
Sapan Bhatia24836f12013-08-27 10:16:05 -040066
class NoOpDriver(object):
    """Placeholder driver used when the configured driver is not "openstack".

    It only exposes the attributes set below: `enabled` (always True, so
    XOSObserver.run does not bail out) and `dependency_graph` (None).
    """

    def __init__(self):
        self.enabled = True
        self.dependency_graph = None
71
# Per-step states stored in XOSObserver.step_status while a pass is running:
# WORKING until the step publishes its outcome, then OK or KO.
STEP_STATUS_WORKING = 1
STEP_STATUS_OK = 2
STEP_STATUS_KO = 3
75
def invert_graph(g):
    """Return the graph `g` with every edge reversed.

    `g` maps a node to an iterable of the nodes it points to.  The result
    maps each pointed-to node to the list of nodes that pointed to it, in
    the order they are met while iterating `g`.  Nodes without any incoming
    edge do not appear as keys of the result.
    """
    ig = {}
    for k, v in g.items():
        for v0 in v:
            ig.setdefault(v0, []).append(k)
    return ig
Scott Baker7771f412014-01-02 16:36:41 -080085
class XOSObserver(object):
    """Event loop of the synchronizer.

    Loads the sync step classes, orders them according to the model
    dependency graph and, on every wake-up, runs each step in its own thread
    (one pass for synchronization, one pass for deletion).
    """

    # NOTE: class-level list, hence shared by all XOSObserver instances.
    sync_steps = []

    def __init__(self):
        """Read the configuration, load the sync steps and create the driver."""
        self.step_lookup = {}

        # These must be known before load_sync_steps(): it ends by calling
        # load_run_times(), which builds its file names from observer_name.
        self.driver_kind = getattr(Config(), "observer_driver", "openstack")
        self.observer_name = getattr(Config(), "observer_name", "")

        self.load_sync_step_modules()
        self.load_sync_steps()

        # The Condition object that gets signalled by Feefie events
        self.event_cond = threading.Condition()

        if self.driver_kind == "openstack":
            self.driver = OpenStackDriver()
        else:
            self.driver = NoOpDriver()

    def consolePrint(self, what):
        """Print `what` unless observer_console_print is disabled in Config."""
        if getattr(Config(), "observer_console_print", True):
            print(what)

    def wait_for_event(self, timeout):
        """Block until wake_up() is called or `timeout` seconds have passed."""
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        """Wake one thread blocked in wait_for_event()."""
        logger.info('Wake up routine called. Event cond %r' % self.event_cond)
        with self.event_cond:
            self.event_cond.notify()

    def load_sync_step_modules(self, step_dir=None):
        """Load the sync step classes found in the *.py files of `step_dir`.

        `step_dir` defaults to Config().observer_steps_dir when that option
        exists, else to XOS_DIR + "/synchronizers/openstack/steps".  Classes
        are appended to `self.sync_steps` (no duplicates).
        """
        if step_dir is None:
            if hasattr(Config(), "observer_steps_dir"):
                step_dir = Config().observer_steps_dir
            else:
                step_dir = XOS_DIR + "/synchronizers/openstack/steps"

        for fn in os.listdir(step_dir):
            pathname = os.path.join(step_dir, fn)
            if not (os.path.isfile(pathname) and fn.endswith(".py") and fn != "__init__.py"):
                continue

            module = imp.load_source(fn[:-3], pathname)
            for classname in dir(module):
                c = getattr(module, classname, None)

                # make sure 'c' is a descendent of SyncStep and has a
                # provides field (this eliminates the abstract base classes
                # since they don't have a provides)
                if (inspect.isclass(c)
                        and (issubclass(c, SyncStep) or issubclass(c, OpenStackSyncStep))
                        and hasattr(c, "provides")
                        and (c not in self.sync_steps)):
                    self.sync_steps.append(c)

        logger.info('loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps]))

    def load_sync_steps(self):
        """Build the step dependency graphs and the ordered list of steps.

        Reads the model dependency graph (JSON file named by
        Config().observer_dependency_graph), optionally merges the backend
        graph (Config().observer_pl_dependency_graph), translates model
        dependencies into dependencies between the steps providing those
        models, then sets `dependency_graph`, `deletion_dependency_graph`
        and `ordered_steps` and finally loads the saved run times.

        Errors while reading the model graph propagate to the caller; any
        error while reading the backend graph is logged and ignored.
        """
        dep_path = Config().observer_dependency_graph
        logger.info('Loading model dependency graph from %s' % dep_path)

        # This contains dependencies between records, not sync steps
        with open(dep_path) as dep_file:
            self.model_dependency_graph = json.loads(dep_file.read())

        # Every dependency becomes a (name, lowercased name) pair, and every
        # model that only appears as a dependency gets an (empty) entry of
        # its own.  Iterate over a snapshot since keys are added on the way.
        for left, lst in list(self.model_dependency_graph.items()):
            new_lst = []
            for k in lst:
                new_lst.append((k, k.lower()))
                self.model_dependency_graph.setdefault(k, [])
            self.model_dependency_graph[left] = new_lst

        try:
            backend_path = Config().observer_pl_dependency_graph
            logger.info('Loading backend dependency graph from %s' % backend_path)
            # This contains dependencies between backend records
            with open(backend_path) as backend_file:
                self.backend_dependency_graph = json.loads(backend_file.read())

            # Assumes the backend graph has the same raw shape as the model
            # graph (model name -> list of model names) -- TODO confirm.
            # Entries are converted to the (name, lowercased) pairs expected
            # below, and fully prepared before the model graph is touched so
            # that a malformed file cannot leave it half-merged.
            additions = {}
            for k, v in self.backend_dependency_graph.items():
                additions[k] = [(dep, dep.lower()) for dep in v]
        except Exception:
            logger.info('Backend dependency graph not loaded')
            # We can work without a backend graph
            self.backend_dependency_graph = {}
        else:
            for k, pairs in additions.items():
                self.model_dependency_graph.setdefault(k, []).extend(pairs)

        # model name -> names of the steps providing that model
        provides_dict = {}
        for s in self.sync_steps:
            self.step_lookup[s.__name__] = s
            for m in s.provides:
                provides_dict.setdefault(m.__name__, []).append(s.__name__)

        step_graph = {}
        phantom_steps = []
        for k, v in self.model_dependency_graph.items():
            # Models that no step provides contribute nothing here.
            for source in provides_dict.get(k, []):
                if not v:
                    # Make sure the step is present in the graph, without
                    # dropping edges already recorded for it through
                    # another model it provides.
                    step_graph.setdefault(source, [])

                for m, _ in v:
                    if m in provides_dict:
                        edges = step_graph.setdefault(source, [])
                        for dest in provides_dict[m]:
                            if dest not in edges:
                                edges.append(dest)
                    else:
                        # No step provides this model: depend on a phantom
                        # '#<Model>' step (served by SyncObject).
                        phantom = '#%s' % m
                        step_graph.setdefault(source, []).append(phantom)
                        phantom_steps.append(phantom)

        self.dependency_graph = step_graph
        self.deletion_dependency_graph = invert_graph(step_graph)

        pp = pprint.PrettyPrinter(indent=4)
        logger.info(pp.pformat(step_graph))
        self.ordered_steps = toposort(self.dependency_graph,
                                      phantom_steps + [s.__name__ for s in self.sync_steps])
        self.ordered_steps = [i for i in self.ordered_steps if i != 'SyncObject']

        logger.info("Order of steps=%s" % self.ordered_steps)

        self.load_run_times()

    def check_duration(self, step, duration):
        """Log when `duration` (seconds) exceeds `step.deadline`.

        Steps without a deadline are ignored.
        """
        deadline = getattr(step, 'deadline', None)
        if deadline is None:
            # S doesn't have a deadline
            return
        if duration > deadline:
            # Not every step object carries a `name`; fall back on __name__.
            step_name = getattr(step, 'name', getattr(step, '__name__', repr(step)))
            logger.info('Sync step %s missed deadline, took %.2f seconds' % (step_name, duration))

    def update_run_time(self, step, deletion):
        """Record the current time as the last (deletion) run of `step`."""
        if not deletion:
            self.last_run_times[step.__name__] = time.time()
        else:
            self.last_deletion_run_times[step.__name__] = time.time()

    def check_schedule(self, step, deletion):
        """Raise StepNotReady unless `step` is due to run.

        A step is due when at least `step.requested_interval` seconds have
        passed since its last run (never-run steps count as run at time 0).
        Steps without `requested_interval` are never due.
        """
        last_run_times = self.last_deletion_run_times if deletion else self.last_run_times
        time_since_last_run = time.time() - last_run_times.get(step.__name__, 0)

        try:
            requested_interval = step.requested_interval
        except AttributeError:
            logger.info('Step %s does not have requested_interval set' % step.__name__)
            raise StepNotReady

        if time_since_last_run < requested_interval:
            raise StepNotReady

    def _read_run_times(self, path):
        """Return the run times saved in `path`, or 0 for every ordered step
        when the file is missing, unreadable or does not hold a JSON object."""
        try:
            with open(path) as run_times_file:
                run_times = json.loads(run_times_file.read())
            if isinstance(run_times, dict):
                return run_times
        except (IOError, OSError, ValueError):
            pass
        return dict((e, 0) for e in self.ordered_steps)

    def load_run_times(self):
        """Load `last_run_times` / `last_deletion_run_times` from /tmp."""
        self.last_run_times = self._read_run_times(
            '/tmp/%sobserver_run_times' % self.observer_name)
        self.last_deletion_run_times = self._read_run_times(
            '/tmp/%sobserver_deletion_run_times' % self.observer_name)

    def lookup_step_class(self, s):
        """Return the class of step `s` (SyncObject for '#<Model>' names)."""
        if '#' in s:
            return SyncObject
        return self.step_lookup[s]

    def lookup_step(self, s):
        """Return a step instance for the step name `s`.

        For a phantom name '#<Model>' this is a SyncObject whose `provides`
        and `observes` are the model class, looked up in this module's
        globals first and then in the configured app modules.  Otherwise the
        registered step class is instantiated with the driver and the
        current error mapper.

        Raises NameError when the model of a phantom step cannot be found,
        KeyError when `s` is not a registered step.
        """
        if '#' in s:
            objname = s[1:]
            so = SyncObject()

            try:
                obj = globals()[objname]
            except KeyError:
                obj = None
                # No early exit: the last app module defining it wins.
                for m in app_modules:
                    if hasattr(m, objname):
                        obj = getattr(m, objname)
                if obj is None:
                    raise NameError('model %r of step %r not found' % (objname, s))

            so.provides = [obj]
            so.observes = [obj]
            step = so
        else:
            step_class = self.step_lookup[s]
            step = step_class(driver=self.driver, error_map=self.error_mapper)
        return step

    def save_run_times(self):
        """Write both run-time tables to their JSON files under /tmp."""
        run_times = json.dumps(self.last_run_times)
        with open('/tmp/%sobserver_run_times' % self.observer_name, 'w') as f:
            f.write(run_times)

        deletion_run_times = json.dumps(self.last_deletion_run_times)
        with open('/tmp/%sobserver_deletion_run_times' % self.observer_name, 'w') as f:
            f.write(deletion_run_times)

    def check_class_dependency(self, step, failed_steps):
        """Raise StepNotReady when a dependency of `step` is in `failed_steps`.

        Also (re)computes `step.dependencies`: the lowercased names of the
        models that the models provided by `step` depend on.
        """
        step.dependencies = []
        for obj in step.provides:
            lst = self.model_dependency_graph.get(obj.__name__, [])
            step.dependencies.extend([lowered for _, lowered in lst])

        # NOTE(review): sync() appends step classes, step instances and step
        # names to failed_steps, while dependencies holds lowercased model
        # names -- this test may never match; confirm what is intended.
        for failed_step in failed_steps:
            if failed_step in step.dependencies:
                raise StepNotReady

    def sync(self, S, deletion):
        """Thread body: wait for the dependencies of step `S`, run the step
        when it is ready, then publish its status to the steps waiting on it.

        `S` is a step name from `ordered_steps`; `deletion` selects the
        deletion pass (inverted dependency graph, deletion run times).
        """
        # NOTE(review): an unexpected exception raised before the status is
        # published below leaves the dependents of S waiting forever.
        try:
            step = self.lookup_step_class(S)

            logger.info("Starting to work on step %s, deletion=%s" % (step.__name__, str(deletion)))

            dependency_graph = self.deletion_dependency_graph if deletion else self.dependency_graph
            step_conditions = self.step_conditions
            step_status = self.step_status

            # Wait for step dependencies to be met
            go = True
            failed_dep = None
            for d in dependency_graph.get(S, []):
                if d == step.__name__:
                    logger.info(" step %s self-wait skipped" % step.__name__)
                    continue

                cond = step_conditions[d]
                with cond:
                    # Re-check after every wake-up: the dependency may have
                    # failed, and Condition.wait() can return spuriously.
                    while step_status[d] == STEP_STATUS_WORKING:
                        logger.info(" step %s wait on dep %s" % (step.__name__, d))
                        cond.wait()
                    dep_ok = (step_status[d] == STEP_STATUS_OK)

                if not dep_ok:
                    go = False
                    failed_dep = d
                    break

            if not go:
                self.consolePrint(bcolors.FAIL + "Step %r skipped on %r" % (step, failed_dep) + bcolors.ENDC)
                # SMBAKER: sync_step was not defined here, so I changed
                # this from 'sync_step' to 'step'. Verify.
                self.failed_steps.append(step)
                my_status = STEP_STATUS_KO
            else:
                sync_step = self.lookup_step(S)
                sync_step.__name__ = step.__name__
                sync_step.dependencies = []
                try:
                    for m in sync_step.provides:
                        lst = self.model_dependency_graph[m.__name__]
                        sync_step.dependencies.extend([lowered for _, lowered in lst])
                except KeyError:
                    # A provided model is absent from the dependency graph.
                    pass
                sync_step.debug_mode = debug_mode

                should_run = False
                try:
                    # Various checks that decide whether
                    # this step runs or not
                    self.check_class_dependency(sync_step, self.failed_steps)  # dont run Slices if Sites failed
                    self.check_schedule(sync_step, deletion)  # dont run sync_network_routes if time since last run < 1 hour
                    should_run = True
                except StepNotReady:
                    logger.info('Step not ready: %s' % sync_step.__name__)
                    self.failed_steps.append(sync_step)
                except Exception as e:
                    logger.error('%r' % e)
                    logger.log_exc("sync step failed: %r. Deletion: %r" % (sync_step, deletion))
                    self.failed_steps.append(sync_step)

                if should_run:
                    try:
                        logger.info('Executing step %s, deletion=%s' % (sync_step.__name__, deletion))

                        self.consolePrint(bcolors.OKBLUE + "Executing step %s" % sync_step.__name__ + bcolors.ENDC)
                        exec_start = time.time()
                        failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion)

                        # Time the execution of the step itself.
                        self.check_duration(sync_step, time.time() - exec_start)

                        if failed_objects:
                            self.failed_step_objects.update(failed_objects)

                        logger.info("Step %r succeeded" % sync_step.__name__)
                        self.consolePrint(bcolors.OKGREEN + "Step %r succeeded" % sync_step.__name__ + bcolors.ENDC)
                        my_status = STEP_STATUS_OK
                        self.update_run_time(sync_step, deletion)
                    except Exception as e:
                        self.consolePrint(bcolors.FAIL + "Model step %r failed" % (sync_step.__name__) + bcolors.ENDC)
                        logger.error('Model step %r failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' % (sync_step.__name__, e))
                        logger.log_exc(e)
                        self.failed_steps.append(S)
                        my_status = STEP_STATUS_KO
                else:
                    # A step that was not run (not due yet, or a failed
                    # pre-run check) is reported as OK to its dependents,
                    # as it always has been.  NOTE(review): confirm this is
                    # also wanted for the failed-check case.
                    logger.info("Step %r succeeded due to non-run" % step)
                    my_status = STEP_STATUS_OK

            # Publish the outcome; only steps that others depend on have a
            # condition registered.
            try:
                my_cond = step_conditions[S]
            except KeyError:
                logger.info('Step %r is a leaf' % step)
            else:
                with my_cond:
                    step_status[S] = my_status
                    my_cond.notify_all()
        finally:
            try:
                reset_queries()
            except Exception:
                # this shouldn't happen, but in case it does, catch it...
                logger.log_exc("exception in reset_queries")

            connection.close()

    def run(self):
        """Run the observer loop forever (returns at once when the driver is
        disabled, or is an OpenStack driver without OpenStack)."""
        if not self.driver.enabled:
            return

        if (self.driver_kind == "openstack") and (not self.driver.has_openstack):
            return

        while True:
            logger.info('Waiting for event')
            self.wait_for_event(timeout=5)
            logger.info('Observer woke up')

            self.run_once()

    def run_once(self):
        """Run every step once for synchronization, then once for deletion.

        Afterwards the run times and a /tmp/<name>observer_last_run summary
        are written.  Any exception is logged and printed, not propagated.
        """
        try:
            loop_start = time.time()
            error_map_file = getattr(Config(), "error_map_path", XOS_DIR + "/error_map.txt")
            self.error_mapper = ErrorMapper(error_map_file)

            # Two passes. One for sync, the other for deletion.
            for deletion in [False, True]:
                # Set of individual objects within steps that failed
                self.failed_step_objects = set()

                # Set up conditions and step status
                # This is needed for steps to run in parallel
                # while obeying dependencies.
                dependency_graph = self.deletion_dependency_graph if deletion else self.dependency_graph

                providers = set()
                for v in dependency_graph.values():
                    if v:
                        providers.update(v)

                self.step_conditions = {}
                self.step_status = {}
                for p in providers:
                    self.step_conditions[p] = threading.Condition()
                    self.step_status[p] = STEP_STATUS_WORKING

                self.failed_steps = []

                threads = []
                logger.info('Deletion=%r...' % deletion)
                schedule = reversed(self.ordered_steps) if deletion else self.ordered_steps

                for S in schedule:
                    thread = threading.Thread(target=self.sync, args=(S, deletion))

                    logger.info('Deletion=%r...' % deletion)
                    threads.append(thread)

                # Start threads
                for t in threads:
                    t.start()

                # another spot to clean up debug state
                try:
                    reset_queries()
                except Exception:
                    # this shouldn't happen, but in case it does, catch it...
                    logger.log_exc("exception in reset_queries")

                # Wait for all threads to finish before continuing with the run loop
                for t in threads:
                    t.join()

            self.save_run_times()

            loop_end = time.time()
            with open('/tmp/%sobserver_last_run' % self.observer_name, 'w') as f:
                f.write(json.dumps({'last_run': loop_end, 'last_duration': loop_end - loop_start}))
        except Exception as e:
            logger.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' % e)
            logger.log_exc("Exception in observer run loop")
            traceback.print_exc()