blob: c1b9cda6792aa854d52a8484e63e5eea30675bd5 [file] [log] [blame]
Sapan Bhatia037c9472016-01-14 11:44:43 -05001import os
2import imp
3import inspect
4import time
5import sys
6import traceback
7import commands
8import threading
9import json
10import pdb
11import pprint
12
13
14from datetime import datetime
15from collections import defaultdict
16from core.models import *
17from django.db.models import F, Q
18from django.db import connection
19from django.db import reset_queries
20#from openstack.manager import OpenStackManager
21from openstack.driver import OpenStackDriver
Scott Baker3f417a82016-01-14 16:07:32 -080022from xos.logger import Logger, logging, logger
Sapan Bhatia037c9472016-01-14 11:44:43 -050023#from timeout import timeout
24from xos.config import Config, XOS_DIR
Sapan Bhatiaf0538b82016-01-15 11:05:52 -050025from synchronizers.base.steps import *
Sapan Bhatia037c9472016-01-14 11:44:43 -050026from syncstep import SyncStep
27from toposort import toposort
Sapan Bhatiaf0538b82016-01-15 11:05:52 -050028from synchronizers.base.error_mapper import *
Sapan Bhatia985d2262016-01-15 11:32:25 -050029from synchronizers.openstack.openstacksyncstep import OpenStackSyncStep
Sapan Bhatia037c9472016-01-14 11:44:43 -050030from synchronizers.base.steps.sync_object import SyncObject
31
# Load app models
#
# observer_applist is a comma-separated list of Django app names whose
# models the observer must be able to resolve by name (see
# XOSObserver.lookup_step).

try:
    app_module_names = Config().observer_applist.split(',')
except AttributeError:
    # Config has no observer_applist setting; no extra apps to load.
    app_module_names = []

# Tolerate a single bare string as well as a list.
if not isinstance(app_module_names, list):
    app_module_names = [app_module_names]

app_modules = []

for m in app_module_names:
    model_path = m + '.models'
    module = __import__(model_path, fromlist=[m])
    app_modules.append(module)


debug_mode = False
51
class bcolors:
    """ANSI terminal escape sequences used to colorize console output."""

    HEADER = '\033[95m'     # magenta
    OKBLUE = '\033[94m'     # blue
    OKGREEN = '\033[92m'    # green
    WARNING = '\033[93m'    # yellow
    FAIL = '\033[91m'       # red
    ENDC = '\033[0m'        # reset all attributes
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
61
# Module-level logger; deliberately rebinds the `logger` name imported from
# xos.logger with an INFO-level instance used throughout this module.
logger = Logger(level=logging.INFO)
63
class StepNotReady(Exception):
    """Raised when a step's schedule or class dependencies are not yet met."""
66
class NoOpDriver:
    """Stand-in backend driver used when no real driver (e.g. OpenStack) is configured."""

    def __init__(self):
        # Always enabled so the observer loop still runs without a backend.
        self.enabled = True
        self.dependency_graph = None
71
# Tri-state status of a sync step within a single run_once() pass.
STEP_STATUS_WORKING=1  # step thread is still executing
STEP_STATUS_OK=2       # step completed (or was deliberately skipped) successfully
STEP_STATUS_KO=3       # step failed, or one of its dependencies failed
75
def invert_graph(g):
    """Return the reverse of directed graph *g*.

    *g* maps a node to the list of nodes it points at; the result maps each
    target node back to the list of sources pointing at it.  Nodes with no
    incoming edges do not appear in the result (same as the original).
    """
    ig = {}
    for src, targets in g.items():
        for t in targets:
            # setdefault replaces the old bare `except:` idiom, which could
            # swallow unrelated exceptions (e.g. KeyboardInterrupt).
            ig.setdefault(t, []).append(src)
    return ig
85
class XOSObserver:
    """Event loop that drives XOS model synchronization.

    The observer discovers SyncStep classes, orders them using the model
    dependency graph, and on every pass runs one thread per step (two passes:
    creation/update, then deletion in reverse order).  Step threads wait on
    threading.Condition objects so a step only runs after every step it
    depends on has finished.
    """

    # Discovered SyncStep subclasses; class-level and shared by design.
    sync_steps = []

    def __init__(self):
        # The Condition object that gets signalled by Feefie events
        self.step_lookup = {}
        self.load_sync_step_modules()
        self.load_sync_steps()
        self.event_cond = threading.Condition()

        self.driver_kind = getattr(Config(), "observer_driver", "openstack")
        self.observer_name = getattr(Config(), "observer_name", "")
        if self.driver_kind == "openstack":
            self.driver = OpenStackDriver()
        else:
            self.driver = NoOpDriver()

    def consolePrint(self, what):
        """Print *what* to the console unless observer_console_print is disabled."""
        if getattr(Config(), "observer_console_print", True):
            print(what)

    def wait_for_event(self, timeout):
        """Block for up to *timeout* seconds, or until wake_up() is called."""
        self.event_cond.acquire()
        self.event_cond.wait(timeout)
        self.event_cond.release()

    def wake_up(self):
        """Wake the run loop early (called when a model-change event arrives)."""
        logger.info('Wake up routine called. Event cond %r' % self.event_cond)
        self.event_cond.acquire()
        self.event_cond.notify()
        self.event_cond.release()

    def load_sync_step_modules(self, step_dir=None):
        """Import each module in *step_dir* and register its SyncStep classes.

        A class is registered if it derives from SyncStep (or
        OpenStackSyncStep) and declares a 'provides' attribute; abstract base
        classes without 'provides' are skipped.
        """
        if step_dir is None:
            if hasattr(Config(), "observer_steps_dir"):
                step_dir = Config().observer_steps_dir
            else:
                step_dir = XOS_DIR + "/synchronizers/openstack/steps"

        for fn in os.listdir(step_dir):
            pathname = os.path.join(step_dir, fn)
            if os.path.isfile(pathname) and fn.endswith(".py") and (fn != "__init__.py"):
                module = imp.load_source(fn[:-3], pathname)
                for classname in dir(module):
                    c = getattr(module, classname, None)

                    # make sure 'c' is a descendent of SyncStep and has a
                    # provides field (this eliminates the abstract base classes
                    # since they don't have a provides)

                    if inspect.isclass(c) and (issubclass(c, SyncStep) or issubclass(c, OpenStackSyncStep)) and hasattr(c, "provides") and (c not in self.sync_steps):
                        self.sync_steps.append(c)
        logger.info('loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps]))

    def load_sync_steps(self):
        """Build the step dependency graphs and the topologically-ordered step list."""
        dep_path = Config().observer_dependency_graph
        logger.info('Loading model dependency graph from %s' % dep_path)
        # This contains dependencies between records, not sync steps
        self.model_dependency_graph = json.loads(open(dep_path).read())
        # list(...) because the loop may add keys to the dict it iterates.
        for left, lst in list(self.model_dependency_graph.items()):
            new_lst = []
            for k in lst:
                # Normalize each dependency to a (ModelName, lowercase) pair
                # and make sure the dependency itself has a graph entry.
                new_lst.append((k, k.lower()))
                if k not in self.model_dependency_graph:
                    self.model_dependency_graph[k] = []
            self.model_dependency_graph[left] = new_lst

        try:
            backend_path = Config().observer_pl_dependency_graph
            logger.info('Loading backend dependency graph from %s' % backend_path)
            # This contains dependencies between backend records
            self.backend_dependency_graph = json.loads(open(backend_path).read())
            for k, v in self.backend_dependency_graph.items():
                try:
                    self.model_dependency_graph[k].extend(v)
                except KeyError:
                    # BUGFIX: was 'self.model_dependency_graphp' (typo), which
                    # raised AttributeError here and silently discarded the
                    # whole backend graph via the except clause below.
                    self.model_dependency_graph[k] = v

        except Exception:
            logger.info('Backend dependency graph not loaded')
            # We can work without a backend graph
            self.backend_dependency_graph = {}

        # Map each model name to the list of step names that provide it.
        provides_dict = {}
        for s in self.sync_steps:
            self.step_lookup[s.__name__] = s
            for m in s.provides:
                provides_dict.setdefault(m.__name__, []).append(s.__name__)

        # Translate the model graph into a step graph.  A model that no step
        # provides becomes a "phantom" step (named '#Model') handled by the
        # generic SyncObject step.
        step_graph = {}
        phantom_steps = []
        for k, v in self.model_dependency_graph.items():
            try:
                for source in provides_dict[k]:
                    if not v:
                        step_graph[source] = []

                    for m, _ in v:
                        if m in provides_dict:
                            for dest in provides_dict[m]:
                                lst = step_graph.setdefault(source, [])
                                if dest not in lst:
                                    lst.append(dest)
                        else:
                            step_graph.setdefault(source, []).append('#%s' % m)
                            phantom_steps.append('#%s' % m)

            except KeyError:
                # No step provides model k; nothing to order.
                pass

        self.dependency_graph = step_graph
        self.deletion_dependency_graph = invert_graph(step_graph)

        pp = pprint.PrettyPrinter(indent=4)
        logger.info(pp.pformat(step_graph))
        self.ordered_steps = toposort(self.dependency_graph, phantom_steps + [s.__name__ for s in self.sync_steps])
        self.ordered_steps = [i for i in self.ordered_steps if i != 'SyncObject']

        logger.info("Order of steps=%s" % self.ordered_steps)

        self.load_run_times()

    def check_duration(self, step, duration):
        """Log a note if *step* took longer than its declared deadline."""
        try:
            if duration > step.deadline:
                logger.info('Sync step %s missed deadline, took %.2f seconds' % (step.name, duration))
        except AttributeError:
            # Step doesn't declare a deadline; nothing to check.
            pass

    def update_run_time(self, step, deletion):
        """Record the current time as *step*'s last (deletion) run time."""
        if not deletion:
            self.last_run_times[step.__name__] = time.time()
        else:
            self.last_deletion_run_times[step.__name__] = time.time()

    def check_schedule(self, step, deletion):
        """Raise StepNotReady unless step.requested_interval has elapsed since the last run."""
        last_run_times = self.last_run_times if not deletion else self.last_deletion_run_times

        time_since_last_run = time.time() - last_run_times.get(step.__name__, 0)
        try:
            if time_since_last_run < step.requested_interval:
                raise StepNotReady
        except AttributeError:
            logger.info('Step %s does not have requested_interval set' % step.__name__)
            raise StepNotReady

    def load_run_times(self):
        """Load per-step last-run timestamps from /tmp, defaulting to 0 on any failure."""
        try:
            jrun_times = open('/tmp/%sobserver_run_times' % self.observer_name).read()
            self.last_run_times = json.loads(jrun_times)
        except Exception:
            # Missing or corrupt file: treat every step as never having run.
            self.last_run_times = {}
            for e in self.ordered_steps:
                self.last_run_times[e] = 0
        try:
            jrun_times = open('/tmp/%sobserver_deletion_run_times' % self.observer_name).read()
            self.last_deletion_run_times = json.loads(jrun_times)
        except Exception:
            self.last_deletion_run_times = {}
            for e in self.ordered_steps:
                self.last_deletion_run_times[e] = 0

    def lookup_step_class(self, s):
        """Return the step class for step name *s* ('#Model' names map to SyncObject)."""
        if '#' in s:
            return SyncObject
        else:
            return self.step_lookup[s]

    def lookup_step(self, s):
        """Return an instantiated step for step name *s*.

        Phantom names ('#Model') yield a SyncObject configured to observe the
        named model, resolved from globals() or the loaded app modules.
        """
        if '#' in s:
            objname = s[1:]
            so = SyncObject()

            try:
                obj = globals()[objname]
            except KeyError:
                # Not a core model; search the configured app modules.
                # NOTE(review): if the model is found nowhere, 'obj' stays
                # unbound and the line below raises NameError (same as the
                # original behavior).
                for m in app_modules:
                    if hasattr(m, objname):
                        obj = getattr(m, objname)

            so.provides = [obj]
            so.observes = [obj]
            step = so
        else:
            step_class = self.step_lookup[s]
            step = step_class(driver=self.driver, error_map=self.error_mapper)
        return step

    def save_run_times(self):
        """Persist last-run timestamps so schedules survive observer restarts."""
        run_times = json.dumps(self.last_run_times)
        open('/tmp/%sobserver_run_times' % self.observer_name, 'w').write(run_times)

        deletion_run_times = json.dumps(self.last_deletion_run_times)
        open('/tmp/%sobserver_deletion_run_times' % self.observer_name, 'w').write(deletion_run_times)

    def check_class_dependency(self, step, failed_steps):
        """Raise StepNotReady if any of *step*'s model dependencies is in *failed_steps*."""
        # BUGFIX: was 'step.dependenices' (typo), so the list built here was
        # discarded and the membership test below used whatever stale
        # 'dependencies' attribute the step already carried.
        step.dependencies = []
        for obj in step.provides:
            lst = self.model_dependency_graph.get(obj.__name__, [])
            step.dependencies.extend([dep for (_, dep) in lst])
        for failed_step in failed_steps:
            if failed_step in step.dependencies:
                raise StepNotReady

    def sync(self, S, deletion):
        """Run the step named *S* (one thread per step), honoring dependencies.

        Waits on the Condition of every step that S depends on, runs the step
        if its schedule and class dependencies allow, then publishes its
        status and notifies the steps waiting on it.
        """
        try:
            step = self.lookup_step_class(S)
            start_time = time.time()

            logger.info("Starting to work on step %s, deletion=%s" % (step.__name__, str(deletion)))

            dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
            step_conditions = self.step_conditions  # if not deletion else self.deletion_step_conditions
            step_status = self.step_status  # if not deletion else self.deletion_step_status

            # Wait for step dependencies to be met
            try:
                deps = dependency_graph[S]
                has_deps = True
            except KeyError:
                has_deps = False

            go = True

            failed_dep = None
            if has_deps:
                for d in deps:
                    if d == step.__name__:
                        logger.info(" step %s self-wait skipped" % step.__name__)
                        go = True
                        continue

                    cond = step_conditions[d]
                    cond.acquire()
                    if step_status[d] == STEP_STATUS_WORKING:
                        logger.info(" step %s wait on dep %s" % (step.__name__, d))
                        # NOTE(review): the dependency's status is not
                        # re-checked after wait() returns, so 'go' stays True
                        # even if the dependency failed — preserved from the
                        # original; confirm before changing.
                        cond.wait()
                    elif step_status[d] == STEP_STATUS_OK:
                        go = True
                    else:
                        go = False
                        failed_dep = d
                    cond.release()
                    if not go:
                        break
            else:
                go = True

            if not go:
                self.consolePrint(bcolors.FAIL + "Step %r skipped on %r" % (step, failed_dep) + bcolors.ENDC)
                # SMBAKER: sync_step was not defined here, so I changed
                # this from 'sync_step' to 'step'. Verify.
                self.failed_steps.append(step)
                my_status = STEP_STATUS_KO
            else:
                sync_step = self.lookup_step(S)
                sync_step.__name__ = step.__name__
                sync_step.dependencies = []
                try:
                    for m in sync_step.provides:
                        lst = self.model_dependency_graph[m.__name__]
                        sync_step.dependencies.extend([dep for (_, dep) in lst])
                except KeyError:
                    pass
                sync_step.debug_mode = debug_mode

                should_run = False
                try:
                    # Various checks that decide whether
                    # this step runs or not
                    self.check_class_dependency(sync_step, self.failed_steps)  # dont run Slices if Sites failed
                    self.check_schedule(sync_step, deletion)  # dont run sync_network_routes if time since last run < 1 hour
                    should_run = True
                except StepNotReady:
                    logger.info('Step not ready: %s' % sync_step.__name__)
                    self.failed_steps.append(sync_step)
                    my_status = STEP_STATUS_KO
                except Exception as e:
                    logger.error('%r' % e)
                    logger.log_exc("sync step failed: %r. Deletion: %r" % (sync_step, deletion))
                    self.failed_steps.append(sync_step)
                    my_status = STEP_STATUS_KO

                if should_run:
                    try:
                        duration = time.time() - start_time

                        logger.info('Executing step %s, deletion=%s' % (sync_step.__name__, deletion))

                        self.consolePrint(bcolors.OKBLUE + "Executing step %s" % sync_step.__name__ + bcolors.ENDC)
                        failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion)

                        self.check_duration(sync_step, duration)

                        if failed_objects:
                            self.failed_step_objects.update(failed_objects)

                        logger.info("Step %r succeeded" % sync_step.__name__)
                        self.consolePrint(bcolors.OKGREEN + "Step %r succeeded" % sync_step.__name__ + bcolors.ENDC)
                        my_status = STEP_STATUS_OK
                        self.update_run_time(sync_step, deletion)
                    except Exception as e:
                        self.consolePrint(bcolors.FAIL + "Model step %r failed" % (sync_step.__name__) + bcolors.ENDC)
                        logger.error('Model step %r failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' % (sync_step.__name__, e))
                        logger.log_exc("Exception in sync step")
                        # BUGFIX: was 'self.failed_steps.append(S)', appending
                        # the step *name* where every other path appends the
                        # step itself.
                        self.failed_steps.append(sync_step)
                        my_status = STEP_STATUS_KO
                else:
                    logger.info("Step %r succeeded due to non-run" % step)
                    my_status = STEP_STATUS_OK

            # Publish our status and wake any steps waiting on us.
            try:
                my_cond = step_conditions[S]
                my_cond.acquire()
                step_status[S] = my_status
                my_cond.notify_all()
                my_cond.release()
            except KeyError as e:
                # No one depends on this step, so it has no Condition.
                logger.info('Step %r is a leaf' % step)
                pass
        finally:
            try:
                reset_queries()
            except Exception:
                # this shouldn't happen, but in case it does, catch it...
                logger.log_exc("exception in reset_queries")

            connection.close()

    def run(self):
        """Main loop: wait for an event (or a 5s timeout) and run one full pass."""
        if not self.driver.enabled:
            return

        if (self.driver_kind == "openstack") and (not self.driver.has_openstack):
            return

        while True:
            logger.info('Waiting for event')
            self.wait_for_event(timeout=5)
            logger.info('Observer woke up')

            self.run_once()

    def run_once(self):
        """Run one synchronization pass: all steps forward, then deletion in reverse."""
        try:
            loop_start = time.time()
            error_map_file = getattr(Config(), "error_map_path", XOS_DIR + "/error_map.txt")
            self.error_mapper = ErrorMapper(error_map_file)

            # Two passes. One for sync, the other for deletion.
            for deletion in [False, True]:
                # Set of individual objects within steps that failed
                self.failed_step_objects = set()

                # Set up conditions and step status
                # This is needed for steps to run in parallel
                # while obeying dependencies.

                providers = set()
                dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph

                for v in dependency_graph.values():
                    if v:
                        providers.update(v)

                self.step_conditions = {}
                self.step_status = {}

                for p in providers:
                    self.step_conditions[p] = threading.Condition()
                    self.step_status[p] = STEP_STATUS_WORKING

                self.failed_steps = []

                threads = []
                logger.info('Deletion=%r...' % deletion)
                schedule = self.ordered_steps if not deletion else reversed(self.ordered_steps)

                for S in schedule:
                    thread = threading.Thread(target=self.sync, args=(S, deletion))

                    logger.info('Deletion=%r...' % deletion)
                    threads.append(thread)

                # Start threads
                for t in threads:
                    t.start()

                # another spot to clean up debug state
                try:
                    reset_queries()
                except Exception:
                    # this shouldn't happen, but in case it does, catch it...
                    logger.log_exc("exception in reset_queries")

                # Wait for all threads to finish before continuing with the run loop
                for t in threads:
                    t.join()

            self.save_run_times()

            loop_end = time.time()
            open('/tmp/%sobserver_last_run' % self.observer_name, 'w').write(json.dumps({'last_run': loop_end, 'last_duration': loop_end - loop_start}))
        except Exception as e:
            logger.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' % e)
            logger.log_exc("Exception in observer run loop")
            traceback.print_exc()