blob: 4b1150403a90e43fbb869bc6979c3af016f5bde0 [file] [log] [blame]
Sapan Bhatia24836f12013-08-27 10:16:05 -04001import time
2import traceback
3import commands
4import threading
5import json
6
7from datetime import datetime
8from collections import defaultdict
9from core.models import *
10from django.db.models import F, Q
11from openstack.manager import OpenStackManager
12from util.logger import Logger, logging, logger
13#from timeout import timeout
14
15
16logger = Logger(logfile='observer.log', level=logging.INFO)
17
18def toposort(g, steps):
19 reverse = {}
20
21 for k,v in g.items():
22 for rk in v:
23 try:
24 reverse[rk].append(k)
25 except:
26 reverse[rk]=k
27
28 sources = []
29 for k,v in g.items():
30 if not reverse.has_key(k):
31 sources.append(k)
32
33
34 for k,v in reverse.iteritems():
35 if (not v):
36 sources.append(k)
37
38 order = []
39 marked = []
40 while sources:
41 n = sources.pop()
42 try:
43 for m in g[n]:
44 if m not in marked:
45 sources.append(m)
46 marked.append(m)
47 except KeyError:
48 pass
49 if (n in steps):
50 order.append(n)
51
52 return order
53
54class PlanetStackObserver:
55 sync_steps = ['SyncNetworks','SyncNetworkSlivers','SyncSites','SyncSitePrivileges','SyncSlices','SyncSliceMemberships','SyncSlivers','SyncSliverIps']
56
57 def __init__(self):
58 self.manager = OpenStackManager()
59 # The Condition object that gets signalled by Feefie events
60 self.load_sync_steps()
61 self.event_cond = threading.Condition()
62 self.load_enacted()
63
64 def wait_for_event(self, timeout):
65 self.event_cond.acquire()
66 self.event_cond.wait(timeout)
67 self.event_cond.release()
68
69 def wake_up(self):
70 logger.info('Wake up routine called. Event cond %r'%self.event_cond)
71 self.event_cond.acquire()
72 self.event_cond.notify()
73 self.event_cond.release()
74
75 def load_sync_steps(self):
76 dep_path = Config().pl_dependency_path
77 try:
78 # This contains dependencies between records, not sync steps
79 self.model_dependency_graph = json.loads(open(dep_path).read())
80 except Exception,e:
81 raise e
82
83 backend_path = Config().backend_dependency_path
84 try:
85 # This contains dependencies between backend records
86 self.backend_dependency_graph = json.loads(open(backend_path).read())
87 except Exception,e:
88 raise e
89
90 provides_dict = {}
91 for s in sync_steps:
92 for m in s.provides:
93 provides_dict[m]=s.__name__
94
95 step_graph = {}
96 for k,v in model_dependency_graph.iteritems():
97 try:
98 source = provides_dict[k]
99 for m in v:
100 try:
101 dest = provides_dict[m]
102 except KeyError:
103 # no deps, pass
104 step_graph[source]=dest
105
106 except KeyError:
107 pass
108 # no dependencies, pass
109
110 if (backend_dependency_graph):
111 backend_dict = {}
112 for s in sync_steps:
113 for m in s.serves:
114 backend_dict[m]=s.__name__
115
116 for k,v in backend_dependency_graph.iteritems():
117 try:
118 source = backend_dict[k]
119 for m in v:
120 try:
121 dest = backend_dict[m]
122 except KeyError:
123 # no deps, pass
124 step_graph[source]=dest
125
126 except KeyError:
127 pass
128 # no dependencies, pass
129
130 dependency_graph = step_graph
131
132 self.ordered_steps = toposort(dependency_graph, steps)
133
134
135 def run(self):
136 if not self.manager.enabled or not self.manager.has_openstack:
137 return
138
139
140 while True:
141 try:
142 start_time=time.time()
143
144 logger.info('Waiting for event')
145 tBeforeWait = time.time()
146 self.wait_for_event(timeout=300)
147
148 for S in self.ordered_steps:
149 sync_step = S()
150 sync_step()
151
152 # Enforce 5 minutes between wakeups
153 tSleep = 300 - (time.time() - tBeforeWait)
154 if tSleep > 0:
155 logger.info('Sleeping for %d seconds' % tSleep)
156 time.sleep(tSleep)
157
158 logger.info('Observer woke up')
159 except:
160 logger.log_exc("Exception in observer run loop")
161 traceback.print_exc()