blob: f4f7f02b1db757f57e4cb80bfcaedd6aab302cf7 [file] [log] [blame]
Sapan Bhatia24836f12013-08-27 10:16:05 -04001import time
2import traceback
3import commands
4import threading
5import json
6
7from datetime import datetime
8from collections import defaultdict
9from core.models import *
10from django.db.models import F, Q
Tony Mack387a73f2013-09-18 07:59:14 -040011#from openstack.manager import OpenStackManager
12from openstack.driver import OpenStackDriver
Sapan Bhatia24836f12013-08-27 10:16:05 -040013from util.logger import Logger, logging, logger
14#from timeout import timeout
Sapan Bhatia757e0b62013-09-02 16:55:00 -040015from planetstack.config import Config
Sapan Bhatia04c94ad2013-09-02 18:00:28 -040016from observer.steps import *
Sapan Bhatia24836f12013-08-27 10:16:05 -040017
# Module-wide switch; run() copies it onto every sync step instance as
# `debug_mode` before the step is executed.
debug_mode = False

# Rebinds the `logger` name imported from util.logger above to a Logger
# configured with logfile='observer.log' at INFO level.
logger = Logger(logfile='observer.log', level=logging.INFO)
21
class StepNotReady(Exception):
    """Signals that a sync step must not be run in the current pass.

    Raised by the schedule and class-dependency checks and caught by the
    observer's run loop, which then skips the step.
    """
Sapan Bhatia13c7f112013-09-02 14:19:35 -040024
def toposort(g, steps):
    """Return the nodes of graph ``g`` in topological order.

    ``g`` maps a node to the nodes it points to.  In the result every node
    appears before all of the nodes listed under it, so the nodes that
    nothing points to come first.

    Args:
        g: dict mapping a hashable node to an iterable of successor nodes.
            Nodes that only occur as successors are part of the result too.
        steps: not used by the ordering; accepted so that existing callers
            keep working.

    Returns:
        A list containing every node mentioned in ``g`` exactly once.
        Self-references and repeated edges are ignored.  If the graph
        contains a cycle, the nodes that cannot be ordered are appended at
        the end (in first-seen order) instead of being dropped.
    """
    nodes = []          # every node, in first-seen order
    successors = {}     # node -> de-duplicated list of successors
    indegree = {}       # node -> number of distinct incoming edges

    def register(node):
        if node not in successors:
            successors[node] = []
            indegree[node] = 0
            nodes.append(node)

    seen_edges = set()
    for node, targets in g.items():
        register(node)
        for target in targets:
            register(target)
            # A self-loop or a repeated edge would distort the in-degree
            # count and keep the node from ever becoming ready.
            if target == node or (node, target) in seen_edges:
                continue
            seen_edges.add((node, target))
            successors[node].append(target)
            indegree[target] += 1

    # Kahn's algorithm: repeatedly emit a node with no unprocessed
    # predecessors.  ``order`` doubles as the FIFO work queue.
    order = [node for node in nodes if indegree[node] == 0]
    position = 0
    while position < len(order):
        node = order[position]
        position += 1
        for target in successors[node]:
            indegree[target] -= 1
            if indegree[target] == 0:
                order.append(target)

    if len(order) < len(nodes):
        # Whatever is left is part of (or only reachable through) a cycle.
        placed = set(order)
        order.extend(node for node in nodes if node not in placed)

    return order
Sapan Bhatia24836f12013-08-27 10:16:05 -040059
class PlanetStackObserver:
    """Event loop that repeatedly runs the PlanetStack sync steps.

    On construction the step classes in ``sync_steps`` are arranged into an
    execution order derived from the dependency graphs named in the
    configuration.  ``run`` then waits until ``wake_up`` is called (or a
    timeout expires) and executes every step that is ready.

    Steps are identified everywhere (dependency graph, run-time table,
    ``ordered_steps``) by the ``__name__`` of their class.
    """

    sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,GarbageCollector]

    # File in which the time of each step's last run is persisted.
    run_times_path = '/tmp/observer_run_times'

    def __init__(self):
        self.load_sync_steps()
        # The Condition object that gets signalled by Feefie events
        self.event_cond = threading.Condition()
        self.driver = OpenStackDriver()

    def wait_for_event(self, timeout):
        """Block until wake_up() is called or ``timeout`` seconds elapse."""
        # ``with`` guarantees the lock is released even if wait() raises.
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        """Wake the run loop if it is blocked in wait_for_event()."""
        logger.info('Wake up routine called. Event cond %r'%self.event_cond)
        with self.event_cond:
            self.event_cond.notify()

    def _read_json(self, path):
        """Return the parsed JSON content of the file at ``path``."""
        with open(path) as handle:
            return json.loads(handle.read())

    def _step_name(self, step):
        """Return the identifier of ``step``.

        ``step`` may be a name, a step class or a step instance.  The
        class ``__name__`` is used because the dependency graph and the
        run-time table are keyed by it.
        """
        if isinstance(step, (str, type(u''))):
            return step
        try:
            return step.__name__
        except AttributeError:
            return step.__class__.__name__

    def _model_dependencies(self, step_class):
        """Return the models that the models provided by ``step_class`` depend on."""
        required = []
        for model in step_class.provides:
            required.extend(self.model_dependency_graph.get(model.__name__, []))
        return required

    def load_sync_steps(self):
        """Build the step dependency graph and the step execution order.

        Sets ``model_dependency_graph``, ``backend_dependency_graph``,
        ``dependency_graph`` (step name -> names of the steps it depends
        on), ``step_lookup`` (step name -> step class) and
        ``ordered_steps`` (step names in execution order), then loads the
        persisted run times.

        Raises:
            Whatever reading or parsing the model dependency file raises;
            that graph is mandatory.
        """
        dep_path = Config().observer_backend_dependency_graph
        # This contains dependencies between records, not sync steps
        self.model_dependency_graph = self._read_json(dep_path)

        try:
            backend_path = Config().observer_pl_dependency_graph
            # This contains dependencies between backend records
            self.backend_dependency_graph = self._read_json(backend_path)
        except Exception:
            # We can work without a backend graph
            self.backend_dependency_graph = {}

        self.step_lookup = dict((s.__name__, s) for s in self.sync_steps)

        # model name -> names of the steps that provide it
        provides_dict = {}
        for s in self.sync_steps:
            for m in s.provides:
                provides_dict.setdefault(m.__name__, []).append(s.__name__)

        step_graph = {}

        def add_edge(source, dest):
            # Self-references and duplicates carry no ordering information.
            targets = step_graph.setdefault(source, [])
            if dest != source and dest not in targets:
                targets.append(dest)

        for model, required_models in self.model_dependency_graph.items():
            for source in provides_dict.get(model, []):
                for required in required_models:
                    for dest in provides_dict.get(required, []):
                        add_edge(source, dest)

        if self.backend_dependency_graph:
            # backend record -> name of the step that serves it
            backend_dict = {}
            for s in self.sync_steps:
                # NOTE(review): a step without a `serves` attribute is
                # treated as serving nothing -- confirm against SyncStep.
                for m in getattr(s, 'serves', []):
                    backend_dict[m] = s.__name__

            for record, required_records in self.backend_dependency_graph.items():
                source = backend_dict.get(record)
                if source is None:
                    continue
                for required in required_records:
                    dest = backend_dict.get(required)
                    if dest is not None:
                        add_edge(source, dest)

        # check_class_dependency() consults this graph on every pass.
        self.dependency_graph = step_graph

        order = toposort(step_graph, self.sync_steps)
        # toposort starts with the steps nobody depends on, but a step can
        # only be skipped for a failed dependency if that dependency ran
        # earlier in the pass -- so run in the opposite direction.
        order.reverse()

        ordered = []
        placed = set()
        for name in order:
            if name in self.step_lookup and name not in placed:
                placed.add(name)
                ordered.append(name)
        # Steps that take part in no dependency still have to run.
        for s in self.sync_steps:
            if s.__name__ not in placed:
                placed.add(s.__name__)
                ordered.append(s.__name__)

        self.ordered_steps = ordered
        print("Order of steps= %s" % (self.ordered_steps,))
        self.load_run_times()

    def check_duration(self, step, duration):
        """Log a message if ``step`` took longer than its ``deadline``.

        Steps without a ``deadline`` attribute are never reported.
        """
        deadline = getattr(step, 'deadline', None)
        if deadline is not None and duration > deadline:
            logger.info('Sync step %s missed deadline, took %.2f seconds'%(self._step_name(step),duration))

    def update_run_time(self, step):
        """Record the current time as the last run of ``step``."""
        self.last_run_times[self._step_name(step)]=time.time()

    def check_schedule(self, step):
        """Raise StepNotReady unless ``step`` is due to run.

        A step is due when at least ``step.requested_interval`` seconds
        have passed since its last recorded run.  A step without
        ``requested_interval`` is reported and treated as not ready.
        """
        name = self._step_name(step)
        # A step that has never run counts as last run at time 0.
        time_since_last_run = time.time() - self.last_run_times.get(name, 0)
        try:
            interval = step.requested_interval
        except AttributeError:
            logger.info('Step %s does not have requested_interval set'%name)
            raise StepNotReady
        if time_since_last_run < interval:
            raise StepNotReady

    def load_run_times(self):
        """Load the persisted last-run times, defaulting missing steps to 0."""
        try:
            loaded = self._read_json(self.run_times_path)
        except (IOError, OSError, ValueError):
            # Missing, unreadable or corrupt file: start from scratch.
            loaded = {}
        if not isinstance(loaded, dict):
            loaded = {}
        self.last_run_times = loaded
        for name in self.ordered_steps:
            self.last_run_times.setdefault(name, 0)

    def save_run_times(self):
        """Persist the last-run times as JSON."""
        run_times = json.dumps(self.last_run_times)
        with open(self.run_times_path, 'w') as handle:
            handle.write(run_times)

    def check_class_dependency(self, step, failed_steps):
        """Raise StepNotReady if a step that ``step`` depends on has failed.

        ``failed_steps`` may hold step names, classes or instances.
        """
        dependencies = self.dependency_graph.get(self._step_name(step), [])
        for failed_step in failed_steps:
            if self._step_name(failed_step) in dependencies:
                raise StepNotReady

    def _run_step(self, step_name, failed_steps, failed_step_objects):
        """Run one step if it is ready, recording failures in the given lists."""
        step_class = self.step_lookup[step_name]
        sync_step = step_class(driver=self.driver)
        # NOTE(review): assumes the step expects the model-level
        # dependencies of the models it provides -- confirm against SyncStep.
        sync_step.dependencies = self._model_dependencies(step_class)
        sync_step.debug_mode = debug_mode

        try:
            # Various checks that decide whether this step runs or not
            self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
            self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour
        except StepNotReady:
            logger.info('Step not ready: %s'%step_name)
            failed_steps.append(step_name)
            return
        except Exception:
            logger.log_exc('Exception while checking step %s'%step_name)
            failed_steps.append(step_name)
            return

        start_time = time.time()
        try:
            # ********* This is the actual sync step
            failed_objects = sync_step(failed=failed_step_objects)

            # Measured after the call so it reflects the step's run time.
            duration = time.time() - start_time
            self.check_duration(sync_step, duration)
            if failed_objects:
                failed_step_objects.extend(failed_objects)
            self.update_run_time(sync_step)
        except Exception:
            logger.log_exc('Exception in sync step %s'%step_name)
            failed_steps.append(step_name)

    def run(self):
        """Run the observer loop; returns at once if the driver is unusable.

        Each pass waits for an event (at most 300 seconds), runs the
        ordered steps and saves the run times.  Errors are logged and the
        loop continues; KeyboardInterrupt and SystemExit propagate so the
        process can be stopped.
        """
        if not self.driver.enabled or not self.driver.has_openstack:
            return

        while True:
            try:
                logger.info('Waiting for event')
                self.wait_for_event(timeout=300)
                logger.info('Observer woke up')

                # Names of whole steps that failed or were skipped
                failed_steps = []

                # Individual objects within steps that failed
                failed_step_objects = []

                for step_name in self.ordered_steps:
                    self._run_step(step_name, failed_steps, failed_step_objects)
                self.save_run_times()
            except Exception:
                logger.log_exc("Exception in observer run loop")
                traceback.print_exc()
247 traceback.print_exc()