blob: d408acea439588a0e94e55c476e729ca8dfffd99 [file] [log] [blame]
Scott Bakerbba67b62019-01-28 17:38:21 -08001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
Scott Bakerbba67b62019-01-28 17:38:21 -080015""" ModelAccessor
16
17 A class for abstracting access to models. Used to get any djangoisms out
18 of the synchronizer code base.
19
20 This module will import all models into this module's global scope, so doing
21 a "from modelaccessor import *" from a calling module ought to import all
22 models into the calling module's scope.
23"""
24
Zack Williams5c2ea232019-01-30 15:23:01 -070025from __future__ import absolute_import
26
Scott Bakerbba67b62019-01-28 17:38:21 -080027import functools
28import importlib
29import os
30import signal
31import sys
Scott Baker7ff8ad92019-02-15 17:02:41 -080032from threading import Timer
Scott Bakerbba67b62019-01-28 17:38:21 -080033
34from xosconfig import Config
Scott Bakerbba67b62019-01-28 17:38:21 -080035
Zack Williams5c2ea232019-01-30 15:23:01 -070036from .loadmodels import ModelLoadClient
37
38from multistructlog import create_logger
log = create_logger(Config().get("logging"))

# Module-level state shared by the reactor callbacks below (all unset until used):
#   after_reactor_exit_code - exit status requested from inside a reactor callback;
#                             acted on once reactor.run() returns
#   orig_sigint             - SIGINT handler saved just before reactor.run()
#   model_accessor          - active ModelAccessor instance, set by config_accessor_*()
after_reactor_exit_code = orig_sigint = model_accessor = None
44
Zack Williams5c2ea232019-01-30 15:23:01 -070045
class ModelAccessor(object):
    """ Abstract base class for synchronizer model access.

    Concrete accessors override the methods below that raise NotImplementedError.
    The dictionary returned by get_all_model_classes() is stored as
    all_model_classes, and every model class in it is also reachable as an
    attribute, so ``accessor.Slice`` is the same as ``accessor.get_model_class("Slice")``.
    """

    def __init__(self):
        self.all_model_classes = self.get_all_model_classes()

    def __getattr__(self, name):
        """ Resolve otherwise-unknown attributes to model classes.

        Python only calls __getattr__ after normal attribute lookup has failed.
        """
        if name == "all_model_classes":
            # all_model_classes has not been assigned yet (e.g. __init__ is still
            # running, or the instance was created by copy/pickle without calling
            # __init__). Without this guard has_model_class() would read
            # self.all_model_classes, re-enter __getattr__, and recurse until
            # RecursionError instead of raising a plain AttributeError.
            raise AttributeError(
                "%r object has no attribute %r" % (type(self).__name__, name)
            )

        # Look the helpers up explicitly so subclass overrides are honored.
        has_model_class = self.__getattribute__("has_model_class")
        get_model_class = self.__getattribute__("get_model_class")
        if has_model_class(name):
            return get_model_class(name)

        # Default behaviour: raises the usual AttributeError
        return self.__getattribute__(name)

    def get_all_model_classes(self):
        """ Build a dictionary of all model class names """
        raise NotImplementedError("Not Implemented")

    def get_model_class(self, name):
        """ Given a class name, return that model class """
        return self.all_model_classes[name]

    def has_model_class(self, name):
        """ Given a class name, return True if a model class with that name exists """
        return name in self.all_model_classes

    def fetch_pending(self, main_objs, deletion=False):
        """ Execute the default fetch_pending query """
        raise NotImplementedError("Not Implemented")

    def fetch_policies(self, main_objs, deletion=False):
        """ Execute the default fetch_policies query """
        raise NotImplementedError("Not Implemented")

    def reset_queries(self):
        """ Reset any state between passes of synchronizer. For django, to
            limit memory consumption of cached queries.
        """
        pass

    def connection_close(self):
        """ Close any active database connection. For django, to limit memory
            consumption.
        """
        pass

    def check_db_connection_okay(self):
        """ Checks to make sure the db connection is okay """
        pass

    def obj_exists(self, o):
        """ Return True if the object exists in the data model """
        raise NotImplementedError("Not Implemented")

    def obj_in_list(self, o, olist):
        """ Return True if o is the same as one of the objects in olist """
        raise NotImplementedError("Not Implemented")

    def now(self):
        """ Return the current time for timestamping purposes """
        raise NotImplementedError("Not Implemented")

    def is_type(self, obj, name):
        """ Return True if obj is of model type "name" """
        raise NotImplementedError("Not Implemented")

    def is_instance(self, obj, name):
        """ Return True if obj is of model type "name" or is a descendant """
        raise NotImplementedError("Not Implemented")

    def get_content_type_id(self, obj):
        """ Backend hook; presumably returns a content-type id for obj (subclass-defined) """
        raise NotImplementedError("Not Implemented")

    def journal_object(self, o, operation, msg=None, timestamp=None):
        """ No-op by default; subclasses may record `operation` on object o """
        pass

    def create_obj(self, cls, **kwargs):
        """ Backend hook to create an object of model class cls from kwargs (subclass-defined) """
        raise NotImplementedError("Not Implemented")
124
125
def import_models_to_globals():
    """ Publish every model class known to model_accessor as a module global.

    This is what lets ``from modelaccessor import *`` hand model classes to
    callers. Because xosbase is not visible from the synchronizer, a minimal
    ModelLink stand-in (dest / via / into) is installed when no global named
    ModelLink exists after the models have been published.
    """
    module_namespace = globals()
    module_namespace.update(model_accessor.all_model_classes)

    if "ModelLink" in module_namespace:
        return

    class ModelLink:
        def __init__(self, dest, via, into=None):
            self.dest = dest
            self.via = via
            self.into = into

    module_namespace["ModelLink"] = ModelLink
142
143
def keep_trying(client, reactor):
    """ Poll the core once per second and restart the synchronizer when it drops.

    Sends a NoOp over the client's utility service. While the call succeeds,
    another probe is scheduled on the reactor one second later. The first
    failure is taken to mean the server has reset, so the process re-executes
    sys.executable with the current sys.argv. The fresh process then reconnects
    and downloads the server's current API.
    """
    from xosapi.xos_grpc_client import Empty

    try:
        client.utility.NoOp(Empty())
    except Exception as e:
        # The API has become unavailable; start over in a fresh process.
        log.exception("exception in NoOp", e=e)
        log.info("restarting synchronizer")
        os.execv(sys.executable, ["python"] + sys.argv)
        return
    else:
        # Still reachable: probe again in one second.
        next_probe = functools.partial(keep_trying, client, reactor)
        reactor.callLater(1, next_probe)
165
Zack Williams5c2ea232019-01-30 15:23:01 -0700166
def unload_models(client, reactor, version):
    """ Ask the core to unload this synchronizer's models; retry every 30s until done.

    Only ever scheduled through threading.Timer, so this runs on a timer thread
    rather than the main thread. On SUCCESS / SUCCESS_NOTHING_CHANGED the whole
    process is terminated with status 0. For any other status, or on an error,
    another 30-second Timer is started to try again.

    Args:
        client: connected gRPC client handed to ModelLoadClient.
        reactor: not used here; passed through unchanged to the next retry.
        version: synchronizer version sent with the unload request.
    """
    log.info("unload_models initiated by timer")

    try:
        result = ModelLoadClient(client).unload_models(
            Config.get("name"),
            version=version,
            cleanup_behavior=ModelLoadClient.AUTOMATICALLY_CLEAN)

        log.debug("Unload response", result=result)

        if result.status in [result.SUCCESS, result.SUCCESS_NOTHING_CHANGED]:
            log.info("Models successfully unloaded. Exiting with status", code=0)
            # sys.exit() here would only raise SystemExit inside this Timer
            # thread. The thread would end silently while the process kept
            # running, and the retry below would be skipped. os._exit()
            # terminates the whole process. It skips interpreter cleanup, so
            # flush the standard streams first.
            for stream in (sys.stdout, sys.stderr):
                if stream is not None:
                    stream.flush()
            os._exit(0)

        if result.status == result.TRYAGAIN:
            log.info("TRYAGAIN received. Expect to try again in 30 seconds.")

    except Exception:
        # If the synchronizer is operational, then assume the ORM's restart_on_disconnect will deal with the
        # connection being lost.
        log.exception("Error while unloading. Expect to try again in 30 seconds.")

    Timer(30, functools.partial(unload_models, client, reactor, version)).start()
192
Zack Williams5c2ea232019-01-30 15:23:01 -0700193
def exit_while_inside_reactor(reactor, code):
    """ Request a process exit from inside a Twisted reactor callback.

    Calling sys.exit() inside a reactor callback ends up trapped by the reactor.
    Instead this stops the reactor, restores the SIGINT handler saved before
    reactor.run(), and records ``code`` in the module-level
    after_reactor_exit_code. config_accessor_grpcapi() checks that value after
    reactor.run() returns and calls sys.exit() with it.

    Args:
        reactor: the running Twisted reactor.
        code: exit status to use once the reactor has stopped.
    """
    global after_reactor_exit_code

    reactor.stop()
    # The reactor takes over SIGINT during run() and does not hand it back on stop().
    signal.signal(signal.SIGINT, orig_sigint)
    after_reactor_exit_code = code
204
Scott Bakerbba67b62019-01-28 17:38:21 -0800205
def get_synchronizer_version():
    """ Return the synchronizer's version string, or "unknown".

    The version is the first line, whitespace-stripped, of a file named VERSION
    in the same directory as the synchronizer's __main__ module. Returns
    "unknown" in these cases:
      - __main__ has no (or a None) __file__, e.g. an interactive session;
      - the VERSION file does not exist;
      - the first line of VERSION is blank.
    """
    import __main__ as synchronizer_main

    main_file = getattr(synchronizer_main, "__file__", None)
    if main_file is None:
        return "unknown"

    version_fn = os.path.join(os.path.dirname(main_file), "VERSION")
    if os.path.exists(version_fn):
        # Context manager so the file handle is closed instead of leaked.
        with open(version_fn, "rt") as version_file:
            version = version_file.readline().strip()
        if version:
            return version
    return "unknown"
217
218
def _grpc_status_name(e):
    """ Return the gRPC status-code name (e.g. "UNAVAILABLE") carried by exception e, or None.

    gRPC errors expose a callable code() whose result has a ``name``. Any
    exception that does not have that shape yields None.
    """
    code = getattr(e, "code", None)
    if not callable(code):
        return None
    return getattr(code(), "name", None)


def grpcapi_reconnect(client, reactor):
    """ Reconnect callback for the gRPC client: onboard models, then set up the ORM accessor.

    Registered with SecureClient.set_reconnect_callback(), so it runs inside the
    Twisted reactor each time the client (re)connects. In order, it:
      1. uploads or unloads this synchronizer's models when models_dir is configured;
      2. waits, via keep_trying(), for client.xos_orm and any required_models;
      3. installs a CoreApiModelAccessor as the module-level model_accessor and
         exports all model classes into this module's globals;
      4. stops the reactor, restores SIGINT, and, if desired_state is "unload",
         schedules unload_models() on a 30-second Timer.
    Retries are scheduled on the reactor rather than looped, because this runs
    inside a reactor callback.
    """
    global model_accessor

    # Needed both for onboarding and for the delayed unload at the end. Compute
    # it unconditionally: previously it was unbound (UnboundLocalError) when
    # models_dir was unset but desired_state was "unload".
    version = get_synchronizer_version()

    # Make sure to try to load models before trying to initialize the ORM. It might be the ORM is broken because it
    # is waiting on our models.
    if Config.get("models_dir"):
        log.info("Service version is %s" % version, core_version=Config.get("core_version"))
        try:
            if Config.get("desired_state") == "load":
                ModelLoadClient(client).upload_models(
                    Config.get("name"), Config.get("models_dir"), version=version
                )
            elif Config.get("desired_state") == "unload":
                # Try for an easy unload. If there's no dirty models, then unload will succeed without
                # requiring us to setup the synchronizer.
                log.info("Trying for an easy unload_models")
                result = ModelLoadClient(client).unload_models(
                    Config.get("name"),
                    version=version,
                    cleanup_behavior=ModelLoadClient.AUTOMATICALLY_CLEAN)
                if result.status in [result.SUCCESS, result.SUCCESS_NOTHING_CHANGED]:
                    log.info("Models successfully unloaded. Synchronizer exiting")
                    exit_while_inside_reactor(reactor, 0)
                    return

                # We couldn't unload the easy way, so we'll have to do it the hard way. Fall through and
                # setup the synchronizer.
            else:
                log.error("Misconfigured", desired_state=Config.get("desired_state"))
                exit_while_inside_reactor(reactor, -1)
                return
        except Exception as e:  # TODO: narrow exception scope
            status_name = _grpc_status_name(e)

            if status_name == "UNAVAILABLE":
                # We need to make sure we force a reconnection, as it's possible that we will end up downloading a
                # new xos API.
                log.info("grpc unavailable during loadmodels. Force a reconnect")
                client.connected = False
                client.connect()
                return

            if status_name == "INVALID_ARGUMENT":
                # In this case there is a version mismatch between the service and the core.
                # Kill the process so the operator is aware something is wrong.
                log.error(e.details())
                log.info("shutting down")
                exit_while_inside_reactor(reactor, 1)
                return

            log.exception("failed to onboard models")
            # If it's some other error, then we don't need to force a reconnect. Just try the LoadModels() again.
            reactor.callLater(10, functools.partial(grpcapi_reconnect, client, reactor))
            return

    # If the ORM is broken, then wait for the orm to become available.
    if not getattr(client, "xos_orm", None):
        log.warning("No xos_orm. Will keep trying...")
        reactor.callLater(1, functools.partial(keep_trying, client, reactor))
        return

    # this will prevent updated timestamps from being automatically updated
    client.xos_orm.caller_kind = "synchronizer"

    client.xos_orm.restart_on_disconnect = True

    from .apiaccessor import CoreApiModelAccessor

    model_accessor = CoreApiModelAccessor(orm=client.xos_orm)

    # If required_models is set, then check to make sure the required_models
    # are present. If not, then the synchronizer needs to go to sleep until
    # the models show up.
    required_models = Config.get("required_models")
    if required_models:
        required_models = [x.strip() for x in required_models]

        missing = []
        found = []
        for model in required_models:
            if model_accessor.has_model_class(model):
                found.append(model)
            else:
                missing.append(model)

        log.info("required_models, found:", models=", ".join(found))
        if missing:
            log.warning("required_models: missing", models=", ".join(missing))
            # We're missing a required model. Give up and wait for the connection
            # to reconnect, and hope our missing model has shown up.
            reactor.callLater(1, functools.partial(keep_trying, client, reactor))
            return

    # import all models to global space
    import_models_to_globals()

    # Synchronizer framework isn't ready to embrace reactor yet...
    reactor.stop()

    # Restore the sigint handler
    signal.signal(signal.SIGINT, orig_sigint)

    # Check to see if we still want to unload
    if Config.get("desired_state") == "unload":
        Timer(30, functools.partial(unload_models, client, reactor, version)).start()
338
Scott Bakerbba67b62019-01-28 17:38:21 -0800339
def config_accessor_grpcapi():
    """ Connect to the XOS core over gRPC and block in the Twisted reactor until setup completes.

    Reads accessor.endpoint, accessor.username and accessor.password from
    Config. A password starting with "@" names a file whose first line holds
    the real password. The client's reconnect callback, grpcapi_reconnect(),
    does the remaining setup and stops the reactor. If a callback requested an
    exit through after_reactor_exit_code, the process exits with that code once
    reactor.run() returns.

    Raises:
        Exception: if the "@"-referenced password file does not exist.
    """
    global orig_sigint

    log.info("Connecting to the gRPC API")

    grpcapi_endpoint = Config.get("accessor.endpoint")
    grpcapi_username = Config.get("accessor.username")
    grpcapi_password = Config.get("accessor.password")

    # if password starts with "@", then retrieve the password from a file
    if grpcapi_password.startswith("@"):
        fn = grpcapi_password[1:]
        if not os.path.exists(fn):
            raise Exception("%s does not exist" % fn)
        # Context manager so the password file handle is closed instead of leaked.
        with open(fn) as password_file:
            grpcapi_password = password_file.readline().strip()

    from xosapi.xos_grpc_client import SecureClient
    from twisted.internet import reactor

    grpcapi_client = SecureClient(
        endpoint=grpcapi_endpoint, username=grpcapi_username, password=grpcapi_password
    )
    grpcapi_client.set_reconnect_callback(
        functools.partial(grpcapi_reconnect, grpcapi_client, reactor)
    )
    grpcapi_client.start()

    # Reactor will take over SIGINT during reactor.run(), but does not return it when reactor.stop() is called.
    # Save it here so it can be restored after the reactor stops.
    orig_sigint = signal.getsignal(signal.SIGINT)

    # Start reactor. This will cause the client to connect and then execute
    # grpcapi_reconnect().
    reactor.run()

    # Catch if we wanted to stop while inside of a reactor callback
    if after_reactor_exit_code is not None:
        log.info("exiting with status", code=after_reactor_exit_code)
        sys.exit(after_reactor_exit_code)
383
Scott Bakerbba67b62019-01-28 17:38:21 -0800384
def config_accessor_mock():
    """ Install the generated mock model accessor (test-framework mode).

    The mock accessor is always built under /tmp/mock_modelaccessor. That
    directory is added to sys.path once. The mock module's model_accessor
    becomes this module's model_accessor, and all_model_classes is synthesized
    from every attribute of the mock module that has a leaf_model_name.
    Finally, the models are exported into this module's globals.
    """
    global model_accessor

    mock_dir = "/tmp/mock_modelaccessor"
    if mock_dir not in sys.path:
        sys.path.append(mock_dir)

    from mock_modelaccessor import model_accessor as mock_model_accessor

    model_accessor = mock_model_accessor

    # The mock accessor carries no all_model_classes of its own, so derive one.
    import mock_modelaccessor as mma

    candidates = ((attr_name, getattr(mma, attr_name)) for attr_name in dir(mma))
    model_accessor.all_model_classes = {
        attr_name: attr_value
        for attr_name, attr_value in candidates
        if hasattr(attr_value, "leaf_model_name")
    }

    import_models_to_globals()
408
409
def config_accessor():
    """ Set up model access according to the accessor.kind config option.

    "grpcapi" connects to the core over gRPC and "testframework" installs the
    mock accessor. Any other value raises. Afterwards each module listed in the
    "wrappers" config option is imported so it can extend the ORM.
    """
    accessor_kind = Config.get("accessor.kind")

    if accessor_kind == "grpcapi":
        config_accessor_grpcapi()
    elif accessor_kind == "testframework":
        config_accessor_mock()
    else:
        raise Exception("Unknown accessor kind %s" % accessor_kind)

    # now import any wrappers that the synchronizer needs to add to the ORM
    for wrapper_name in Config.get("wrappers") or []:
        importlib.import_module(wrapper_name)


# Importing this module configures model access as a side effect.
config_accessor()