blob: b4b26f398190a2172dff299bfe8f9e775c4834bb [file] [log] [blame]
Scott Bakerbba67b62019-01-28 17:38:21 -08001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
Scott Bakerbba67b62019-01-28 17:38:21 -080015""" ModelAccessor
16
17 A class for abstracting access to models. Used to get any djangoisms out
18 of the synchronizer code base.
19
20 This module will import all models into this module's global scope, so doing
21 a "from modelaccessor import *" from a calling module ought to import all
22 models into the calling module's scope.
23"""
24
Zack Williams5c2ea232019-01-30 15:23:01 -070025from __future__ import absolute_import
26
Scott Bakerbba67b62019-01-28 17:38:21 -080027import functools
28import importlib
29import os
30import signal
31import sys
Scott Baker7ff8ad92019-02-15 17:02:41 -080032from threading import Timer
Scott Bakerbba67b62019-01-28 17:38:21 -080033
34from xosconfig import Config
Scott Bakerbba67b62019-01-28 17:38:21 -080035from xosutil.autodiscover_version import autodiscover_version_of_main
36
Zack Williams5c2ea232019-01-30 15:23:01 -070037from .loadmodels import ModelLoadClient
38
39from multistructlog import create_logger
# Module-wide structured logger, configured from the "logging" section of Config.
log = create_logger(Config().get("logging"))

# Exit status requested from inside a reactor callback (see exit_while_inside_reactor). It is checked by
# config_accessor_grpcapi() once reactor.run() has returned; None means no exit was requested.
after_reactor_exit_code = None
# SIGINT handler that was installed before reactor.run() was called, saved by config_accessor_grpcapi() so
# it can be put back once the reactor has been stopped.
orig_sigint = None
# The active model accessor. Assigned by grpcapi_reconnect() or config_accessor_mock().
model_accessor = None
45
Zack Williams5c2ea232019-01-30 15:23:01 -070046
class ModelAccessor(object):
    """ Base class for abstracting access to models.

        Subclasses supply the actual data-model backend by overriding the methods
        that raise NotImplementedError here. Model classes are kept in the
        all_model_classes dictionary (name -> class) and can also be retrieved
        as attributes of the accessor, e.g. accessor.SomeModel.
    """

    def __init__(self):
        self.all_model_classes = self.get_all_model_classes()

    def __getattr__(self, name):
        """ Wrapper for getattr to facilitate retrieval of classes.

            Only called when normal attribute lookup fails. Returns the model
            class called "name" if there is one, otherwise raises
            AttributeError.
        """
        # has_model_class() reads self.all_model_classes. If that attribute has
        # not been set yet (for example on an instance whose __init__ has not
        # run), looking it up lands back here and would recurse without bound.
        # Fail it as an ordinary missing attribute instead.
        if name == "all_model_classes":
            raise AttributeError(
                "'%s' object has no attribute '%s'" % (type(self).__name__, name)
            )

        has_model_class = self.__getattribute__("has_model_class")
        get_model_class = self.__getattribute__("get_model_class")
        if has_model_class(name):
            return get_model_class(name)

        # Default behaviour: raises the standard AttributeError
        return self.__getattribute__(name)

    def get_all_model_classes(self):
        """ Build a dictionary of all model class names """
        raise NotImplementedError("Not Implemented")

    def get_model_class(self, name):
        """ Given a class name, return that model class """
        return self.all_model_classes[name]

    def has_model_class(self, name):
        """ Given a class name, return True if that model class is known """
        return name in self.all_model_classes

    def fetch_pending(self, main_objs, deletion=False):
        """ Execute the default fetch_pending query """
        raise NotImplementedError("Not Implemented")

    def fetch_policies(self, main_objs, deletion=False):
        """ Execute the default fetch_policies query """
        raise NotImplementedError("Not Implemented")

    def reset_queries(self):
        """ Reset any state between passes of synchronizer. For django, to
            limit memory consumption of cached queries.
        """
        pass

    def connection_close(self):
        """ Close any active database connection. For django, to limit memory
            consumption.
        """
        pass

    def check_db_connection_okay(self):
        """ Checks to make sure the db connection is okay """
        pass

    def obj_exists(self, o):
        """ Return True if the object exists in the data model """
        raise NotImplementedError("Not Implemented")

    def obj_in_list(self, o, olist):
        """ Return True if o is the same as one of the objects in olist """
        raise NotImplementedError("Not Implemented")

    def now(self):
        """ Return the current time for timestamping purposes """
        raise NotImplementedError("Not Implemented")

    def is_type(self, obj, name):
        """ returns True if obj is of model type "name" """
        raise NotImplementedError("Not Implemented")

    def is_instance(self, obj, name):
        """ returns True if obj is of model type "name" or is a descendant """
        raise NotImplementedError("Not Implemented")

    def get_content_type_id(self, obj):
        """ Not implemented in the base class """
        raise NotImplementedError("Not Implemented")

    def journal_object(self, o, operation, msg=None, timestamp=None):
        """ Does nothing in the base class """
        pass

    def create_obj(self, cls, **kwargs):
        """ Not implemented in the base class """
        raise NotImplementedError("Not Implemented")
125
126
def import_models_to_globals():
    """ Publish every model class known to model_accessor as a global of this module.

        Also makes sure a ModelLink name exists in the module's globals, defining a
        minimal stand-in when none of the models provided one.
    """
    module_scope = globals()

    # add all models to globals
    for model_name, model_class in model_accessor.all_model_classes.items():
        module_scope[model_name] = model_class

    if "ModelLink" in module_scope:
        return

    # xosbase doesn't exist from the synchronizer's perspective, so fake out
    # ModelLink.
    class ModelLink:
        def __init__(self, dest, via, into=None):
            self.dest, self.via, self.into = dest, via, into

    module_scope["ModelLink"] = ModelLink
143
144
def keep_trying(client, reactor):
    """ Probe the API with a NoOp and re-exec the synchronizer once the probe fails.

        While the NoOp succeeds, another probe is scheduled on the reactor one second
        later. When a NoOp throws an exception, the server is taken to have reset, so
        the current process is replaced with a fresh copy of itself.
    """
    from xosapi.xos_grpc_client import Empty

    try:
        client.utility.NoOp(Empty())
    except Exception as e:
        # If we caught an exception, then the API has become unavailable.
        # So restart.
        log.exception("exception in NoOp", e=e)
        log.info("restarting synchronizer")

        # Replaces the running process image; does not return on success.
        os.execv(sys.executable, ["python"] + sys.argv)
        return
    else:
        # API still answering: look again in a second.
        next_probe = functools.partial(keep_trying, client, reactor)
        reactor.callLater(1, next_probe)
166
Zack Williams5c2ea232019-01-30 15:23:01 -0700167
def unload_models(client, reactor, version):
    """ Ask the core to unload this service's models, retrying every 30 seconds.

        This function is called by a timer until it succeeds. On success it calls
        sys.exit(0); on any other outcome (TRYAGAIN, another status, or an exception)
        it re-arms a 30 second Timer that calls it again with the same arguments.
    """
    log.info("unload_models initiated by timer")

    try:
        loader = ModelLoadClient(client)
        result = loader.unload_models(
            Config.get("name"),
            version=version,
            cleanup_behavior=ModelLoadClient.AUTOMATICALLY_CLEAN)

        log.debug("Unload response", result=result)

        status = result.status
        if status in (result.SUCCESS, result.SUCCESS_NOTHING_CHANGED):
            log.info("Models successfully unloaded. Exiting with status", code=0)
            # SystemExit is not an Exception subclass, so it passes through the handler below.
            # NOTE(review): this runs on a Timer thread; sys.exit() there normally ends only that
            # thread -- confirm the process really terminates as intended.
            sys.exit(0)

        if status == result.TRYAGAIN:
            log.info("TRYAGAIN received. Expect to try again in 30 seconds.")

    except Exception:
        # If the synchronizer is operational, then assume the ORM's restart_on_disconnect will deal with the
        # connection being lost.
        log.exception("Error while unloading. Expect to try again in 30 seconds.")

    retry = functools.partial(unload_models, client, reactor, version)
    Timer(30, retry).start()
193
Zack Williams5c2ea232019-01-30 15:23:01 -0700194
def exit_while_inside_reactor(reactor, code):
    """ Calling sys.exit() while inside reactor ends up trapped by reactor.

        So what we'll do is stop the reactor, put back the SIGINT handler saved in orig_sigint, and set a
        flag (after_reactor_exit_code) indicating we want to exit, then return. config_accessor_grpcapi()
        checks that flag after reactor.run() returns and calls sys.exit() with it.

        reactor: the reactor to stop
        code: the exit status to record
    """
    global after_reactor_exit_code

    reactor.stop()
    # Restore the handler that was in place before reactor.run() was called.
    signal.signal(signal.SIGINT, orig_sigint)
    after_reactor_exit_code = code
205
Scott Bakerbba67b62019-01-28 17:38:21 -0800206
def _grpc_status_name(exc):
    """ Return the name of the status code carried by exc, or None if it has no callable code(). """
    code = getattr(exc, "code", None)
    if not callable(code):
        return None
    return getattr(code(), "name", None)


def grpcapi_reconnect(client, reactor):
    """ Reconnect callback for the gRPC client: onboard models, then build the model accessor.

        Steps, in order:
          1. If "models_dir" is configured, upload the models ("desired_state" == "load") or try to
             unload them ("unload"). Any other desired_state stops the reactor with exit code -1.
          2. If the client has no usable xos_orm, or a model listed in "required_models" is missing,
             hand over to keep_trying() and return.
          3. Otherwise create the CoreApiModelAccessor, import the models into this module's globals,
             stop the reactor and restore the SIGINT handler.
          4. If desired_state is "unload", start a 30 second Timer that runs unload_models().

        client: the gRPC client that (re)connected
        reactor: the reactor this callback runs in
    """
    global model_accessor

    # Only discovered below when models_dir is configured. Kept as None otherwise so that the unload
    # timer at the end of this function can still obtain a version instead of hitting an unbound local.
    version = None

    # Make sure to try to load models before trying to initialize the ORM. It might be the ORM is broken because it
    # is waiting on our models.

    if Config.get("models_dir"):
        version = autodiscover_version_of_main(max_parent_depth=0) or "unknown"
        log.info("Service version is %s" % version, core_version=Config.get("core_version"))
        try:
            if Config.get("desired_state") == "load":
                ModelLoadClient(client).upload_models(
                    Config.get("name"), Config.get("models_dir"), version=version
                )
            elif Config.get("desired_state") == "unload":
                # Try for an easy unload. If there's no dirty models, then unload will succeed without
                # requiring us to setup the synchronizer.
                log.info("Trying for an easy unload_models")
                result = ModelLoadClient(client).unload_models(
                    Config.get("name"),
                    version=version,
                    cleanup_behavior=ModelLoadClient.AUTOMATICALLY_CLEAN)
                if result.status in [result.SUCCESS, result.SUCCESS_NOTHING_CHANGED]:
                    log.info("Models successfully unloaded. Synchronizer exiting")
                    exit_while_inside_reactor(reactor, 0)
                    return

                # We couldn't unload the easy way, so we'll have to do it the hard way. Fall through and
                # setup the synchronizer.
            else:
                log.error("Misconfigured", desired_state=Config.get("desired_state"))
                exit_while_inside_reactor(reactor, -1)
                return
        except Exception as e:  # TODO: narrow exception scope
            status_name = _grpc_status_name(e)

            if status_name == "UNAVAILABLE":
                # We need to make sure we force a reconnection, as it's possible that we will end up downloading a
                # new xos API.
                log.info("grpc unavailable during loadmodels. Force a reconnect")
                client.connected = False
                client.connect()
                return

            elif status_name == "INVALID_ARGUMENT":
                # in this case there is a version mismatch between the service and the core,
                # shut down the process so it's clear something is wrong
                log.error(e.details())

                # kill the process so the operator is aware something is wrong
                log.info("shutting down")
                exit_while_inside_reactor(reactor, 1)
                return

            log.exception("failed to onboard models")
            # If it's some other error, then we don't need to force a reconnect. Just try the LoadModels() again.
            reactor.callLater(10, functools.partial(grpcapi_reconnect, client, reactor))
            return

    # If the ORM is broken, then wait for the orm to become available.

    if not getattr(client, "xos_orm", None):
        log.warning("No xos_orm. Will keep trying...")
        reactor.callLater(1, functools.partial(keep_trying, client, reactor))
        return

    # this will prevent updated timestamps from being automatically updated
    client.xos_orm.caller_kind = "synchronizer"

    client.xos_orm.restart_on_disconnect = True

    from .apiaccessor import CoreApiModelAccessor

    model_accessor = CoreApiModelAccessor(orm=client.xos_orm)

    # If required_models is set, then check to make sure the required_models
    # are present. If not, then the synchronizer needs to go to sleep until
    # the models show up.

    required_models = Config.get("required_models")
    if required_models:
        required_models = [x.strip() for x in required_models]

        missing = []
        found = []
        for model in required_models:
            if model_accessor.has_model_class(model):
                found.append(model)
            else:
                missing.append(model)

        log.info("required_models, found:", models=", ".join(found))
        if missing:
            log.warning("required_models: missing", models=", ".join(missing))
            # We're missing a required model. Give up and wait for the connection
            # to reconnect, and hope our missing model has shown up.
            reactor.callLater(1, functools.partial(keep_trying, client, reactor))
            return

    # import all models to global space
    import_models_to_globals()

    # Synchronizer framework isn't ready to embrace reactor yet...
    reactor.stop()

    # Restore the sigint handler
    signal.signal(signal.SIGINT, orig_sigint)

    # Check to see if we still want to unload
    if Config.get("desired_state") == "unload":
        if version is None:
            # models_dir was not configured, so the version has not been discovered yet.
            version = autodiscover_version_of_main(max_parent_depth=0) or "unknown"
        Timer(30, functools.partial(unload_models, client, reactor, version)).start()
326
Scott Bakerbba67b62019-01-28 17:38:21 -0800327
def config_accessor_grpcapi():
    """ Connect to the core's gRPC API and run the reactor until the accessor is set up.

        Reads the endpoint, username and password from the "accessor.*" configuration keys, creates a
        SecureClient whose reconnect callback is grpcapi_reconnect(), and runs the twisted reactor.
        Once the reactor has stopped, exits the process if a callback asked for that through
        exit_while_inside_reactor().

        Raises Exception if the password refers to a file ("@<path>") that does not exist.
    """
    global orig_sigint

    log.info("Connecting to the gRPC API")

    grpcapi_endpoint = Config.get("accessor.endpoint")
    grpcapi_username = Config.get("accessor.username")
    grpcapi_password = Config.get("accessor.password")

    # if password starts with "@", then retrieve the password from a file
    if grpcapi_password.startswith("@"):
        fn = grpcapi_password[1:]
        if not os.path.exists(fn):
            raise Exception("%s does not exist" % fn)
        # The password is the first line of the file; close the file as soon as it has been read.
        with open(fn) as password_file:
            grpcapi_password = password_file.readline().strip()

    from xosapi.xos_grpc_client import SecureClient
    from twisted.internet import reactor

    grpcapi_client = SecureClient(
        endpoint=grpcapi_endpoint, username=grpcapi_username, password=grpcapi_password
    )
    grpcapi_client.set_reconnect_callback(
        functools.partial(grpcapi_reconnect, grpcapi_client, reactor)
    )
    grpcapi_client.start()

    # Reactor will take over SIGINT during reactor.run(), but does not return it when reactor.stop() is called.
    # Remember the current handler so it can be restored after the reactor has been stopped.

    orig_sigint = signal.getsignal(signal.SIGINT)

    # Start reactor. This will cause the client to connect and then execute
    # grpcapi_reconnect().

    reactor.run()

    # Catch if we wanted to stop while inside of a reactor callback
    if after_reactor_exit_code is not None:
        log.info("exiting with status", code=after_reactor_exit_code)
        sys.exit(after_reactor_exit_code)
371
Scott Bakerbba67b62019-01-28 17:38:21 -0800372
def config_accessor_mock():
    """ Use the generated mock model accessor (test framework) as the module's model_accessor.

        Imports mock_modelaccessor from /tmp/mock_modelaccessor, gives its accessor an
        all_model_classes dictionary, and imports the models into this module's globals.
    """
    global model_accessor

    # the mock model accessor always gets built to a temporary location
    mock_dir = "/tmp/mock_modelaccessor"
    if mock_dir not in sys.path:
        sys.path.append(mock_dir)

    import mock_modelaccessor as mma

    model_accessor = mma.model_accessor

    # mock_model_accessor doesn't have an all_model_classes field, so make one out of every
    # attribute of the mock module that carries a leaf_model_name.
    candidates = ((attr_name, getattr(mma, attr_name)) for attr_name in dir(mma))
    model_accessor.all_model_classes = {
        attr_name: attr_value
        for attr_name, attr_value in candidates
        if hasattr(attr_value, "leaf_model_name")
    }

    import_models_to_globals()
396
397
def config_accessor():
    """ Set up the model accessor selected by the "accessor.kind" configuration key.

        "testframework" selects the mock accessor and "grpcapi" the gRPC API accessor; anything else
        raises an Exception. Afterwards every module listed under "wrappers" is imported.
    """
    kind = Config.get("accessor.kind")

    if kind not in ("testframework", "grpcapi"):
        raise Exception("Unknown accessor kind %s" % kind)

    if kind == "testframework":
        config_accessor_mock()
    else:
        config_accessor_grpcapi()

    # now import any wrappers that the synchronizer needs to add to the ORM
    if not Config.get("wrappers"):
        return

    for wrapper_module in Config.get("wrappers"):
        importlib.import_module(wrapper_module)
412
413
# Configure the accessor at import time, so that importing this module is what
# brings the models into its global scope (see the module docstring).
config_accessor()