blob: 671bdc30eb80eb112979c61d4812c6400db60cc8 [file] [log] [blame]
Sapan Bhatia24836f12013-08-27 10:16:05 -04001import time
2import traceback
3import commands
4import threading
5import json
6
7from datetime import datetime
8from collections import defaultdict
9from core.models import *
10from django.db.models import F, Q
11from openstack.manager import OpenStackManager
12from util.logger import Logger, logging, logger
13#from timeout import timeout
Sapan Bhatia757e0b62013-09-02 16:55:00 -040014from planetstack.config import Config
Sapan Bhatia04c94ad2013-09-02 18:00:28 -040015from observer.steps import *
Sapan Bhatia24836f12013-08-27 10:16:05 -040016
# Copied onto every sync step instance (as ``debug_mode``) before the step
# is run by the observer loop.
debug_mode = False

# Module-wide logger writing to observer.log at INFO level.  This rebinds
# the ``logger`` name that is also imported from util.logger above.
logger = Logger(logfile='observer.log', level=logging.INFO)
20
class StepNotReady(Exception):
    """Signals that a sync step must not be run in the current pass."""
23
def toposort(g, steps):
    """Return the nodes of the directed graph ``g`` in topological order.

    ``g`` maps a node to the list of nodes it points at.  In the returned
    list every node appears before all of the nodes listed in ``g[node]``;
    nodes that only ever occur as targets are included as well.

    Self-edges are ignored.  Nodes that sit on a cycle cannot be ordered;
    rather than being dropped they are appended after all orderable nodes,
    in the order in which they were first seen.

    ``steps`` is accepted for interface compatibility and is not used.
    """
    # Number of incoming edges per node, plus the order in which nodes were
    # first encountered so the result is stable for a given ``g``.
    incoming = {}
    nodes = []
    for node, targets in g.items():
        if node not in incoming:
            incoming[node] = 0
            nodes.append(node)
        for target in targets:
            if target == node:
                continue
            if target not in incoming:
                incoming[target] = 0
                nodes.append(target)
            incoming[target] += 1

    # Kahn's algorithm: a node becomes ready only once every node pointing
    # at it has been emitted.  (Marking nodes when they are first reached
    # is not enough: in {a: [b, c], b: [c]} that would emit c before b.)
    ready = [node for node in nodes if incoming[node] == 0]
    order = []
    emitted = set()
    while ready:
        node = ready.pop()
        order.append(node)
        emitted.add(node)
        for target in g.get(node, ()):
            if target == node:
                continue
            incoming[target] -= 1
            if incoming[target] == 0:
                ready.append(target)

    # Whatever is left is part of (or only reachable through) a cycle.
    order.extend(node for node in nodes if node not in emitted)
    return order
58
class PlanetStackObserver:
    """Event loop that runs the sync steps in dependency order.

    On construction the observer reads the model (and, optionally, backend)
    dependency graphs named in the configuration, derives a dependency
    graph between sync steps from them and computes the order in which the
    steps are run.  ``run()`` then repeatedly waits for ``wake_up()`` (or a
    timeout) and runs every step that is ready.

    Steps are identified by their class name throughout (graph keys, run
    time bookkeeping, log messages), because that is what the step graph is
    keyed by.
    """

    sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps]

    # File in which the last run time of every step is persisted.
    RUN_TIMES_PATH = '/tmp/observer_run_times'

    def __init__(self):
        # NOTE(review): run() reads self.manager but nothing assigned it;
        # OpenStackManager is imported by this module and is assumed here
        # to be constructible without arguments -- confirm.
        self.manager = OpenStackManager()
        self.load_sync_steps()
        # The Condition object that gets signalled by Feefie events
        self.event_cond = threading.Condition()

    def wait_for_event(self, timeout):
        """Block until wake_up() is called or ``timeout`` seconds elapse."""
        # ``with`` releases the condition's lock even if wait() is interrupted.
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        """Wake one thread blocked in wait_for_event()."""
        logger.info('Wake up routine called. Event cond %r'%self.event_cond)
        with self.event_cond:
            self.event_cond.notify()

    @staticmethod
    def _step_name(step):
        """Return the class name of ``step`` (a step class or an instance)."""
        return getattr(step, '__name__', None) or step.__class__.__name__

    @staticmethod
    def _read_json(path):
        """Parse the JSON document stored at ``path``, closing the file."""
        with open(path) as f:
            return json.loads(f.read())

    @staticmethod
    def _add_edge(graph, source, dest):
        """Record that step ``source`` depends on step ``dest``."""
        if source == dest:
            # A step providing two related models does not wait on itself.
            return
        edges = graph.setdefault(source, [])
        if dest not in edges:
            edges.append(dest)

    def _order_steps(self, step_graph):
        """Return the step classes ordered so that dependencies run first."""
        by_name = dict((s.__name__, s) for s in self.sync_steps)
        names = toposort(step_graph, self.sync_steps)
        # toposort starts from the steps nothing depends on and walks to
        # their dependencies; run() needs the dependencies to come first.
        names.reverse()
        ordered = [by_name[n] for n in names if n in by_name]
        # Steps without any dependency edge never show up in the graph.
        ordered.extend(s for s in self.sync_steps if s not in ordered)
        return ordered

    def load_sync_steps(self):
        """Build the step dependency graph and the step execution order.

        Sets ``model_dependency_graph``, ``backend_dependency_graph``,
        ``dependency_graph`` (step class name -> names of the steps it
        depends on) and ``ordered_steps`` (step classes), then loads the
        persisted run times.

        Errors reading the model dependency file propagate; a missing or
        unreadable backend dependency file is tolerated.
        """
        dep_path = Config().observer_dependency_path
        # This contains dependencies between records, not sync steps
        self.model_dependency_graph = self._read_json(dep_path)

        backend_path = Config().observer_backend_dependency_path
        try:
            # This contains dependencies between backend records
            self.backend_dependency_graph = self._read_json(backend_path)
        except Exception:
            # We can work without a backend graph
            logger.info('No usable backend dependency graph at %r'%backend_path)
            self.backend_dependency_graph = {}

        # Model name -> names of the steps that provide it.
        provides_dict = {}
        for s in self.sync_steps:
            for m in s.provides:
                provides_dict.setdefault(m.__name__, []).append(s.__name__)

        step_graph = {}
        for model, deps in self.model_dependency_graph.items():
            for source in provides_dict.get(model, ()):
                for dep in deps:
                    for dest in provides_dict.get(dep, ()):
                        self._add_edge(step_graph, source, dest)

        if self.backend_dependency_graph:
            # Backend record -> name of the step that serves it.
            backend_dict = {}
            for s in self.sync_steps:
                for m in getattr(s, 'serves', ()):
                    backend_dict[m] = s.__name__

            for record, deps in self.backend_dependency_graph.items():
                if record not in backend_dict:
                    continue
                for dep in deps:
                    if dep in backend_dict:
                        self._add_edge(step_graph, backend_dict[record], backend_dict[dep])

        self.dependency_graph = step_graph
        self.ordered_steps = self._order_steps(step_graph)
        print("Order of steps= %s" % ([s.__name__ for s in self.ordered_steps],))
        self.load_run_times()

    def check_duration(self, step=None, duration=None):
        """Log when ``step`` took longer than its ``deadline`` attribute.

        Steps without a ``deadline`` are not checked.
        """
        if step is None or duration is None:
            return
        deadline = getattr(step, 'deadline', None)
        if deadline is not None and duration > deadline:
            logger.info('Sync step %s missed deadline, took %.2f seconds'%(self._step_name(step),duration))

    def update_run_time(self, step):
        """Record the current time as the last run time of ``step``."""
        self.last_run_times[self._step_name(step)]=time.time()

    def check_schedule(self, step):
        """Raise StepNotReady unless ``step`` is due to run.

        A step is due when at least ``step.requested_interval`` seconds
        have passed since its recorded last run.  A step without a
        ``requested_interval`` attribute is reported and never due.
        """
        name = self._step_name(step)
        time_since_last_run = time.time() - self.last_run_times.get(name, 0)
        try:
            requested_interval = step.requested_interval
        except AttributeError:
            logger.info('Step %s does not have requested_interval set'%name)
            raise StepNotReady
        if time_since_last_run < requested_interval:
            raise StepNotReady

    def load_run_times(self):
        """Load the persisted last run times, defaulting every step to 0."""
        try:
            run_times = self._read_json(self.RUN_TIMES_PATH)
            if not isinstance(run_times, dict):
                raise ValueError('run times file does not hold a JSON object')
        except (IOError, OSError, ValueError):
            run_times = {}
        # Steps missing from the file are treated as never having run.
        for step in self.ordered_steps:
            run_times.setdefault(self._step_name(step), 0)
        self.last_run_times = run_times

    def save_run_times(self):
        """Persist the last run times as JSON."""
        run_times = json.dumps(self.last_run_times)
        with open(self.RUN_TIMES_PATH, 'w') as f:
            f.write(run_times)

    def check_class_dependency(self, step, failed_steps):
        """Raise StepNotReady if a step that ``step`` depends on has failed.

        ``step`` and the entries of ``failed_steps`` may be step classes or
        step instances.
        """
        dependencies = self.dependency_graph.get(self._step_name(step), ())
        for failed_step in failed_steps:
            if self._step_name(failed_step) in dependencies:
                raise StepNotReady

    def _run_ordered_steps(self):
        """Run every ready step once, in dependency order."""
        # Whole steps that failed or were not run in this pass
        failed_steps = []

        # Individual objects within steps that failed
        failed_step_objects = []

        for S in self.ordered_steps:
            name = self._step_name(S)
            sync_step = S()
            # NOTE(review): the original read an undefined
            # ``self.dependencies`` keyed by step name; the step dependency
            # graph is the only mapping keyed that way here -- confirm this
            # is what the steps expect.
            sync_step.dependencies = self.dependency_graph.get(name, [])
            sync_step.debug_mode = debug_mode

            try:
                # Various checks that decide whether
                # this step runs or not
                self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
                self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour
            except StepNotReady:
                logger.info('Step not ready: %s'%name)
                failed_steps.append(sync_step)
                continue
            except Exception:
                logger.log_exc('Could not check whether step %s is ready'%name)
                failed_steps.append(sync_step)
                continue

            try:
                start_time = time.time()

                # ********* This is the actual sync step
                failed_objects = sync_step(failed=failed_step_objects)

                # Measured around the call so the deadline check sees the
                # time the step actually took.
                duration = time.time() - start_time
                self.check_duration(sync_step, duration)
                if failed_objects:
                    failed_step_objects.extend(failed_objects)
                self.update_run_time(sync_step)
            except Exception:
                logger.log_exc('Sync step %s failed'%name)
                failed_steps.append(sync_step)

    def run(self):
        """Run the observer loop; does not return while the manager is usable.

        Returns immediately when the manager is disabled or reports no
        OpenStack.  Otherwise each iteration waits up to 300 seconds for an
        event, runs the ready steps and saves the run times.  Errors in an
        iteration are logged and the loop continues.
        """
        if not self.manager.enabled or not self.manager.has_openstack:
            return

        while True:
            try:
                logger.info('Waiting for event')
                self.wait_for_event(timeout=300)
                logger.info('Observer woke up')

                self._run_ordered_steps()
                self.save_run_times()
            except Exception:
                # Only Exception: KeyboardInterrupt/SystemExit must still be
                # able to stop the loop.
                logger.log_exc("Exception in observer run loop")
                traceback.print_exc()