blob: 72fc3c53e26ffef2c89b04daa939061551f55631 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


""" ModelAccessor

    A class for abstracting access to models. Used to get any djangoisms out
    of the synchronizer code base.

    This module will import all models into this module's global scope, so doing
    a "from modelaccessor import *" from a calling module ought to import all
    models into the calling module's scope.
"""
25
import functools
import importlib
import os
import signal
import sys
from threading import Timer
from loadmodels import ModelLoadClient

from xosconfig import Config
from multistructlog import create_logger
from xosutil.autodiscover_version import autodiscover_version_of_main

# Module-wide logger, built from the "logging" entry of the configuration.
log = create_logger(Config().get("logging"))

# Exit status requested from inside a reactor callback (written by
# exit_while_inside_reactor, read after reactor.run() returns). None means no
# exit was requested.
after_reactor_exit_code = None
# SIGINT handler captured before the reactor is started, so that it can be
# reinstalled once the reactor has been stopped.
orig_sigint = None
# The active model accessor; assigned by config_accessor_mock() or
# grpcapi_reconnect().
model_accessor = None
43
class ModelAccessor(object):
    """ Base class that abstracts access to the data model.

        Concrete accessors override get_all_model_classes() and the stubbed
        helpers below. Every model class known to the accessor can also be read
        as an attribute of the accessor, named after the model.
    """

    def __init__(self):
        self.all_model_classes = self.get_all_model_classes()

    def __getattr__(self, name):
        """ Resolve otherwise-unknown attribute names as model class names.

            Python only calls this after normal attribute lookup has failed.
            Raises AttributeError when `name` is not a model class either.
        """
        # has_model_class() reads self.all_model_classes. When that attribute is
        # itself missing (instance created without running __init__, or a lookup
        # miss while get_all_model_classes() is still running), the read lands
        # back in this method. Fail immediately instead of recursing until the
        # interpreter's recursion limit is hit.
        if name == "all_model_classes":
            raise AttributeError(
                "%r object has no attribute %r" % (type(self).__name__, name)
            )

        if self.has_model_class(name):
            return self.get_model_class(name)

        # Default behaviour: raises the standard AttributeError for `name`.
        return self.__getattribute__(name)

    def get_all_model_classes(self):
        """ Build a dictionary of all model classes, keyed by class name """
        raise NotImplementedError("Not Implemented")

    def get_model_class(self, name):
        """ Given a class name, return that model class (KeyError if unknown) """
        return self.all_model_classes[name]

    def has_model_class(self, name):
        """ Given a class name, return True if a model class of that name exists """
        return name in self.all_model_classes

    def fetch_pending(self, main_objs, deletion=False):
        """ Execute the default fetch_pending query """
        raise NotImplementedError("Not Implemented")

    def fetch_policies(self, main_objs, deletion=False):
        """ Execute the default fetch_policies query """
        raise NotImplementedError("Not Implemented")

    def reset_queries(self):
        """ Reset any state between passes of synchronizer. For django, to
            limit memory consumption of cached queries.
        """
        pass

    def connection_close(self):
        """ Close any active database connection. For django, to limit memory
            consumption.
        """
        pass

    def check_db_connection_okay(self):
        """ Checks to make sure the db connection is okay """
        pass

    def obj_exists(self, o):
        """ Return True if the object exists in the data model """
        raise NotImplementedError("Not Implemented")

    def obj_in_list(self, o, olist):
        """ Return True if o is the same as one of the objects in olist """
        raise NotImplementedError("Not Implemented")

    def now(self):
        """ Return the current time for timestamping purposes """
        raise NotImplementedError("Not Implemented")

    def is_type(self, obj, name):
        """ Return True if obj is of model type "name" """
        raise NotImplementedError("Not Implemented")

    def is_instance(self, obj, name):
        """ Return True if obj is of model type "name" or is a descendant """
        raise NotImplementedError("Not Implemented")

    def get_content_type_id(self, obj):
        """ Return the content type id of obj; must be provided by a subclass """
        raise NotImplementedError("Not Implemented")

    def journal_object(self, o, operation, msg=None, timestamp=None):
        """ Journaling hook; a no-op unless a subclass overrides it """
        pass

    def create_obj(self, cls, **kwargs):
        """ Create an object of class cls from kwargs; must be provided by a subclass """
        raise NotImplementedError("Not Implemented")
122
123
def import_models_to_globals():
    """ Publish every model class of the active accessor as a global of this module.

        Reads the module-level `model_accessor`, which must already be set.
    """
    module_scope = globals()
    module_scope.update(model_accessor.all_model_classes.items())

    # xosbase doesn't exist from the synchronizer's perspective, so a stand-in
    # ModelLink is supplied unless this module already has one.
    if "ModelLink" in module_scope:
        return

    class ModelLink:
        def __init__(self, dest, via, into=None):
            self.dest = dest
            self.via = via
            self.into = into

    module_scope["ModelLink"] = ModelLink
140
141
def keep_trying(client, reactor):
    """ Probe the API once per second and restart the process when the probe fails.

        A NoOp request is sent through `client`. While it succeeds, another probe
        is scheduled on `reactor` one second later. Once it raises, the API is
        taken to have become unavailable (for instance because the server was
        reset) and the running program is re-executed with its original
        arguments, which makes it connect again from scratch.
    """
    from xosapi.xos_grpc_client import Empty

    try:
        client.utility.NoOp(Empty())
    except Exception as err:
        # The probe failed: the API is gone, so start over.
        log.exception("exception in NoOp", e=err)
        log.info("restarting synchronizer")

        # Replaces the current process image; nothing after this line runs
        # unless execv itself raises.
        os.execv(sys.executable, ["python"] + sys.argv)
    else:
        # Still connected; look again in a second.
        reactor.callLater(1, functools.partial(keep_trying, client, reactor))
163
def unload_models(client, reactor, version):
    """ Ask the core to unload this service's models, retrying every 30 seconds.

        Intended to be run from a Timer. On a successful unload it calls
        sys.exit(0); on any other status, or on an exception, it re-arms a
        30 second Timer that calls this function again with the same arguments.
    """
    log.info("unload_models initiated by timer")

    retry = functools.partial(unload_models, client, reactor, version)

    try:
        response = ModelLoadClient(client).unload_models(
            Config.get("name"),
            version=version,
            cleanup_behavior=ModelLoadClient.AUTOMATICALLY_CLEAN,
        )

        log.debug("Unload response", result=response)

        if response.status in (response.SUCCESS, response.SUCCESS_NOTHING_CHANGED):
            log.info("Models successfully unloaded. Exiting with status", code=0)
            # NOTE(review): this runs in a threading.Timer thread; SystemExit
            # raised outside the main thread may only end that thread rather
            # than the whole process -- confirm this is the intended behaviour.
            sys.exit(0)

        if response.status == response.TRYAGAIN:
            log.info("TRYAGAIN received. Expect to try again in 30 seconds.")

    except Exception:
        # If the synchronizer is operational, then assume the ORM's
        # restart_on_disconnect will deal with the connection being lost.
        log.exception("Error while unloading. Expect to try again in 30 seconds.")

    Timer(30, retry).start()
189
def exit_while_inside_reactor(reactor, code):
    """ Request a process exit with status `code` from within a reactor callback.

        A sys.exit() issued while inside the reactor ends up trapped by the
        reactor, so the exit is deferred: the reactor is stopped, the saved
        SIGINT handler is reinstalled, and the desired status is recorded in the
        module-level after_reactor_exit_code for the code that runs once the
        reactor has returned.
    """
    global after_reactor_exit_code

    reactor.stop()

    # Hand SIGINT back to the handler that was in place before the reactor ran.
    signal.signal(signal.SIGINT, orig_sigint)

    after_reactor_exit_code = code
200
Scott Bakerbba67b62019-01-28 17:38:21 -0800201
def _grpc_status_name(error):
    """ Return the status-code name carried by `error`, or None if it has none.

        Matches exceptions that expose a callable code() whose result has a
        `name` attribute; anything else yields None.
    """
    code = getattr(error, "code", None)
    if not callable(code):
        return None
    return getattr(code(), "name", None)


def grpcapi_reconnect(client, reactor):
    """ Reconnect callback: load or unload models, then build the model accessor.

        Steps, each of which may return early after scheduling a retry or an exit:

        1. If "models_dir" is configured, upload the models (desired_state
           "load") or attempt a quick unload (desired_state "unload"). Any other
           desired_state is a misconfiguration and exits with status -1.
        2. Wait for client.xos_orm to exist, then create the CoreApiModelAccessor
           and store it in the module-level `model_accessor`.
        3. Verify that all "required_models" are present.
        4. Import the models into this module's globals, stop the reactor and
           restore the original SIGINT handler.
        5. If desired_state is "unload", start the 30 second unload_models timer.
    """
    global model_accessor

    # Bound up front so the unload check at the end of this function can rely on
    # it even when no models_dir is configured.
    version = None

    # Make sure to try to load models before trying to initialize the ORM. It
    # might be the ORM is broken because it is waiting on our models.

    if Config.get("models_dir"):
        version = autodiscover_version_of_main(max_parent_depth=0) or "unknown"
        log.info("Service version is %s" % version, core_version=Config.get("core_version"))
        try:
            if Config.get("desired_state") == "load":
                ModelLoadClient(client).upload_models(
                    Config.get("name"), Config.get("models_dir"), version=version
                )
            elif Config.get("desired_state") == "unload":
                # Try for an easy unload. If there's no dirty models, then unload
                # will succeed without requiring us to setup the synchronizer.
                log.info("Trying for an easy unload_models")
                result = ModelLoadClient(client).unload_models(
                    Config.get("name"),
                    version=version,
                    # Same named constant that unload_models() passes, instead
                    # of a bare literal.
                    cleanup_behavior=ModelLoadClient.AUTOMATICALLY_CLEAN,
                )
                if result.status in [result.SUCCESS, result.SUCCESS_NOTHING_CHANGED]:
                    log.info("Models successfully unloaded. Synchronizer exiting")
                    exit_while_inside_reactor(reactor, 0)
                    return

                # We couldn't unload the easy way, so we'll have to do it the
                # hard way. Fall through and setup the synchronizer.
            else:
                log.error("Misconfigured", desired_state=Config.get("desired_state"))
                exit_while_inside_reactor(reactor, -1)
                return
        except Exception as e:  # TODO: narrow exception scope
            status_name = _grpc_status_name(e)

            if status_name == "UNAVAILABLE":
                # We need to make sure we force a reconnection, as it's possible
                # that we will end up downloading a new xos API.
                log.info("grpc unavailable during loadmodels. Force a reconnect")
                client.connected = False
                client.connect()
                return

            if status_name == "INVALID_ARGUMENT":
                # In this case there is a version mismatch between the service
                # and the core; shut down the process so it's clear something is
                # wrong.
                log.error(e.details())

                # kill the process so the operator is aware something is wrong
                log.info("shutting down")
                exit_while_inside_reactor(reactor, 1)
                return

            log.exception("failed to onboard models")
            # If it's some other error, then we don't need to force a reconnect.
            # Just try the LoadModels() again.
            reactor.callLater(10, functools.partial(grpcapi_reconnect, client, reactor))
            return

    # If the ORM is broken, then wait for the orm to become available.

    if not getattr(client, "xos_orm", None):
        log.warning("No xos_orm. Will keep trying...")
        reactor.callLater(1, functools.partial(keep_trying, client, reactor))
        return

    # this will prevent updated timestamps from being automatically updated
    client.xos_orm.caller_kind = "synchronizer"

    client.xos_orm.restart_on_disconnect = True

    from apiaccessor import CoreApiModelAccessor

    model_accessor = CoreApiModelAccessor(orm=client.xos_orm)

    # If required_models is set, then check to make sure the required_models
    # are present. If not, then the synchronizer needs to go to sleep until
    # the models show up.

    required_models = Config.get("required_models")
    if required_models:
        required_models = [x.strip() for x in required_models]

        missing = []
        found = []
        for model in required_models:
            if model_accessor.has_model_class(model):
                found.append(model)
            else:
                missing.append(model)

        log.info("required_models, found:", models=", ".join(found))
        if missing:
            log.warning("required_models: missing", models=", ".join(missing))
            # We're missing a required model. Give up and wait for the connection
            # to reconnect, and hope our missing model has shown up.
            reactor.callLater(1, functools.partial(keep_trying, client, reactor))
            return

    # import all models to global space
    import_models_to_globals()

    # Synchronizer framework isn't ready to embrace reactor yet...
    reactor.stop()

    # Restore the sigint handler
    signal.signal(signal.SIGINT, orig_sigint)

    # Check to see if we still want to unload
    if Config.get("desired_state") == "unload":
        if version is None:
            # No models_dir was configured, so the version has not been looked
            # up yet; do it now rather than failing on an unbound local.
            version = autodiscover_version_of_main(max_parent_depth=0) or "unknown"
        Timer(30, functools.partial(unload_models, client, reactor, version)).start()
322
Scott Bakerbba67b62019-01-28 17:38:21 -0800323
def config_accessor_grpcapi():
    """ Connect to the gRPC API and run the reactor until the accessor is ready.

        Reads "accessor.endpoint", "accessor.username" and "accessor.password"
        from the configuration. A password of the form "@<path>" is replaced by
        the stripped first line of the file at <path>.

        Blocks in reactor.run() until a callback stops the reactor. If a callback
        requested an exit through exit_while_inside_reactor(), the process then
        exits with that status.

        Raises:
            Exception: if the password file named by "@<path>" does not exist.
    """
    global orig_sigint

    log.info("Connecting to the gRPC API")

    grpcapi_endpoint = Config.get("accessor.endpoint")
    grpcapi_username = Config.get("accessor.username")
    grpcapi_password = Config.get("accessor.password")

    # if password starts with "@", then retrieve the password from a file
    if grpcapi_password.startswith("@"):
        fn = grpcapi_password[1:]
        if not os.path.exists(fn):
            raise Exception("%s does not exist" % fn)
        # Context manager so the file handle is closed deterministically.
        with open(fn) as password_file:
            grpcapi_password = password_file.readline().strip()

    from xosapi.xos_grpc_client import SecureClient
    from twisted.internet import reactor

    grpcapi_client = SecureClient(
        endpoint=grpcapi_endpoint, username=grpcapi_username, password=grpcapi_password
    )
    grpcapi_client.set_reconnect_callback(
        functools.partial(grpcapi_reconnect, grpcapi_client, reactor)
    )
    grpcapi_client.start()

    # Reactor will take over SIGINT during reactor.run(), but does not return it
    # when reactor.stop() is called, so remember the current handler first.

    orig_sigint = signal.getsignal(signal.SIGINT)

    # Start reactor. This will cause the client to connect and then execute
    # grpcapi_reconnect().

    reactor.run()

    # Catch if we wanted to stop while inside of a reactor callback
    if after_reactor_exit_code is not None:
        log.info("exiting with status", code=after_reactor_exit_code)
        sys.exit(after_reactor_exit_code)
367
Scott Bakerbba67b62019-01-28 17:38:21 -0800368
def config_accessor_mock():
    """ Install the mock model accessor used by the test framework.

        Makes /tmp/mock_modelaccessor importable, adopts its `model_accessor` as
        this module's accessor, gives it an `all_model_classes` mapping, and
        imports the models into this module's globals.
    """
    global model_accessor

    # the mock model accessor always gets built to a temporary location
    mock_dir = "/tmp/mock_modelaccessor"
    if mock_dir not in sys.path:
        sys.path.append(mock_dir)

    from mock_modelaccessor import model_accessor as mock_model_accessor
    import mock_modelaccessor as mma

    model_accessor = mock_model_accessor

    # mock_model_accessor doesn't have an all_model_classes field, so make one
    # out of every module attribute that carries a leaf_model_name.
    model_accessor.all_model_classes = {
        attr_name: getattr(mma, attr_name)
        for attr_name in dir(mma)
        if hasattr(getattr(mma, attr_name), "leaf_model_name")
    }

    import_models_to_globals()
392
393
def config_accessor():
    """ Set up the model accessor selected by the "accessor.kind" config entry.

        "testframework" selects the mock accessor and "grpcapi" the gRPC-backed
        one. Afterwards every module listed under "wrappers" is imported.

        Raises:
            Exception: if "accessor.kind" is neither of the two known kinds.
    """
    kind = Config.get("accessor.kind")

    if kind not in ("testframework", "grpcapi"):
        raise Exception("Unknown accessor kind %s" % kind)

    if kind == "testframework":
        config_accessor_mock()
    else:
        config_accessor_grpcapi()

    # now import any wrappers that the synchronizer needs to add to the ORM
    wrapper_names = Config.get("wrappers")
    if wrapper_names:
        for module_name in wrapper_names:
            importlib.import_module(module_name)
408
409
# Configure the accessor as a side effect of importing this module, so that the
# model classes are placed in this module's globals during the import.
config_accessor()