import os
import imp
import inspect
import time
import sys
import traceback
import commands
import threading
import json
import pdb
import pprint


from datetime import datetime
from collections import defaultdict
from core.models import *
from django.db.models import F, Q
from django.db import connection
from django.db import reset_queries
#from openstack_xos.manager import OpenStackManager
from openstack_xos.driver import OpenStackDriver
from xos.logger import Logger, logging, logger
#from timeout import timeout
from xos.config import Config, XOS_DIR
from synchronizers.base.steps import *
from syncstep import SyncStep
from toposort import toposort
from synchronizers.base.error_mapper import *
from synchronizers.openstack.openstacksyncstep import OpenStackSyncStep
from synchronizers.base.steps.sync_object import SyncObject
from django.utils import timezone
from diag import update_diag

# Load app models

try:
    app_module_names = Config().observer_applist.split(',')
except AttributeError:
    app_module_names = []

if (type(app_module_names)!=list):
    app_module_names=[app_module_names]

app_modules = []

for m in app_module_names:
    model_path = m+'.models'
    module = __import__(model_path,fromlist=[m])
    app_modules.append(module)


debug_mode = False

class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

logger = Logger(level=logging.INFO)

class StepNotReady(Exception):
    pass

class NoOpDriver:
    def __init__(self):
        self.enabled = True
        self.dependency_graph = None

STEP_STATUS_WORKING=1
STEP_STATUS_OK=2
STEP_STATUS_KO=3

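# invert_graph() reverses the edges of a step dependency graph; the inverted
# graph is used by the deletion pass so that steps run in the opposite order
# from the sync pass.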
def invert_graph(g):
    ig = {}
    for k,v in g.items():
        for v0 in v:
            try:
                ig[v0].append(k)
            except:
                ig[v0]=[k]
    return ig

class XOSObserver:
    sync_steps = []


    def __init__(self):
        # The Condition object that gets signalled by Feefie events
        self.step_lookup = {}
        self.load_sync_step_modules()
        self.load_sync_steps()
        self.event_cond = threading.Condition()

        self.driver_kind = getattr(Config(), "observer_driver", "openstack")
        self.observer_name = getattr(Config(), "observer_name", "")
        if self.driver_kind=="openstack":
            self.driver = OpenStackDriver()
        else:
            self.driver = NoOpDriver()

    def consolePrint(self, what):
        if getattr(Config(), "observer_console_print", True):
            print what

    def wait_for_event(self, timeout):
        self.event_cond.acquire()
        self.event_cond.wait(timeout)
        self.event_cond.release()

    def wake_up(self):
        logger.info('Wake up routine called. Event cond %r'%self.event_cond)
        self.event_cond.acquire()
        self.event_cond.notify()
        self.event_cond.release()

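    # Sync steps are discovered by scanning the steps directory (taken from
    # the observer_steps_dir config option, or the OpenStack default below)
    # and importing every .py module found there; any class that subclasses
    # SyncStep or OpenStackSyncStep and defines 'provides' is registered.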
    def load_sync_step_modules(self, step_dir=None):
        if step_dir is None:
            if hasattr(Config(), "observer_steps_dir"):
                step_dir = Config().observer_steps_dir
            else:
                step_dir = XOS_DIR + "/synchronizers/openstack/steps"

        for fn in os.listdir(step_dir):
            pathname = os.path.join(step_dir,fn)
            if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
                module = imp.load_source(fn[:-3],pathname)
                for classname in dir(module):
                    c = getattr(module, classname, None)

                    # make sure 'c' is a descendant of SyncStep and has a
                    # provides field (this eliminates the abstract base classes
                    # since they don't have a provides)

                    if inspect.isclass(c) and (issubclass(c, SyncStep) or issubclass(c,OpenStackSyncStep)) and hasattr(c,"provides") and (c not in self.sync_steps):
                        self.sync_steps.append(c)
        logger.info('loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps]))

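    # load_sync_steps() translates the JSON model dependency graph (plus an
    # optional backend graph) into a graph of dependencies between the steps
    # that provide each model, topologically sorts it into ordered_steps, and
    # records models with no registered step as '#<Model>' phantom steps.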
    def load_sync_steps(self):
        dep_path = Config().observer_dependency_graph
        logger.info('Loading model dependency graph from %s' % dep_path)
        try:
            # This contains dependencies between records, not sync steps
            self.model_dependency_graph = json.loads(open(dep_path).read())
            for left,lst in self.model_dependency_graph.items():
                new_lst = []
                for k in lst:
                    try:
                        tup = (k,k.lower())
                        new_lst.append(tup)
                        deps = self.model_dependency_graph[k]
                    except:
                        self.model_dependency_graph[k] = []

                self.model_dependency_graph[left] = new_lst
        except Exception,e:
            raise e

        try:
            backend_path = Config().observer_pl_dependency_graph
            logger.info('Loading backend dependency graph from %s' % backend_path)
            # This contains dependencies between backend records
            self.backend_dependency_graph = json.loads(open(backend_path).read())
            for k,v in self.backend_dependency_graph.items():
                try:
                    self.model_dependency_graph[k].extend(v)
                except KeyError:
                    self.model_dependency_graph[k] = v

        except Exception,e:
            logger.info('Backend dependency graph not loaded')
            # We can work without a backend graph
            self.backend_dependency_graph = {}

        provides_dict = {}
        for s in self.sync_steps:
            self.step_lookup[s.__name__] = s
            for m in s.provides:
                try:
                    provides_dict[m.__name__].append(s.__name__)
                except KeyError:
                    provides_dict[m.__name__]=[s.__name__]

        step_graph = {}
        phantom_steps = []
        for k,v in self.model_dependency_graph.items():
            try:
                for source in provides_dict[k]:
                    if (not v):
                        step_graph[source] = []

                    for m,_ in v:
                        try:
                            for dest in provides_dict[m]:
                                # no deps, pass
                                try:
                                    if (dest not in step_graph[source]):
                                        step_graph[source].append(dest)
                                except:
                                    step_graph[source]=[dest]
                        except KeyError:
                            if (not provides_dict.has_key(m)):
                                try:
                                    step_graph[source]+=['#%s'%m]
                                except:
                                    step_graph[source]=['#%s'%m]

                                phantom_steps+=['#%s'%m]
                            pass

            except KeyError:
                pass
                # no dependencies, pass


        self.dependency_graph = step_graph
        self.deletion_dependency_graph = invert_graph(step_graph)

        pp = pprint.PrettyPrinter(indent=4)
        logger.info(pp.pformat(step_graph))
        self.ordered_steps = toposort(self.dependency_graph, phantom_steps+map(lambda s:s.__name__,self.sync_steps))
        self.ordered_steps = [i for i in self.ordered_steps if i!='SyncObject']

        logger.info("Order of steps=%s" % self.ordered_steps)

        self.load_run_times()


    def check_duration(self, step, duration):
        try:
            if (duration > step.deadline):
                logger.info('Sync step %s missed deadline, took %.2f seconds'%(step.name,duration))
        except AttributeError:
            # step doesn't have a deadline
            pass

    def update_run_time(self, step, deletion):
        if (not deletion):
            self.last_run_times[step.__name__]=time.time()
        else:
            self.last_deletion_run_times[step.__name__]=time.time()


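    # check_schedule() throttles a step: it raises StepNotReady unless at
    # least step.requested_interval seconds have elapsed since the step's
    # last run (sync and deletion runs are timed separately).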
    def check_schedule(self, step, deletion):
        last_run_times = self.last_run_times if not deletion else self.last_deletion_run_times

        time_since_last_run = time.time() - last_run_times.get(step.__name__, 0)
        try:
            if (time_since_last_run < step.requested_interval):
                raise StepNotReady
        except AttributeError:
            logger.info('Step %s does not have requested_interval set'%step.__name__)
            raise StepNotReady

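    # Last-run timestamps are persisted as JSON in /tmp so that scheduling
    # survives a restart; if the files are missing or unparsable, every step
    # is treated as never having run.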
    def load_run_times(self):
        try:
            jrun_times = open('/tmp/%sobserver_run_times'%self.observer_name).read()
            self.last_run_times = json.loads(jrun_times)
        except:
            self.last_run_times={}
            for e in self.ordered_steps:
                self.last_run_times[e]=0
        try:
            jrun_times = open('/tmp/%sobserver_deletion_run_times'%self.observer_name).read()
            self.last_deletion_run_times = json.loads(jrun_times)
        except:
            self.last_deletion_run_times={}
            for e in self.ordered_steps:
                self.last_deletion_run_times[e]=0

    def lookup_step_class(self,s):
        if ('#' in s):
            return SyncObject
        else:
            step = self.step_lookup[s]
        return step

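    # lookup_step() turns a step name into a runnable instance. A name that
    # starts with '#' is a phantom step: a generic SyncObject is configured
    # to provide and observe just the named model, which is resolved from
    # globals() or from the loaded app modules.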
    def lookup_step(self,s):
        if ('#' in s):
            objname = s[1:]
            so = SyncObject()

            try:
                obj = globals()[objname]
            except:
                for m in app_modules:
                    if (hasattr(m,objname)):
                        obj = getattr(m,objname)

            so.provides=[obj]
            so.observes=[obj]
            step = so
        else:
            step_class = self.step_lookup[s]
            step = step_class(driver=self.driver,error_map=self.error_mapper)
        return step

    def save_run_times(self):
        run_times = json.dumps(self.last_run_times)
        open('/tmp/%sobserver_run_times'%self.observer_name,'w').write(run_times)

        deletion_run_times = json.dumps(self.last_deletion_run_times)
        open('/tmp/%sobserver_deletion_run_times'%self.observer_name,'w').write(deletion_run_times)

    def check_class_dependency(self, step, failed_steps):
        step.dependencies = []
        for obj in step.provides:
            lst = self.model_dependency_graph.get(obj.__name__, [])
            nlst = map(lambda(a,b):b,lst)
            step.dependencies.extend(nlst)
        for failed_step in failed_steps:
            if (failed_step in step.dependencies):
                raise StepNotReady

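    # sync() runs in one thread per step. It first waits on the Condition of
    # every step it depends on until that step leaves STEP_STATUS_WORKING,
    # then either skips (failed dependency) or executes the step, records any
    # failed objects, and finally publishes its own status and notifies
    # whoever is waiting on it.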
    def sync(self, S, deletion):
        try:
            step = self.lookup_step_class(S)
            start_time=time.time()

            logger.info("Starting to work on step %s, deletion=%s" % (step.__name__, str(deletion)))

            dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
            step_conditions = self.step_conditions  # if not deletion else self.deletion_step_conditions
            step_status = self.step_status  # if not deletion else self.deletion_step_status

            # Wait for step dependencies to be met
            try:
                deps = dependency_graph[S]
                has_deps = True
            except KeyError:
                has_deps = False

            go = True

            failed_dep = None
            if (has_deps):
                for d in deps:
                    if d==step.__name__:
                        logger.info("  step %s self-wait skipped" % step.__name__)
                        go = True
                        continue

                    cond = step_conditions[d]
                    cond.acquire()
                    if (step_status[d] is STEP_STATUS_WORKING):
                        logger.info("  step %s wait on dep %s" % (step.__name__, d))
                        cond.wait()
                        logger.info("  step %s wait on dep %s cond returned" % (step.__name__, d))
                    elif step_status[d] == STEP_STATUS_OK:
                        go = True
                    else:
                        logger.info("  step %s has failed dep %s" % (step.__name__, d))
                        go = False
                        failed_dep = d
                    cond.release()
                    if (not go):
                        break
            else:
                go = True

            if (not go):
                logger.info("Step %s skipped" % step.__name__)
                self.consolePrint(bcolors.FAIL + "Step %r skipped on %r" % (step,failed_dep) + bcolors.ENDC)
                # SMBAKER: sync_step was not defined here, so I changed
                # this from 'sync_step' to 'step'. Verify.
                self.failed_steps.append(step)
                my_status = STEP_STATUS_KO
            else:
                sync_step = self.lookup_step(S)
                sync_step.__name__ = step.__name__
                sync_step.dependencies = []
                try:
                    mlist = sync_step.provides

                    try:
                        for m in mlist:
                            lst = self.model_dependency_graph[m.__name__]
                            nlst = map(lambda(a,b):b,lst)
                            sync_step.dependencies.extend(nlst)
                    except Exception,e:
                        raise e

                except KeyError:
                    pass
                sync_step.debug_mode = debug_mode

                should_run = False
                try:
                    # Various checks that decide whether
                    # this step runs or not
                    self.check_class_dependency(sync_step, self.failed_steps) # don't run Slices if Sites failed
                    self.check_schedule(sync_step, deletion) # don't run sync_network_routes if time since last run < 1 hour
                    should_run = True
                except StepNotReady:
                    logger.info('Step not ready: %s'%sync_step.__name__)
                    self.failed_steps.append(sync_step)
                    my_status = STEP_STATUS_KO
                except Exception,e:
                    logger.error('%r' % e)
                    logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
                    self.failed_steps.append(sync_step)
                    my_status = STEP_STATUS_KO

                if (should_run):
                    try:
                        duration=time.time() - start_time

                        logger.info('Executing step %s, deletion=%s' % (sync_step.__name__, deletion))

                        self.consolePrint(bcolors.OKBLUE + "Executing step %s" % sync_step.__name__ + bcolors.ENDC)
                        failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion)

                        self.check_duration(sync_step, duration)

                        if failed_objects:
                            self.failed_step_objects.update(failed_objects)

                        logger.info("Step %r succeeded, deletion=%s" % (sync_step.__name__, deletion))
                        self.consolePrint(bcolors.OKGREEN + "Step %r succeeded" % sync_step.__name__ + bcolors.ENDC)
                        my_status = STEP_STATUS_OK
                        self.update_run_time(sync_step,deletion)
                    except Exception,e:
                        self.consolePrint(bcolors.FAIL + "Model step %r failed" % (sync_step.__name__) + bcolors.ENDC)
                        logger.error('Model step %r failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' % (sync_step.__name__, e))
                        logger.log_exc("Exception in sync step")
                        self.failed_steps.append(S)
                        my_status = STEP_STATUS_KO
                else:
                    logger.info("Step %r succeeded due to non-run" % step)
                    my_status = STEP_STATUS_OK

            try:
                my_cond = step_conditions[S]
                my_cond.acquire()
                step_status[S]=my_status
                my_cond.notify_all()
                my_cond.release()
            except KeyError,e:
                logger.info('Step %r is a leaf' % step)
                pass
        finally:
            try:
                reset_queries()
            except:
                # this shouldn't happen, but in case it does, catch it...
                logger.log_exc("exception in reset_queries")

            connection.close()

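    # Main loop: wait up to 5 seconds for a wake_up() notification, then run
    # a full pass either way.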
    def run(self):
        if not self.driver.enabled:
            return

        if (self.driver_kind=="openstack") and (not self.driver.has_openstack):
            return

        while True:
            logger.info('Waiting for event')
            self.wait_for_event(timeout=5)
            logger.info('Observer woke up')

            self.run_once()

    def check_db_connection_okay(self):
        # django implodes if the database connection is closed by docker-compose
        try:
            diag = Diag.objects.filter(name="foo").first()
        except Exception, e:
            from django import db
            if "connection already closed" in traceback.format_exc():
                logger.error("XXX connection already closed")
                try:
#                    if db.connection:
#                        db.connection.close()
                    db.close_old_connections()
                except:
                    logger.log_exc("XXX we failed to fix the failure")
            else:
                logger.log_exc("XXX some other error")

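    # run_once() performs one complete pass: it makes two rounds over the
    # steps, first for sync and then for deletion (in reverse order), running
    # each scheduled step in its own thread and joining them all before
    # persisting run times and updating the diag record.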
    def run_once(self):
        try:
            self.check_db_connection_okay()

            loop_start = time.time()
            error_map_file = getattr(Config(), "error_map_path", XOS_DIR + "/error_map.txt")
            self.error_mapper = ErrorMapper(error_map_file)

            # Two passes. One for sync, the other for deletion.
            for deletion in [False,True]:
                # Set of individual objects within steps that failed
                self.failed_step_objects = set()

                # Set up conditions and step status
                # This is needed for steps to run in parallel
                # while obeying dependencies.

                providers = set()
                dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph

                for v in dependency_graph.values():
                    if (v):
                        providers.update(v)

                self.step_conditions = {}
                self.step_status = {}

                for p in list(providers):
                    self.step_conditions[p] = threading.Condition()

                    self.step_status[p] = STEP_STATUS_WORKING

                self.failed_steps = []

                threads = []
                logger.info('Deletion=%r...'%deletion)
                schedule = self.ordered_steps if not deletion else reversed(self.ordered_steps)

                for S in schedule:
                    thread = threading.Thread(target=self.sync, name='synchronizer', args=(S, deletion))

                    logger.info('Deletion=%r...'%deletion)
                    threads.append(thread)

                # Start threads
                for t in threads:
                    t.start()

                # another spot to clean up debug state
                try:
                    reset_queries()
                except:
                    # this shouldn't happen, but in case it does, catch it...
                    logger.log_exc("exception in reset_queries")

                # Wait for all threads to finish before continuing with the run loop
                for t in threads:
                    t.join()

            self.save_run_times()

            loop_end = time.time()

            update_diag(loop_end=loop_end, loop_start=loop_start, backend_status="1 - Bottom Of Loop")

        except Exception, e:
            logger.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' % e)
            logger.log_exc("Exception in observer run loop")
            traceback.print_exc()
            update_diag(backend_status="2 - Exception in Event Loop")