blob: c2d5114a5345db99cc099da0da3fa659cdc41119 [file] [log] [blame]
Scott Bakerbba67b62019-01-28 17:38:21 -08001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16""" ModelAccessor
17
18 A class for abstracting access to models. Used to get any djangoisms out
19 of the synchronizer code base.
20
21 This module will import all models into this module's global scope, so doing
22 a "from modelaccessor import *" from a calling module ought to import all
23 models into the calling module's scope.
24"""
25
26import functools
27import importlib
28import os
29import signal
30import sys
Scott Baker7ff8ad92019-02-15 17:02:41 -080031from threading import Timer
Scott Bakerbba67b62019-01-28 17:38:21 -080032from loadmodels import ModelLoadClient
33
34from xosconfig import Config
35from multistructlog import create_logger
36from xosutil.autodiscover_version import autodiscover_version_of_main
37
# Module-wide structured logger, configured from the "logging" section of the
# synchronizer configuration.
log = create_logger(Config().get("logging"))

# The active model accessor. Stays None until config_accessor_mock() or
# grpcapi_reconnect() installs one.
model_accessor = None

# SIGINT handler that was in place before reactor.run(); captured by
# config_accessor_grpcapi() so that it can be restored once the reactor stops.
orig_sigint = None

# Exit status requested from inside a reactor callback by
# exit_while_inside_reactor(). None means that no exit was requested.
after_reactor_exit_code = None
43
44
class ModelAccessor(object):
    """ Abstract base for model accessors.

        Subclasses supply get_all_model_classes() and the query helpers that
        raise NotImplementedError here. Model classes can then be reached as
        attributes of the accessor (accessor.SomeModel).
    """

    def __init__(self):
        self.all_model_classes = self.get_all_model_classes()

    def __getattr__(self, name):
        """ Resolve otherwise-missing attributes as model class names.

            Python only calls this after normal attribute lookup has failed.
            Raises AttributeError if name is not a known model class.
        """
        # all_model_classes itself must never be resolved as a model name. If
        # it is missing (for example because __init__ has not completed),
        # has_model_class() would look it up again through this method and
        # recurse until the interpreter's recursion limit is hit.
        if name != "all_model_classes" and self.has_model_class(name):
            return self.get_model_class(name)

        # Default behaviour: raises the standard AttributeError.
        return self.__getattribute__(name)

    def get_all_model_classes(self):
        """ Build a dictionary of all model classes, keyed by class name """
        raise NotImplementedError("Not Implemented")

    def get_model_class(self, name):
        """ Given a class name, return that model class """
        return self.all_model_classes[name]

    def has_model_class(self, name):
        """ Given a class name, return True if that model class is known """
        return name in self.all_model_classes

    def fetch_pending(self, main_objs, deletion=False):
        """ Execute the default fetch_pending query """
        raise NotImplementedError("Not Implemented")

    def fetch_policies(self, main_objs, deletion=False):
        """ Execute the default fetch_policies query """
        raise NotImplementedError("Not Implemented")

    def reset_queries(self):
        """ Reset any state between passes of synchronizer. For django, to
            limit memory consumption of cached queries.
        """
        pass

    def connection_close(self):
        """ Close any active database connection. For django, to limit memory
            consumption.
        """
        pass

    def check_db_connection_okay(self):
        """ Checks to make sure the db connection is okay """
        pass

    def obj_exists(self, o):
        """ Return True if the object exists in the data model """
        raise NotImplementedError("Not Implemented")

    def obj_in_list(self, o, olist):
        """ Return True if o is the same as one of the objects in olist """
        raise NotImplementedError("Not Implemented")

    def now(self):
        """ Return the current time for timestamping purposes """
        raise NotImplementedError("Not Implemented")

    def is_type(self, obj, name):
        """ Return True if obj is of model type "name" """
        raise NotImplementedError("Not Implemented")

    def is_instance(self, obj, name):
        """ Return True if obj is of model type "name" or is a descendant """
        raise NotImplementedError("Not Implemented")

    def get_content_type_id(self, obj):
        """ Must be provided by subclasses; the base class gives no default """
        raise NotImplementedError("Not Implemented")

    def journal_object(self, o, operation, msg=None, timestamp=None):
        """ No-op in the base class; subclasses may override """
        pass

    def create_obj(self, cls, **kwargs):
        """ Must be provided by subclasses; the base class gives no default """
        raise NotImplementedError("Not Implemented")
123
124
def import_models_to_globals():
    """ Publish every model class of the active accessor as a module global.

        Reads model_accessor.all_model_classes and copies each (name, class)
        pair into this module's namespace. If no ModelLink name exists
        afterwards, a minimal stand-in class is installed under that name.
    """
    module_scope = globals()

    # add all models to globals
    module_scope.update(model_accessor.all_model_classes.items())

    if "ModelLink" in module_scope:
        return

    # xosbase doesn't exist from the synchronizer's perspective, so fake out
    # ModelLink.
    class ModelLink:
        def __init__(self, dest, via, into=None):
            self.dest = dest
            self.via = via
            self.into = into

    module_scope["ModelLink"] = ModelLink
141
142
def keep_trying(client, reactor):
    """ Probe the API with a NoOp; restart the process once the probe fails.

        The strategy is to send NoOp operations, one per second, until a NoOp
        eventually throws an exception. That indicates the server has reset,
        at which point the synchronizer re-executes itself so that it
        reconnects and downloads a new API from the server.

        client  -- gRPC client exposing utility.NoOp()
        reactor -- reactor used to schedule the next probe via callLater()
    """
    from xosapi.xos_grpc_client import Empty

    try:
        client.utility.NoOp(Empty())
    except Exception as e:
        # If we caught an exception, then the API has become unavailable.
        log.exception("exception in NoOp", e=e)
        log.info("restarting synchronizer")

        # Replace the running process with a fresh copy of itself; nothing
        # after a successful execv runs.
        os.execv(sys.executable, ["python"] + sys.argv)
    else:
        # Still reachable: probe again in one second.
        reactor.callLater(1, functools.partial(keep_trying, client, reactor))
164
def unload_models(client, reactor, version):
    """ Ask the core to unload this service's models; retry every 30 seconds.

        This function is called by a timer until it succeeds. On a SUCCESS or
        SUCCESS_NOTHING_CHANGED status it calls sys.exit(0); on any other
        outcome (including an exception) it re-arms a 30 second Timer that
        calls it again with the same arguments.

        client  -- gRPC client handed to ModelLoadClient
        reactor -- passed through unchanged to the next attempt
        version -- service version reported with the unload request
    """
    log.info("unload_models initiated by timer")

    try:
        loader = ModelLoadClient(client)
        result = loader.unload_models(
            Config.get("name"),
            version=version,
            cleanup_behavior=ModelLoadClient.AUTOMATICALLY_CLEAN,
        )

        log.debug("Unload response", result=result)

        if result.status in (result.SUCCESS, result.SUCCESS_NOTHING_CHANGED):
            log.info("Models successfully unloaded. Exiting with status", code=0)
            # NOTE(review): this runs on a threading.Timer thread. sys.exit()
            # raises SystemExit, which is not caught by the handler below, but
            # in a non-main thread it presumably ends only that thread --
            # confirm this actually terminates the synchronizer process.
            sys.exit(0)

        if result.status == result.TRYAGAIN:
            log.info("TRYAGAIN received. Expect to try again in 30 seconds.")

    except Exception:
        # If the synchronizer is operational, then assume the ORM's restart_on_disconnect will deal with the
        # connection being lost.
        log.exception("Error while unloading. Expect to try again in 30 seconds.")

    retry = functools.partial(unload_models, client, reactor, version)
    Timer(30, retry).start()
190
def exit_while_inside_reactor(reactor, code):
    """ Request a process exit from within a reactor callback.

        Calling sys.exit() while inside reactor ends up trapped by reactor, so
        instead the reactor is stopped, the saved SIGINT handler is put back,
        and the desired exit status is recorded in after_reactor_exit_code
        for the code that runs after reactor.run() returns.

        reactor -- the running reactor to stop
        code    -- exit status to record
    """
    global after_reactor_exit_code

    reactor.stop()

    # Put back the handler that was saved in orig_sigint.
    signal.signal(signal.SIGINT, orig_sigint)

    after_reactor_exit_code = code
201
Scott Bakerbba67b62019-01-28 17:38:21 -0800202
def grpcapi_reconnect(client, reactor):
    """ Reconnect callback: (un)load models, then install the API accessor.

        Registered with the gRPC client by config_accessor_grpcapi(). In order:

        1. If "models_dir" is configured, upload or unload this service's
           models according to "desired_state". An easy unload that succeeds,
           or an unrecognised desired_state, stops the reactor and records an
           exit status. An UNAVAILABLE gRPC error forces a reconnect; any
           other error retries this function in 10 seconds.
        2. If the client has no ORM yet, hand over to keep_trying() and return.
        3. Build the CoreApiModelAccessor and verify "required_models"; if any
           are missing, hand over to keep_trying() and return.
        4. Export the models as module globals, stop the reactor and restore
           the original SIGINT handler.
        5. If desired_state is still "unload", schedule unload_models() on a
           30 second Timer.

        client  -- gRPC client (provides connect(), connected, xos_orm)
        reactor -- the running reactor
    """
    global model_accessor

    desired_state = Config.get("desired_state")

    # Only discovered below when models_dir is configured, but the deferred
    # unload at the end of this function needs it as well.
    version = None

    # Make sure to try to load models before trying to initialize the ORM. It might be the ORM is broken because it
    # is waiting on our models.

    if Config.get("models_dir"):
        version = autodiscover_version_of_main(max_parent_depth=0) or "unknown"
        log.info("Service version is %s" % version)
        try:
            if desired_state == "load":
                ModelLoadClient(client).upload_models(
                    Config.get("name"), Config.get("models_dir"), version=version
                )
            elif desired_state == "unload":
                # Try for an easy unload. If there's no dirty models, then unload will succeed without
                # requiring us to setup the synchronizer.
                log.info("Trying for an easy unload_models")
                result = ModelLoadClient(client).unload_models(
                    Config.get("name"),
                    version=version,
                    cleanup_behavior=ModelLoadClient.AUTOMATICALLY_CLEAN,
                )
                if result.status in [result.SUCCESS, result.SUCCESS_NOTHING_CHANGED]:
                    log.info("Models successfully unloaded. Synchronizer exiting")
                    exit_while_inside_reactor(reactor, 0)
                    return

                # We couldn't unload the easy way, so we'll have to do it the hard way. Fall through and
                # setup the synchronizer.
            else:
                log.error("Misconfigured", desired_state=desired_state)
                exit_while_inside_reactor(reactor, -1)
                return
        except Exception as e:  # TODO: narrow exception scope
            if (
                hasattr(e, "code")
                and callable(e.code)
                and hasattr(e.code(), "name")
                and (e.code().name) == "UNAVAILABLE"
            ):
                # We need to make sure we force a reconnection, as it's possible that we will end up downloading a
                # new xos API.
                log.info("grpc unavailable during loadmodels. Force a reconnect")
                client.connected = False
                client.connect()
                return
            log.exception("failed to onboard models")
            # If it's some other error, then we don't need to force a reconnect. Just try the LoadModels() again.
            reactor.callLater(10, functools.partial(grpcapi_reconnect, client, reactor))
            return

    # If the ORM is broken, then wait for the orm to become available.

    if not getattr(client, "xos_orm", None):
        log.warning("No xos_orm. Will keep trying...")
        reactor.callLater(1, functools.partial(keep_trying, client, reactor))
        return

    # this will prevent updated timestamps from being automatically updated
    client.xos_orm.caller_kind = "synchronizer"

    client.xos_orm.restart_on_disconnect = True

    from apiaccessor import CoreApiModelAccessor

    model_accessor = CoreApiModelAccessor(orm=client.xos_orm)

    # If required_models is set, then check to make sure the required_models
    # are present. If not, then the synchronizer needs to go to sleep until
    # the models show up.

    required_models = Config.get("required_models")
    if required_models:
        required_models = [x.strip() for x in required_models]

        missing = []
        found = []
        for model in required_models:
            if model_accessor.has_model_class(model):
                found.append(model)
            else:
                missing.append(model)

        log.info("required_models, found:", models=", ".join(found))
        if missing:
            log.warning("required_models: missing", models=", ".join(missing))
            # We're missing a required model. Give up and wait for the connection
            # to reconnect, and hope our missing model has shown up.
            reactor.callLater(1, functools.partial(keep_trying, client, reactor))
            return

    # import all models to global space
    import_models_to_globals()

    # Synchronizer framework isn't ready to embrace reactor yet...
    reactor.stop()

    # Restore the sigint handler
    signal.signal(signal.SIGINT, orig_sigint)

    # Check to see if we still want to unload
    if desired_state == "unload":
        if version is None:
            # models_dir was not configured, so the version was never
            # discovered above; discover it now rather than failing on an
            # unbound name.
            version = autodiscover_version_of_main(max_parent_depth=0) or "unknown"
        Timer(30, functools.partial(unload_models, client, reactor, version)).start()
306
Scott Bakerbba67b62019-01-28 17:38:21 -0800307
def config_accessor_grpcapi():
    """ Connect to the core over gRPC and run the reactor until setup is done.

        Reads accessor.endpoint / accessor.username / accessor.password from
        the configuration, creates a SecureClient whose reconnect callback is
        grpcapi_reconnect(), and runs the reactor. After the reactor returns,
        exits the process if a callback recorded an exit status in
        after_reactor_exit_code.

        Raises Exception if the password is given as "@<file>" and that file
        does not exist.
    """
    global orig_sigint

    log.info("Connecting to the gRPC API")

    grpcapi_endpoint = Config.get("accessor.endpoint")
    grpcapi_username = Config.get("accessor.username")
    grpcapi_password = Config.get("accessor.password")

    # if password starts with "@", then retrieve the password from a file
    if grpcapi_password.startswith("@"):
        fn = grpcapi_password[1:]
        if not os.path.exists(fn):
            raise Exception("%s does not exist" % fn)
        # Only the first line is used; the context manager makes sure the
        # file handle is closed again.
        with open(fn) as password_file:
            grpcapi_password = password_file.readline().strip()

    from xosapi.xos_grpc_client import SecureClient
    from twisted.internet import reactor

    grpcapi_client = SecureClient(
        endpoint=grpcapi_endpoint, username=grpcapi_username, password=grpcapi_password
    )
    grpcapi_client.set_reconnect_callback(
        functools.partial(grpcapi_reconnect, grpcapi_client, reactor)
    )
    grpcapi_client.start()

    # Reactor will take over SIGINT during reactor.run(), but does not return it when reactor.stop() is called.
    # Remember the current handler so that it can be restored afterwards.

    orig_sigint = signal.getsignal(signal.SIGINT)

    # Start reactor. This will cause the client to connect and then execute
    # grpcapi_reconnect().

    reactor.run()

    # Catch if we wanted to stop while inside of a reactor callback
    if after_reactor_exit_code is not None:
        log.info("exiting with status", code=after_reactor_exit_code)
        sys.exit(after_reactor_exit_code)
351
Scott Bakerbba67b62019-01-28 17:38:21 -0800352
def config_accessor_mock():
    """ Install the mock model accessor used by the test framework.

        Makes /tmp/mock_modelaccessor importable, takes model_accessor from
        the mock_modelaccessor module, gives it an all_model_classes mapping
        built from every module attribute that has a leaf_model_name, and
        exports the models as module globals.
    """
    global model_accessor

    # the mock model accessor always gets built to a temporary location
    mock_dir = "/tmp/mock_modelaccessor"
    if mock_dir not in sys.path:
        sys.path.append(mock_dir)

    import mock_modelaccessor as mma

    model_accessor = mma.model_accessor

    # mock_model_accessor doesn't have an all_model_classes field, so make one.
    candidates = ((attr_name, getattr(mma, attr_name)) for attr_name in dir(mma))
    model_accessor.all_model_classes = {
        attr_name: attr
        for attr_name, attr in candidates
        if hasattr(attr, "leaf_model_name")
    }

    import_models_to_globals()
376
377
def config_accessor():
    """ Set up the accessor named by accessor.kind, then import ORM wrappers.

        "testframework" selects config_accessor_mock() and "grpcapi" selects
        config_accessor_grpcapi(). Any other value raises Exception. Each
        module listed under the "wrappers" config key is imported afterwards.
    """
    accessor_kind = Config.get("accessor.kind")

    if accessor_kind not in ("testframework", "grpcapi"):
        raise Exception("Unknown accessor kind %s" % accessor_kind)

    if accessor_kind == "testframework":
        setup = config_accessor_mock
    else:
        setup = config_accessor_grpcapi
    setup()

    # now import any wrappers that the synchronizer needs to add to the ORM
    for wrapper_name in Config.get("wrappers") or ():
        importlib.import_module(wrapper_name)
392
393
# Configure the accessor as a side effect of importing this module.
config_accessor()