blob: 492cd9a9df75bbfdaf868d9077be77712374a5cb [file] [log] [blame]
Sapan Bhatia24836f12013-08-27 10:16:05 -04001import time
2import traceback
3import commands
4import threading
5import json
6
7from datetime import datetime
8from collections import defaultdict
9from core.models import *
10from django.db.models import F, Q
Tony Mack387a73f2013-09-18 07:59:14 -040011#from openstack.manager import OpenStackManager
12from openstack.driver import OpenStackDriver
Sapan Bhatia24836f12013-08-27 10:16:05 -040013from util.logger import Logger, logging, logger
14#from timeout import timeout
Sapan Bhatia757e0b62013-09-02 16:55:00 -040015from planetstack.config import Config
Sapan Bhatia04c94ad2013-09-02 18:00:28 -040016from observer.steps import *
Sapan Bhatia24836f12013-08-27 10:16:05 -040017
# Copied onto every sync step instance as its ``debug_mode`` attribute in run().
debug_mode = False

# Observer-wide logger; rebinds the ``logger`` name imported from util.logger.
logger = Logger(level=logging.INFO, logfile='observer.log')
21
class StepNotReady(Exception):
    """Raised to skip a sync step in the current observer pass.

    Used when a step it depends on has failed, or when the step's
    requested run interval has not elapsed yet.
    """
24
def toposort(g, steps=None):
    """Order the nodes of a dependency graph so that dependencies come first.

    Args:
        g: dict mapping each node to an iterable of the nodes it depends on
            (e.g. step name -> names of the steps that must run before it).
        steps: optional iterable of extra nodes to include even if they do not
            appear in ``g``; their order is also used to break ties.

    Returns:
        A list with every node from ``steps`` and ``g`` exactly once, each
        placed after all of its dependencies (Kahn's algorithm). Self-loops
        are ignored. Nodes caught in a dependency cycle cannot be ordered;
        they are appended at the end in first-seen order so none is dropped.
    """
    from collections import deque

    # Collect every node once, in a stable first-seen order.
    nodes = []
    seen = set()

    def _add(node):
        if node not in seen:
            seen.add(node)
            nodes.append(node)

    for node in (steps or []):
        _add(node)
    for node, deps in g.items():
        _add(node)
        for dep in deps:
            _add(dep)

    # pending[n]: dependencies of n not emitted yet.
    # dependents[n]: nodes that wait for n.
    pending = dict((n, set()) for n in nodes)
    dependents = dict((n, []) for n in nodes)
    for node, deps in g.items():
        for dep in deps:
            # Skip self-loops (they could never be satisfied) and duplicate edges.
            if dep != node and dep not in pending[node]:
                pending[node].add(dep)
                dependents[dep].append(node)

    ready = deque(n for n in nodes if not pending[n])
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for waiter in dependents[node]:
            pending[waiter].discard(node)
            if not pending[waiter]:
                ready.append(waiter)

    if len(order) < len(nodes):
        # Whatever is left is part of a cycle; keep it rather than drop it.
        emitted = set(order)
        order.extend(n for n in nodes if n not in emitted)
    return order
59
def _step_name(step):
    """Return the class name of *step*, which may be a sync step class or instance.

    The step dependency graph is keyed by step class ``__name__``, so every
    per-step lookup in PlanetStackObserver goes through this helper.
    """
    name = getattr(step, '__name__', None)
    if name is None:
        # An instance: use its class (works for classic classes too).
        name = step.__class__.__name__
    return name


class PlanetStackObserver:
    """Run the PlanetStack sync steps in dependency order.

    run() sleeps until wake_up() is called or a 300 second timeout passes,
    then runs every step in ``ordered_steps``. A step is skipped for the
    current pass when a step it depends on failed or was skipped earlier in
    the same pass, or when its ``requested_interval`` has not elapsed since
    its last successful run.
    """

    sync_steps = [SyncNetworks, SyncNetworkSlivers, SyncSites, SyncSitePrivileges,
                  SyncSlices, SyncSliceMemberships, SyncSlivers, SyncSliverIps]

    def __init__(self):
        self.load_sync_steps()
        # The Condition object that gets signalled by Feefie events
        self.event_cond = threading.Condition()
        self.driver = OpenStackDriver()

    def wait_for_event(self, timeout):
        """Block until wake_up() notifies the condition or *timeout* seconds pass.

        NOTE: threading.Condition does not remember notifications, so a
        wake_up() issued while the observer is not waiting (e.g. while it is
        running steps) is lost; the next pass then starts on the timeout.
        """
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        """Wake a thread blocked in wait_for_event()."""
        logger.info('Wake up routine called. Event cond %r'%self.event_cond)
        with self.event_cond:
            self.event_cond.notify()

    def load_sync_steps(self):
        """Build the step dependency graph and compute the step execution order.

        Reads two JSON files named by the config: the model dependency graph
        (``observer_dependency_path``; errors opening or parsing it propagate)
        and the optional backend dependency graph
        (``observer_backend_dependency_path``). Both map a record name to the
        names of the records it depends on. They are translated into a graph
        between sync steps via each step's ``provides`` (and, if present,
        ``serves``) attribute. Sets:

        * ``dependency_graph``: step name -> names of the steps it depends on
        * ``step_lookup``: step name -> step class
        * ``ordered_steps``: step classes, each placed after its dependencies

        Finally loads the persisted per-step run times.
        """
        dep_path = Config().observer_dependency_path
        # This contains dependencies between records, not sync steps
        with open(dep_path) as f:
            self.model_dependency_graph = json.load(f)

        backend_path = Config().observer_backend_dependency_path
        try:
            # This contains dependencies between backend records
            with open(backend_path) as f:
                self.backend_dependency_graph = json.load(f)
        except Exception:
            # We can work without a backend graph
            self.backend_dependency_graph = {}

        self.step_lookup = dict((s.__name__, s) for s in self.sync_steps)

        # model name -> names of the steps that provide that model
        provides_dict = defaultdict(list)
        for s in self.sync_steps:
            for m in s.provides:
                provides_dict[m.__name__].append(s.__name__)

        # step name -> names of the steps it depends on
        step_graph = defaultdict(list)

        def add_edge(source, dest):
            if dest not in step_graph[source]:
                step_graph[source].append(dest)

        for model, dep_models in self.model_dependency_graph.items():
            # Models that no step provides contribute no edges.
            for source in provides_dict.get(model, []):
                for dep_model in dep_models:
                    for dest in provides_dict.get(dep_model, []):
                        add_edge(source, dest)

        if self.backend_dependency_graph:
            # backend record -> name of the step that serves it
            backend_dict = {}
            for s in self.sync_steps:
                for m in getattr(s, 'serves', []):
                    backend_dict[m] = s.__name__

            for record, dep_records in self.backend_dependency_graph.items():
                source = backend_dict.get(record)
                if source is None:
                    continue  # no step serves this record: no dependencies
                for dep_record in dep_records:
                    dest = backend_dict.get(dep_record)
                    if dest is not None:
                        add_edge(source, dest)

        self.dependency_graph = dict(step_graph)

        step_names = [s.__name__ for s in self.sync_steps]
        ordered_names = toposort(self.dependency_graph, step_names)
        # Keep only known steps, and make sure every step is scheduled exactly once.
        ordered_names = [n for n in ordered_names if n in self.step_lookup]
        ordered_names += [n for n in step_names if n not in ordered_names]
        self.ordered_steps = [self.step_lookup[n] for n in ordered_names]
        logger.info('Order of steps=%s' % ordered_names)
        self.load_run_times()

    def check_duration(self, step, duration):
        """Log a notice if *step* ran longer than its ``deadline`` attribute (seconds)."""
        try:
            if (duration > step.deadline):
                logger.info('Sync step %s missed deadline, took %.2f seconds'%(_step_name(step),duration))
        except AttributeError:
            # step doesn't have a deadline
            pass

    def update_run_time(self, step):
        """Record the current time as *step*'s last run time."""
        self.last_run_times[_step_name(step)] = time.time()

    def check_schedule(self, step):
        """Raise StepNotReady unless *step*'s ``requested_interval`` has elapsed.

        Steps without a ``requested_interval`` attribute are never ready.
        Steps with no recorded run time are treated as last run at time 0.
        """
        name = _step_name(step)
        time_since_last_run = time.time() - self.last_run_times.get(name, 0)
        try:
            interval = step.requested_interval
        except AttributeError:
            logger.info('Step %s does not have requested_interval set'%name)
            raise StepNotReady
        if (time_since_last_run < interval):
            raise StepNotReady

    def load_run_times(self):
        """Load per-step last-run timestamps from /tmp/observer_run_times.

        Falls back to an empty mapping if the file is missing or unreadable,
        then gives every ordered step without a recorded time a value of 0.
        """
        try:
            with open('/tmp/observer_run_times') as f:
                self.last_run_times = json.load(f)
        except (IOError, OSError, ValueError):
            self.last_run_times = {}
        if not isinstance(self.last_run_times, dict):
            self.last_run_times = {}
        for step in self.ordered_steps:
            self.last_run_times.setdefault(step.__name__, 0)

    def save_run_times(self):
        """Persist the per-step last-run timestamps to /tmp/observer_run_times as JSON."""
        run_times = json.dumps(self.last_run_times)
        with open('/tmp/observer_run_times', 'w') as f:
            f.write(run_times)

    def check_class_dependency(self, step, failed_steps):
        """Raise StepNotReady if any step *step* depends on is in *failed_steps*.

        *failed_steps* holds step names (see run()).
        """
        deps = self.dependency_graph.get(_step_name(step), [])
        for failed_step in failed_steps:
            if (failed_step in deps):
                raise StepNotReady

    def run(self):
        """Main observer loop: wait for an event, then run every ready step in order.

        Returns immediately if the OpenStack driver is disabled or unavailable.
        Exceptions raised during a pass are logged and the loop continues.
        """
        if not self.driver.enabled or not self.driver.has_openstack:
            return

        while True:
            try:
                logger.info('Waiting for event')
                self.wait_for_event(timeout=300)
                logger.info('Observer woke up')

                # Names of whole steps that failed or were skipped in this pass
                failed_steps = set()

                # Individual objects within steps that failed
                failed_step_objects = []

                for S in self.ordered_steps:
                    step_name = S.__name__
                    sync_step = S(driver=self.driver)
                    sync_step.dependencies = self.dependency_graph.get(step_name, [])
                    sync_step.debug_mode = debug_mode

                    try:
                        # Various checks that decide whether this step runs or not
                        self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
                        self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour
                    except StepNotReady:
                        logger.info('Step not ready: %s'%step_name)
                        failed_steps.add(step_name)
                        continue
                    except Exception:
                        logger.log_exc('Exception while checking step %s'%step_name)
                        failed_steps.add(step_name)
                        continue

                    start_time = time.time()
                    try:
                        # ********* This is the actual sync step
                        failed_objects = sync_step(failed=failed_step_objects)
                    except Exception:
                        logger.log_exc('Exception in sync step %s'%step_name)
                        failed_steps.add(step_name)
                        continue

                    # Measure after the step has actually run.
                    duration = time.time() - start_time
                    self.check_duration(sync_step, duration)
                    failed_step_objects.extend(failed_objects or [])
                    self.update_run_time(sync_step)

                self.save_run_times()
            except Exception:
                # Exception (not bare except) so KeyboardInterrupt/SystemExit still stop the loop.
                logger.log_exc("Exception in observer run loop")
                traceback.print_exc()