blob: 28a3a83e9299604a7d65275b05a7777735317611 [file] [log] [blame]
Scott Bakerbba67b62019-01-28 17:38:21 -08001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
Scott Bakerbba67b62019-01-28 17:38:21 -080015""" ModelAccessor
16
17 A class for abstracting access to models. Used to get any djangoisms out
18 of the synchronizer code base.
19
20 This module will import all models into this module's global scope, so doing
21 a "from modelaccessor import *" from a calling module ought to import all
22 models into the calling module's scope.
23"""
24
Zack Williams5c2ea232019-01-30 15:23:01 -070025from __future__ import absolute_import
26
Scott Bakerbba67b62019-01-28 17:38:21 -080027import functools
28import importlib
29import os
30import signal
31import sys
Scott Baker7ff8ad92019-02-15 17:02:41 -080032from threading import Timer
Scott Bakerbba67b62019-01-28 17:38:21 -080033
34from xosconfig import Config
Scott Bakerbba67b62019-01-28 17:38:21 -080035
Zack Williams5c2ea232019-01-30 15:23:01 -070036from .loadmodels import ModelLoadClient
37
38from multistructlog import create_logger
# Module-wide structured logger, configured from the "logging" config section.
log = create_logger(Config().get("logging"))

# Exit code requested by exit_while_inside_reactor(); honoured by
# config_accessor_grpcapi() after reactor.run() returns. None = no exit requested.
after_reactor_exit_code = None
# SIGINT handler captured before reactor.run(); restored once the reactor is stopped.
orig_sigint = None
# The active model accessor, assigned by config_accessor_grpcapi()'s reconnect
# callback or by config_accessor_mock().
model_accessor = None
44
Zack Williams5c2ea232019-01-30 15:23:01 -070045
class ModelAccessor(object):
    """Abstract base for the objects synchronizer code uses to reach models.

    Concrete accessors override ``get_all_model_classes`` and whichever query
    helpers they support. Methods without a meaningful default raise
    ``NotImplementedError``; the connection and journal hooks default to no-ops.

    Model classes are also reachable as attributes: ``accessor.Slice`` returns
    ``accessor.get_model_class("Slice")`` when that model is known.
    """

    def __init__(self):
        # Maps model class name -> model class; supplied by the subclass.
        self.all_model_classes = self.get_all_model_classes()

    def __getattr__(self, name):
        """Resolve otherwise-missing attributes as model classes.

        ``__getattr__`` only runs after normal attribute lookup has failed.
        ``all_model_classes`` itself and dunder names (probed by ``copy``,
        ``pickle`` and friends) can never be model classes, so they are
        rejected up front. Without this guard, an instance whose
        ``all_model_classes`` is unset re-enters ``__getattr__`` through
        ``has_model_class`` and recurses until ``RecursionError``. That happens
        when ``__init__`` failed or the instance was created via ``__new__``.

        Raises:
            AttributeError: if ``name`` is neither a model class nor a regular
                attribute.
        """
        if name == "all_model_classes" or (
            name.startswith("__") and name.endswith("__")
        ):
            raise AttributeError(
                "%r object has no attribute %r" % (type(self).__name__, name)
            )

        # Fetch the helpers explicitly so that subclass overrides are honoured.
        has_model_class = self.__getattribute__("has_model_class")
        get_model_class = self.__getattribute__("get_model_class")
        if has_model_class(name):
            return get_model_class(name)

        # Default behaviour: let normal lookup raise the usual AttributeError.
        return self.__getattribute__(name)

    def get_all_model_classes(self):
        """Build and return a dict mapping model class names to model classes."""
        raise NotImplementedError("Not Implemented")

    def get_model_class(self, name):
        """Return the model class called ``name``.

        Raises:
            KeyError: if no model class with that name is known.
        """
        return self.all_model_classes[name]

    def has_model_class(self, name):
        """Return True if a model class called ``name`` is known."""
        return name in self.all_model_classes

    def fetch_pending(self, main_objs, deletion=False):
        """Execute the default fetch_pending query."""
        raise NotImplementedError("Not Implemented")

    def fetch_policies(self, main_objs, deletion=False):
        """Execute the default fetch_policies query."""
        raise NotImplementedError("Not Implemented")

    def reset_queries(self):
        """Reset any state between passes of the synchronizer.

        For Django this limits the memory consumed by cached queries; the
        default implementation does nothing.
        """
        pass

    def connection_close(self):
        """Close any active database connection.

        For Django this limits memory consumption; the default does nothing.
        """
        pass

    def check_db_connection_okay(self):
        """Check that the database connection is usable; the default does nothing."""
        pass

    def obj_exists(self, o):
        """Return True if the object exists in the data model."""
        raise NotImplementedError("Not Implemented")

    def obj_in_list(self, o, olist):
        """Return True if ``o`` is the same as one of the objects in ``olist``."""
        raise NotImplementedError("Not Implemented")

    def now(self):
        """Return the current time for timestamping purposes."""
        raise NotImplementedError("Not Implemented")

    def is_type(self, obj, name):
        """Return True if ``obj`` is of model type ``name``."""
        raise NotImplementedError("Not Implemented")

    def is_instance(self, obj, name):
        """Return True if ``obj`` is of model type ``name`` or a descendant of it."""
        raise NotImplementedError("Not Implemented")

    def get_content_type_id(self, obj):
        """Return the content-type id for ``obj``; not implemented by default."""
        raise NotImplementedError("Not Implemented")

    def journal_object(self, o, operation, msg=None, timestamp=None):
        """Hook for journaling ``operation`` on ``o``; the default does nothing."""
        pass

    def create_obj(self, cls, **kwargs):
        """Create an instance of ``cls`` from ``kwargs``; not implemented by default."""
        raise NotImplementedError("Not Implemented")
124
125
def import_models_to_globals():
    """Publish every model class of the active accessor as a module global.

    Also defines a minimal ``ModelLink`` stand-in when no model supplied one,
    because xosbase is not visible from the synchronizer's perspective.
    """
    module_scope = globals()
    module_scope.update(model_accessor.all_model_classes.items())

    if "ModelLink" in module_scope:
        return

    class ModelLink:
        def __init__(self, dest, via, into=None):
            self.dest = dest
            self.via = via
            self.into = into

    module_scope["ModelLink"] = ModelLink
142
143
def keep_trying(client, reactor):
    """Ping the core once per second; re-exec the synchronizer when it fails.

    A NoOp call is sent each time this runs. While it succeeds, the function
    reschedules itself on ``reactor`` one second later. The first exception
    is taken to mean the server has reset, so the whole process is re-executed.
    The fresh process reconnects and downloads the new API from the server.
    """
    from xosapi.xos_grpc_client import Empty

    try:
        client.utility.NoOp(Empty())
    except Exception as e:
        # The API became unavailable, so restart to reconnect.
        log.exception("exception in NoOp", e=e)
        log.info("restarting synchronizer")

        # os.execv() replaces the process immediately without flushing
        # Python's buffered streams; flush them so the log lines above survive.
        for stream in (sys.stdout, sys.stderr):
            try:
                stream.flush()
            except (AttributeError, ValueError, OSError):
                # Stream missing or already closed: nothing to flush.
                pass

        os.execv(sys.executable, ["python"] + sys.argv)
        return

    reactor.callLater(1, functools.partial(keep_trying, client, reactor))
165
Zack Williams5c2ea232019-01-30 15:23:01 -0700166
def unload_models(client, reactor, version):
    """Ask the core to unload this synchronizer's models, retrying every 30s.

    This is always scheduled on a ``threading.Timer``. On success the whole
    process is terminated with status 0. On TRYAGAIN or any error, a new
    Timer reschedules this function after 30 seconds.
    """
    log.info("unload_models initiated by timer")

    try:
        result = ModelLoadClient(client).unload_models(
            Config.get("name"),
            version=version,
            cleanup_behavior=ModelLoadClient.AUTOMATICALLY_CLEAN)

        log.debug("Unload response", result=result)

        if result.status in [result.SUCCESS, result.SUCCESS_NOTHING_CHANGED]:
            log.info("Models successfully unloaded. Exiting with status", code=0)
            # We are on a Timer thread. There, sys.exit() only raises
            # SystemExit in this thread, which threading silently ignores.
            # The process would keep running and this retry loop would stop.
            # Flush buffered output, then terminate the whole process.
            for stream in (sys.stdout, sys.stderr):
                try:
                    stream.flush()
                except (AttributeError, ValueError, OSError):
                    pass
            os._exit(0)

        if result.status == result.TRYAGAIN:
            log.info("TRYAGAIN received. Expect to try again in 30 seconds.")

    except Exception:
        # If the synchronizer is operational, then assume the ORM's
        # restart_on_disconnect will deal with the connection being lost.
        log.exception("Error while unloading. Expect to try again in 30 seconds.")

    Timer(30, functools.partial(unload_models, client, reactor, version)).start()
192
Zack Williams5c2ea232019-01-30 15:23:01 -0700193
def exit_while_inside_reactor(reactor, code):
    """Request a process exit with ``code`` from inside a reactor callback.

    A sys.exit() raised inside the reactor would be trapped by it. Instead,
    this stops the reactor and restores the SIGINT handler saved before the
    reactor ran. It then records ``code`` in ``after_reactor_exit_code``,
    which config_accessor_grpcapi() passes to sys.exit() once reactor.run()
    returns.
    """
    global after_reactor_exit_code

    # Order matters: the exit code is only recorded once stop() and the
    # handler restore have both succeeded.
    reactor.stop()
    signal.signal(signal.SIGINT, orig_sigint)
    after_reactor_exit_code = code
204
Scott Bakerbba67b62019-01-28 17:38:21 -0800205
def get_synchronizer_version():
    """Return the synchronizer's version string, or "unknown".

    The version is the first line, stripped, of a ``VERSION`` file next to the
    synchronizer's ``__main__`` script. "unknown" is returned in three cases:
    ``__main__`` has no usable ``__file__``, the file is absent, or its first
    line is blank.
    """
    import __main__ as synchronizer_main

    main_file = getattr(synchronizer_main, "__file__", None)
    if main_file:
        version_fn = os.path.join(os.path.dirname(main_file), "VERSION")
        if os.path.exists(version_fn):
            # Use a context manager so the file handle is closed promptly.
            with open(version_fn, "rt") as version_file:
                version = version_file.readline().strip()
            if version:
                return version
    return "unknown"
217
218
def _grpc_status_name(exc):
    """Return the name of the gRPC status code carried by ``exc``, or None.

    gRPC errors are expected to expose a callable ``code()`` whose result has a
    ``name`` (e.g. "UNAVAILABLE"); any other exception yields None.
    """
    code = getattr(exc, "code", None)
    if not callable(code):
        return None
    return getattr(code(), "name", None)


def grpcapi_reconnect(client, reactor):
    """Reconnect callback of the gRPC client: sync models, then set up the ORM.

    Steps:
      1. When ``models_dir`` is configured, upload or unload this
         synchronizer's models according to ``desired_state``.
      2. Wait for the client's ORM to be present.
      3. Install a CoreApiModelAccessor as ``model_accessor`` and check
         ``required_models``.
      4. Publish the models as module globals, stop the reactor and restore
         SIGINT.
    Failures along the way reschedule a retry or request an exit via
    exit_while_inside_reactor().
    """
    global model_accessor

    # Computed unconditionally. The "unload" check at the end of this function
    # needs it even when models_dir is not configured; before, it was only
    # bound inside the models_dir branch and that path raised NameError.
    version = get_synchronizer_version()

    # Try to load models before initializing the ORM. The ORM might be broken
    # because it is waiting on our models.
    if Config.get("models_dir"):
        log.info("Service version is %s" % version, core_version=Config.get("core_version"))
        try:
            desired_state = Config.get("desired_state")
            if desired_state == "load":
                log.info("Calling Loadmodels")
                ModelLoadClient(client).upload_models(
                    Config.get("name"), Config.get("models_dir"), version=version
                )
                log.info("Back from Loadmodels")
            elif desired_state == "unload":
                # Try for an easy unload. If there are no dirty models, the
                # unload succeeds without requiring us to set up the synchronizer.
                log.info("Trying for an easy unload_models")
                result = ModelLoadClient(client).unload_models(
                    Config.get("name"),
                    version=version,
                    cleanup_behavior=ModelLoadClient.AUTOMATICALLY_CLEAN)
                if result.status in [result.SUCCESS, result.SUCCESS_NOTHING_CHANGED]:
                    log.info("Models successfully unloaded. Synchronizer exiting")
                    exit_while_inside_reactor(reactor, 0)
                    return

                # We couldn't unload the easy way, so fall through and set up
                # the synchronizer; unload_models() retries on a timer below.
            else:
                log.error("Misconfigured", desired_state=desired_state)
                exit_while_inside_reactor(reactor, -1)
                return
        except Exception as e:  # TODO: narrow exception scope
            status_name = _grpc_status_name(e)

            if status_name == "UNAVAILABLE":
                # Force a reconnection, as we may end up downloading a new xos API.
                log.info("grpc unavailable during loadmodels. Force a reconnect")
                client.connected = False
                client.connect()
                return

            if status_name == "INVALID_ARGUMENT":
                # Version mismatch between the service and the core: shut the
                # process down so the operator notices something is wrong.
                log.error(e.details())
                log.info("shutting down")
                exit_while_inside_reactor(reactor, 1)
                return

            log.exception("failed to onboard models")
            # Any other error needs no forced reconnect; just retry LoadModels().
            reactor.callLater(10, functools.partial(grpcapi_reconnect, client, reactor))
            return

    # If the ORM is broken, wait for it to become available.
    if not getattr(client, "xos_orm", None):
        log.warning("No xos_orm. Will keep trying...")
        reactor.callLater(1, functools.partial(keep_trying, client, reactor))
        return

    # This prevents timestamps from being automatically updated.
    client.xos_orm.caller_kind = "synchronizer"

    client.xos_orm.restart_on_disconnect = True

    from .apiaccessor import CoreApiModelAccessor

    model_accessor = CoreApiModelAccessor(orm=client.xos_orm)

    # If required_models is set, check that those models are present. If not,
    # the synchronizer goes to sleep until they show up.
    required_models = Config.get("required_models")
    if required_models:
        required_models = [x.strip() for x in required_models]

        found = [m for m in required_models if model_accessor.has_model_class(m)]
        missing = [m for m in required_models if not model_accessor.has_model_class(m)]

        log.info("required_models, found:", models=", ".join(found))
        if missing:
            log.warning("required_models: missing", models=", ".join(missing))
            # Give up and wait for the connection to reconnect, hoping the
            # missing model has shown up by then.
            reactor.callLater(1, functools.partial(keep_trying, client, reactor))
            return

    # Import all models into the module's global scope.
    import_models_to_globals()

    # The synchronizer framework isn't ready to embrace the reactor yet...
    reactor.stop()

    # Restore the SIGINT handler the reactor took over.
    signal.signal(signal.SIGINT, orig_sigint)

    # If we still want to unload, keep retrying in the background.
    if Config.get("desired_state") == "unload":
        Timer(30, functools.partial(unload_models, client, reactor, version)).start()
340
Scott Bakerbba67b62019-01-28 17:38:21 -0800341
def config_accessor_grpcapi():
    """Connect to the XOS core over gRPC and run the Twisted reactor.

    Reads endpoint and credentials from the ``accessor.*`` config keys and
    starts a SecureClient whose reconnect callback is grpcapi_reconnect(). It
    then runs the reactor until a callback stops it. If a callback requested
    an exit via exit_while_inside_reactor(), the process exits with that code.

    Raises:
        Exception: if the password is an "@file" reference to a missing file.
    """
    global orig_sigint

    log.info("Connecting to the gRPC API")

    grpcapi_endpoint = Config.get("accessor.endpoint")
    grpcapi_username = Config.get("accessor.username")
    grpcapi_password = Config.get("accessor.password")

    # If the password starts with "@", the rest is the path of a file whose
    # first line holds the real password.
    if grpcapi_password.startswith("@"):
        fn = grpcapi_password[1:]
        if not os.path.exists(fn):
            raise Exception("%s does not exist" % fn)
        # Context manager so the secret file's handle is closed promptly.
        with open(fn) as password_file:
            grpcapi_password = password_file.readline().strip()

    from xosapi.xos_grpc_client import SecureClient
    from twisted.internet import reactor

    grpcapi_client = SecureClient(
        endpoint=grpcapi_endpoint, username=grpcapi_username, password=grpcapi_password
    )
    grpcapi_client.set_reconnect_callback(
        functools.partial(grpcapi_reconnect, grpcapi_client, reactor)
    )
    grpcapi_client.restart_on_protobuf_change = True
    grpcapi_client.start()

    # The reactor takes over SIGINT during reactor.run() but does not give it
    # back on reactor.stop(). Remember the current handler so the callbacks
    # (grpcapi_reconnect / exit_while_inside_reactor) can restore it.
    orig_sigint = signal.getsignal(signal.SIGINT)

    # Start the reactor. The client connects and then invokes grpcapi_reconnect().
    reactor.run()

    # Catch whether a reactor callback asked us to exit.
    if after_reactor_exit_code is not None:
        log.info("exiting with status", code=after_reactor_exit_code)
        sys.exit(after_reactor_exit_code)
386
Scott Bakerbba67b62019-01-28 17:38:21 -0800387
def config_accessor_mock():
    """Install the generated mock model accessor used by the test framework.

    The mock module is imported from /tmp/mock_modelaccessor. An
    ``all_model_classes`` mapping is synthesized for it, and its models are
    published as module globals.
    """
    global model_accessor

    # The mock model accessor is always generated into this temporary location.
    mock_dir = "/tmp/mock_modelaccessor"
    if mock_dir not in sys.path:
        sys.path.append(mock_dir)

    import mock_modelaccessor as mma

    model_accessor = mma.model_accessor

    # The mock accessor lacks all_model_classes, so collect every module
    # attribute that carries a leaf_model_name.
    candidates = ((attr_name, getattr(mma, attr_name)) for attr_name in dir(mma))
    model_accessor.all_model_classes = {
        attr_name: value
        for attr_name, value in candidates
        if hasattr(value, "leaf_model_name")
    }

    import_models_to_globals()
411
412
def config_accessor():
    """Set up the model accessor selected by the ``accessor.kind`` config key.

    Supported kinds are "testframework" (mock accessor) and "grpcapi" (live
    core). Afterwards, each module named in ``wrappers`` is imported so it can
    extend the ORM.

    Raises:
        Exception: if ``accessor.kind`` is not a supported kind.
    """
    accessor_kind = Config.get("accessor.kind")

    setups = (
        ("testframework", config_accessor_mock),
        ("grpcapi", config_accessor_grpcapi),
    )
    for kind, setup in setups:
        if accessor_kind == kind:
            setup()
            break
    else:
        raise Exception("Unknown accessor kind %s" % accessor_kind)

    # Import any wrappers the synchronizer needs to add to the ORM.
    for wrapper_name in Config.get("wrappers") or ():
        importlib.import_module(wrapper_name)
427
428
# Configure the accessor as an import-time side effect, so that
# "from modelaccessor import *" brings the model classes into the caller's scope.
config_accessor()