blob: 6dc34da4bb367744fa2757de686047cd2782f6c2 [file] [log] [blame]
Sapan Bhatia24836f12013-08-27 10:16:05 -04001import time
2import traceback
3import commands
4import threading
5import json
6
7from datetime import datetime
8from collections import defaultdict
9from core.models import *
10from django.db.models import F, Q
Tony Mack387a73f2013-09-18 07:59:14 -040011#from openstack.manager import OpenStackManager
12from openstack.driver import OpenStackDriver
Sapan Bhatia24836f12013-08-27 10:16:05 -040013from util.logger import Logger, logging, logger
14#from timeout import timeout
Sapan Bhatia757e0b62013-09-02 16:55:00 -040015from planetstack.config import Config
Sapan Bhatia04c94ad2013-09-02 18:00:28 -040016from observer.steps import *
Sapan Bhatia24836f12013-08-27 10:16:05 -040017
Sapan Bhatia13c7f112013-09-02 14:19:35 -040018debug_mode = False
Sapan Bhatia24836f12013-08-27 10:16:05 -040019
20logger = Logger(logfile='observer.log', level=logging.INFO)
21
Sapan Bhatia13c7f112013-09-02 14:19:35 -040022class StepNotReady(Exception):
Tony Mackce79de02013-09-24 10:12:33 -040023 pass
Sapan Bhatia13c7f112013-09-02 14:19:35 -040024
Sapan Bhatia24836f12013-08-27 10:16:05 -040025def toposort(g, steps):
Tony Mackce79de02013-09-24 10:12:33 -040026 reverse = {}
Sapan Bhatia24836f12013-08-27 10:16:05 -040027
Tony Mackce79de02013-09-24 10:12:33 -040028 for k,v in g.items():
29 for rk in v:
30 try:
31 reverse[rk].append(k)
32 except:
33 reverse[rk]=k
Sapan Bhatia24836f12013-08-27 10:16:05 -040034
Tony Mackce79de02013-09-24 10:12:33 -040035 sources = []
36 for k,v in g.items():
37 if not reverse.has_key(k):
38 sources.append(k)
Sapan Bhatia24836f12013-08-27 10:16:05 -040039
40
Tony Mackce79de02013-09-24 10:12:33 -040041 for k,v in reverse.iteritems():
42 if (not v):
43 sources.append(k)
Sapan Bhatia24836f12013-08-27 10:16:05 -040044
Tony Mackce79de02013-09-24 10:12:33 -040045 order = []
46 marked = []
Sapan Bhatia04c94ad2013-09-02 18:00:28 -040047
Tony Mackce79de02013-09-24 10:12:33 -040048 while sources:
49 n = sources.pop()
50 try:
51 for m in g[n]:
52 if m not in marked:
53 sources.append(m)
54 marked.append(m)
55 except KeyError:
56 pass
57 order.append(n)
Sapan Bhatia972a2e82013-10-02 00:03:02 -040058
59 order.extend(set(steps)-set(order))
Tony Mackce79de02013-09-24 10:12:33 -040060 return order
Sapan Bhatia24836f12013-08-27 10:16:05 -040061
62class PlanetStackObserver:
Tony Mack66646d52013-09-24 21:47:12 -040063 sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,GarbageCollector]
Sapan Bhatia24836f12013-08-27 10:16:05 -040064
Tony Mackce79de02013-09-24 10:12:33 -040065 def __init__(self):
66 # The Condition object that gets signalled by Feefie events
Tony Mack3bf77b02013-09-25 00:49:50 -040067 self.step_lookup = {}
Tony Mackce79de02013-09-24 10:12:33 -040068 self.load_sync_steps()
69 self.event_cond = threading.Condition()
Tony Mack387a73f2013-09-18 07:59:14 -040070 self.driver = OpenStackDriver()
Sapan Bhatia24836f12013-08-27 10:16:05 -040071
Tony Mackce79de02013-09-24 10:12:33 -040072 def wait_for_event(self, timeout):
73 self.event_cond.acquire()
74 self.event_cond.wait(timeout)
75 self.event_cond.release()
76
77 def wake_up(self):
78 logger.info('Wake up routine called. Event cond %r'%self.event_cond)
79 self.event_cond.acquire()
80 self.event_cond.notify()
81 self.event_cond.release()
Sapan Bhatia24836f12013-08-27 10:16:05 -040082
Tony Mackce79de02013-09-24 10:12:33 -040083 def load_sync_steps(self):
84 dep_path = Config().observer_backend_dependency_graph
85 try:
86 # This contains dependencies between records, not sync steps
87 self.model_dependency_graph = json.loads(open(dep_path).read())
88 except Exception,e:
89 raise e
Sapan Bhatia24836f12013-08-27 10:16:05 -040090
Tony Mackce79de02013-09-24 10:12:33 -040091 try:
Tony Mackc12d5ca2013-09-24 10:30:39 -040092 backend_path = Config().observer_pl_dependency_graph
Tony Mackce79de02013-09-24 10:12:33 -040093 # This contains dependencies between backend records
94 self.backend_dependency_graph = json.loads(open(backend_path).read())
95 except Exception,e:
96 # We can work without a backend graph
97 self.backend_dependency_graph = {}
Sapan Bhatia24836f12013-08-27 10:16:05 -040098
Tony Mackce79de02013-09-24 10:12:33 -040099 provides_dict = {}
100 for s in self.sync_steps:
Sapan Bhatia972a2e82013-10-02 00:03:02 -0400101 self.step_lookup[s.__name__] = s
Tony Mackce79de02013-09-24 10:12:33 -0400102 for m in s.provides:
103 try:
104 provides_dict[m.__name__].append(s.__name__)
105 except KeyError:
106 provides_dict[m.__name__]=[s.__name__]
Sapan Bhatia04c94ad2013-09-02 18:00:28 -0400107
Tony Mackce79de02013-09-24 10:12:33 -0400108
109 step_graph = {}
110 for k,v in self.model_dependency_graph.iteritems():
111 try:
112 for source in provides_dict[k]:
113 for m in v:
114 try:
115 for dest in provides_dict[m]:
116 # no deps, pass
117 try:
118 step_graph[source].append(dest)
119 except:
120 step_graph[source]=[dest]
121 except KeyError:
122 pass
123
124 except KeyError:
125 pass
126 # no dependencies, pass
127
128 #import pdb
129 #pdb.set_trace()
130 if (self.backend_dependency_graph):
131 backend_dict = {}
132 for s in self.sync_steps:
133 for m in s.serves:
134 backend_dict[m]=s.__name__
135
136 for k,v in backend_dependency_graph.iteritems():
137 try:
138 source = backend_dict[k]
139 for m in v:
140 try:
141 dest = backend_dict[m]
142 except KeyError:
143 # no deps, pass
144 pass
145 step_graph[source]=dest
146
147 except KeyError:
148 pass
149 # no dependencies, pass
Sapan Bhatia24836f12013-08-27 10:16:05 -0400150
Tony Mackce79de02013-09-24 10:12:33 -0400151 dependency_graph = step_graph
Sapan Bhatia24836f12013-08-27 10:16:05 -0400152
Sapan Bhatia972a2e82013-10-02 00:03:02 -0400153 self.ordered_steps = toposort(dependency_graph, map(lambda s:s.__name__,self.sync_steps))
Tony Mackce79de02013-09-24 10:12:33 -0400154 print "Order of steps=",self.ordered_steps
155 self.load_run_times()
156
Sapan Bhatia24836f12013-08-27 10:16:05 -0400157
Tony Mackae7f30c2013-09-25 12:46:50 -0400158 def check_duration(self, step, duration):
Tony Mackce79de02013-09-24 10:12:33 -0400159 try:
Tony Mackae7f30c2013-09-25 12:46:50 -0400160 if (duration > step.deadline):
161 logger.info('Sync step %s missed deadline, took %.2f seconds'%(step.name,duration))
Tony Mackce79de02013-09-24 10:12:33 -0400162 except AttributeError:
163 # S doesn't have a deadline
164 pass
Sapan Bhatia24836f12013-08-27 10:16:05 -0400165
Tony Mackce79de02013-09-24 10:12:33 -0400166 def update_run_time(self, step):
Tony Mackae7f30c2013-09-25 12:46:50 -0400167 self.last_run_times[step.__name__]=time.time()
Sapan Bhatia13c7f112013-09-02 14:19:35 -0400168
Tony Mackce79de02013-09-24 10:12:33 -0400169 def check_schedule(self, step):
Tony Mackea41f562013-09-25 08:10:40 -0400170 time_since_last_run = time.time() - self.last_run_times[step.__name__]
Tony Mackce79de02013-09-24 10:12:33 -0400171 try:
172 if (time_since_last_run < step.requested_interval):
173 raise StepNotReady
174 except AttributeError:
Tony Mackea41f562013-09-25 08:10:40 -0400175 logger.info('Step %s does not have requested_interval set'%step.__name__)
Tony Mackce79de02013-09-24 10:12:33 -0400176 raise StepNotReady
177
178 def load_run_times(self):
179 try:
180 jrun_times = open('/tmp/observer_run_times').read()
181 self.last_run_times = json.loads(jrun_times)
182 except:
183 self.last_run_times={}
184 for e in self.ordered_steps:
Tony Mackc12d5ca2013-09-24 10:30:39 -0400185 self.last_run_times[e]=0
Sapan Bhatia36938ca2013-09-02 14:35:24 -0400186
187
188
Tony Mackce79de02013-09-24 10:12:33 -0400189 def save_run_times(self):
190 run_times = json.dumps(self.last_run_times)
191 open('/tmp/observer_run_times','w').write(run_times)
Sapan Bhatia36938ca2013-09-02 14:35:24 -0400192
Tony Mackce79de02013-09-24 10:12:33 -0400193 def check_class_dependency(self, step, failed_steps):
194 for failed_step in failed_steps:
Tony Mack4fa85fb2013-09-25 14:39:57 -0400195 step.dependencies = self.model_dependency_graph.get(step.provides[0].__name__, [])
196 if (failed_step in step.dependencies):
Tony Mackce79de02013-09-24 10:12:33 -0400197 raise StepNotReady
Sapan Bhatia13c7f112013-09-02 14:19:35 -0400198
Tony Mackce79de02013-09-24 10:12:33 -0400199 def run(self):
200 if not self.driver.enabled or not self.driver.has_openstack:
201 return
Tony Mackce79de02013-09-24 10:12:33 -0400202 while True:
203 try:
204 logger.info('Waiting for event')
205 tBeforeWait = time.time()
Sapan Bhatia972a2e82013-10-02 00:03:02 -0400206 self.wait_for_event(timeout=30)
Tony Mackce79de02013-09-24 10:12:33 -0400207 logger.info('Observer woke up')
Sapan Bhatia13c7f112013-09-02 14:19:35 -0400208
Tony Mackce79de02013-09-24 10:12:33 -0400209 # Set of whole steps that failed
210 failed_steps = []
Sapan Bhatia13c7f112013-09-02 14:19:35 -0400211
Tony Mackce79de02013-09-24 10:12:33 -0400212 # Set of individual objects within steps that failed
213 failed_step_objects = []
Sapan Bhatia24836f12013-08-27 10:16:05 -0400214
Tony Mackce79de02013-09-24 10:12:33 -0400215 for S in self.ordered_steps:
Tony Mack3bf77b02013-09-25 00:49:50 -0400216 step = self.step_lookup[S]
Tony Mackce79de02013-09-24 10:12:33 -0400217 start_time=time.time()
218
Tony Mack3bf77b02013-09-25 00:49:50 -0400219 sync_step = step(driver=self.driver)
Sapan Bhatia972a2e82013-10-02 00:03:02 -0400220 sync_step.__name__ = step.__name__
Sapan Bhatiaca2e21f2013-10-02 01:10:02 -0400221 sync_step.dependencies = []
222 try:
223 mlist = sync_step.provides
224
225 for m in mlist:
226 sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
227 except KeyError:
228 pass
Tony Mackce79de02013-09-24 10:12:33 -0400229 sync_step.debug_mode = debug_mode
Sapan Bhatia24836f12013-08-27 10:16:05 -0400230
Tony Mackce79de02013-09-24 10:12:33 -0400231 should_run = False
232 try:
233 # Various checks that decide whether
234 # this step runs or not
235 self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
236 self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour
237 should_run = True
238 except StepNotReady:
Tony Mackae7f30c2013-09-25 12:46:50 -0400239 logging.info('Step not ready: %s'%sync_step.__name__)
Tony Mack3bf77b02013-09-25 00:49:50 -0400240 failed_steps.append(sync_step)
Tony Mackce79de02013-09-24 10:12:33 -0400241 except:
Tony Mack3bf77b02013-09-25 00:49:50 -0400242 failed_steps.append(sync_step)
Sapan Bhatia24836f12013-08-27 10:16:05 -0400243
Tony Mackce79de02013-09-24 10:12:33 -0400244 if (should_run):
245 try:
246 duration=time.time() - start_time
Sapan Bhatia13c7f112013-09-02 14:19:35 -0400247
Tony Mackce79de02013-09-24 10:12:33 -0400248 # ********* This is the actual sync step
Sapan Bhatiaca2e21f2013-10-02 01:10:02 -0400249 import pdb
250 pdb.set_trace()
Tony Mackce79de02013-09-24 10:12:33 -0400251 failed_objects = sync_step(failed=failed_step_objects)
Sapan Bhatia13c7f112013-09-02 14:19:35 -0400252
253
Tony Mackae7f30c2013-09-25 12:46:50 -0400254 self.check_duration(sync_step, duration)
255 if failed_objects:
256 failed_step_objects.extend(failed_objects)
Tony Mackce79de02013-09-24 10:12:33 -0400257 self.update_run_time(sync_step)
258 except:
Sapan Bhatia972a2e82013-10-02 00:03:02 -0400259 raise
Tony Mackea41f562013-09-25 08:10:40 -0400260 failed_steps.append(S)
Tony Mackce79de02013-09-24 10:12:33 -0400261 self.save_run_times()
262 except:
263 logger.log_exc("Exception in observer run loop")
264 traceback.print_exc()