blob: 608457975bf46d75f7a5519ed802c1ded8f736d5 [file] [log] [blame]
Scott Bakerbba67b62019-01-28 17:38:21 -08001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16""" ModelAccessor
17
18 A class for abstracting access to models. Used to get any djangoisms out
19 of the synchronizer code base.
20
21 This module will import all models into this module's global scope, so doing
22 a "from modelaccessor import *" from a calling module ought to import all
23 models into the calling module's scope.
24"""
25
26import functools
27import importlib
28import os
29import signal
30import sys
31import time
32from loadmodels import ModelLoadClient
33
34from xosconfig import Config
35from multistructlog import create_logger
36from xosutil.autodiscover_version import autodiscover_version_of_main
37
# Module-wide structured logger, built from the "logging" section of the
# service configuration.
log = create_logger(Config().get("logging"))

# SIGINT handler saved by config_accessor_grpcapi() just before the reactor is
# started; grpcapi_reconnect() re-installs it after stopping the reactor.
orig_sigint = None
# The active accessor. Assigned by config_accessor_mock() or by
# grpcapi_reconnect(); stays None until one of them has run.
model_accessor = None
42
43
class ModelAccessor(object):
    """ Abstract base class for objects that give access to models.

        Subclasses override the methods that raise NotImplementedError here.
        Every model class registered in ``all_model_classes`` is also
        reachable as an attribute of the accessor, e.g. ``accessor.Foo``
        returns the class stored under the name "Foo".
    """

    def __init__(self):
        # Name -> model class mapping, supplied by the concrete subclass.
        self.all_model_classes = self.get_all_model_classes()

    def __getattr__(self, name):
        """ Wrapper for getattr to facilitate retrieval of classes.

            Python only calls this after normal attribute lookup has failed.
            If `name` is a registered model class it is returned; otherwise
            AttributeError is raised.
        """
        if name == "all_model_classes":
            # The registry itself is missing (__init__ was bypassed or has
            # not finished). has_model_class() reads this attribute, so
            # falling through would re-enter __getattr__ until the
            # interpreter raises RecursionError. Fail with a regular
            # AttributeError instead.
            raise AttributeError(
                "%r object has no attribute %r" % (type(self).__name__, name)
            )

        has_model_class = self.__getattribute__("has_model_class")
        get_model_class = self.__getattribute__("get_model_class")
        if has_model_class(name):
            return get_model_class(name)

        # Default behaviour: raises the standard AttributeError
        return self.__getattribute__(name)

    def get_all_model_classes(self):
        """ Build a dictionary of all model class names """
        raise NotImplementedError("Not Implemented")

    def get_model_class(self, name):
        """ Given a class name, return that model class.

            Raises KeyError if the name is not registered.
        """
        return self.all_model_classes[name]

    def has_model_class(self, name):
        """ Given a class name, return True if that model class is registered """
        return name in self.all_model_classes

    def fetch_pending(self, main_objs, deletion=False):
        """ Execute the default fetch_pending query """
        raise NotImplementedError("Not Implemented")

    def fetch_policies(self, main_objs, deletion=False):
        """ Execute the default fetch_policies query """
        raise NotImplementedError("Not Implemented")

    def reset_queries(self):
        """ Reset any state between passes of synchronizer. For django, to
            limit memory consumption of cached queries.

            No-op in the base class.
        """
        pass

    def connection_close(self):
        """ Close any active database connection. For django, to limit memory
            consumption.

            No-op in the base class.
        """
        pass

    def check_db_connection_okay(self):
        """ Checks to make sure the db connection is okay.

            No-op in the base class.
        """
        pass

    def obj_exists(self, o):
        """ Return True if the object exists in the data model """
        raise NotImplementedError("Not Implemented")

    def obj_in_list(self, o, olist):
        """ Return True if o is the same as one of the objects in olist """
        raise NotImplementedError("Not Implemented")

    def now(self):
        """ Return the current time for timestamping purposes """
        raise NotImplementedError("Not Implemented")

    def is_type(self, obj, name):
        """ Return True if obj is of model type "name" """
        raise NotImplementedError("Not Implemented")

    def is_instance(self, obj, name):
        """ Return True if obj is of model type "name" or is a descendant """
        raise NotImplementedError("Not Implemented")

    def get_content_type_id(self, obj):
        """ Must be overridden by subclasses; the base class always raises """
        raise NotImplementedError("Not Implemented")

    def journal_object(self, o, operation, msg=None, timestamp=None):
        """ Hook for journaling an operation on an object.

            No-op in the base class.
        """
        pass

    def create_obj(self, cls, **kwargs):
        """ Must be overridden by subclasses; the base class always raises """
        raise NotImplementedError("Not Implemented")
122
123
def import_models_to_globals():
    """ Copy every model class known to `model_accessor` into this module's
        global namespace, keyed by model name.

        If no global named "ModelLink" exists afterwards, a minimal stand-in
        class is installed under that name.
    """
    module_scope = globals()

    # add all models to globals in one step
    module_scope.update(model_accessor.all_model_classes.items())

    if "ModelLink" in module_scope:
        return

    # xosbase doesn't exist from the synchronizer's perspective, so fake out
    # ModelLink with a plain record of its three fields.
    class ModelLink:
        def __init__(self, dest, via, into=None):
            self.dest = dest
            self.via = via
            self.into = into

    module_scope["ModelLink"] = ModelLink
140
141
def keep_trying(client, reactor):
    """ Probe the API with a NoOp and schedule the next probe one second later.

        The probing continues until a NoOp raises, which is taken to mean the
        server has reset. At that point the current process is replaced by a
        fresh copy of itself (same interpreter, same arguments), which will
        reconnect and download a new API from the server.
    """
    from xosapi.xos_grpc_client import Empty

    try:
        client.utility.NoOp(Empty())
    except Exception as e:
        # The API has become unavailable, so restart from scratch.
        log.exception("exception in NoOp", e=e)
        log.info("restarting synchronizer")

        os.execv(sys.executable, ["python"] + sys.argv)
    else:
        # Still reachable; look again in a second.
        next_probe = functools.partial(keep_trying, client, reactor)
        reactor.callLater(1, next_probe)
163
164
def grpcapi_reconnect(client, reactor):
    """ Reconnect callback for the gRPC client.

        Steps, in order:
          1. If "models_dir" is configured, upload the service's models. An
             UNAVAILABLE gRPC error forces the client to reconnect; any other
             error schedules another call of this function in 10 seconds.
          2. If the client has no usable `xos_orm`, start keep_trying() and
             return.
          3. Build the CoreApiModelAccessor and store it in the module-level
             `model_accessor`.
          4. If "required_models" is configured and any of them is missing,
             start keep_trying() and return.
          5. Import all models into this module's globals, stop the reactor
             and restore the SIGINT handler saved in `orig_sigint`.

        client  -- the gRPC client; must offer `connect()` and, once ready,
                   an `xos_orm` attribute
        reactor -- the reactor; `callLater()` and `stop()` are used
    """
    global model_accessor

    # Make sure to try to load models before trying to initialize the ORM. It might be the ORM is broken because it
    # is waiting on our models.

    models_dir = Config.get("models_dir")
    if models_dir:
        version = autodiscover_version_of_main(max_parent_depth=0) or "unknown"
        log.info("Service version is %s" % version)
        try:
            ModelLoadClient(client).upload_models(
                Config.get("name"), models_dir, version=version
            )
        except Exception as e:  # TODO: narrow exception scope
            if (
                hasattr(e, "code")
                and callable(e.code)
                and hasattr(e.code(), "name")
                and (e.code().name) == "UNAVAILABLE"
            ):
                # We need to make sure we force a reconnection, as it's possible that we will end up downloading a
                # new xos API.
                log.info("grpc unavailable during loadmodels. Force a reconnect")
                client.connected = False
                client.connect()
                return
            log.exception("failed to onboard models")
            # If it's some other error, then we don't need to force a reconnect. Just try the LoadModels() again.
            reactor.callLater(10, functools.partial(grpcapi_reconnect, client, reactor))
            return

    # If the ORM is broken, then wait for the orm to become available.

    if not getattr(client, "xos_orm", None):
        log.warning("No xos_orm. Will keep trying...")
        reactor.callLater(1, functools.partial(keep_trying, client, reactor))
        return

    # this will prevent updated timestamps from being automatically updated
    client.xos_orm.caller_kind = "synchronizer"

    client.xos_orm.restart_on_disconnect = True

    from apiaccessor import CoreApiModelAccessor

    model_accessor = CoreApiModelAccessor(orm=client.xos_orm)

    # If required_models is set, then check to make sure the required_models
    # are present. If not, then the synchronizer needs to go to sleep until
    # the models show up.

    required_models = Config.get("required_models")
    if required_models:
        required_models = [x.strip() for x in required_models]

        missing = []
        found = []
        for model in required_models:
            if model_accessor.has_model_class(model):
                found.append(model)
            else:
                missing.append(model)

        log.info("required_models, found:", models=", ".join(found))
        if missing:
            log.warning("required_models: missing", models=", ".join(missing))
            # We're missing a required model. Give up and wait for the connection
            # to reconnect, and hope our missing model has shown up.
            reactor.callLater(1, functools.partial(keep_trying, client, reactor))
            return

    # import all models to global space
    import_models_to_globals()

    # Synchronizer framework isn't ready to embrace reactor yet...
    reactor.stop()

    # Restore the sigint handler. signal.getsignal() yields None when the
    # previous handler was not installed from Python, and signal.signal()
    # rejects None with a TypeError, so in that case there is nothing that
    # can be restored.
    if orig_sigint is not None:
        signal.signal(signal.SIGINT, orig_sigint)
    else:
        log.warning("No saved SIGINT handler to restore")
244
245
def config_accessor_grpcapi():
    """ Connect to the gRPC API and block until the accessor is ready.

        Reads "accessor.endpoint", "accessor.username" and
        "accessor.password" from the configuration. A password of the form
        "@<path>" is replaced by the stripped first line of the file at
        <path>.

        The current SIGINT handler is saved in the module-level
        `orig_sigint` before the reactor is started, so grpcapi_reconnect()
        can restore it once it stops the reactor.

        Raises Exception if the password file does not exist.
    """
    global orig_sigint

    log.info("Connecting to the gRPC API")

    grpcapi_endpoint = Config.get("accessor.endpoint")
    grpcapi_username = Config.get("accessor.username")
    grpcapi_password = Config.get("accessor.password")

    # if password starts with "@", then retrieve the password from a file
    if grpcapi_password.startswith("@"):
        fn = grpcapi_password[1:]
        if not os.path.exists(fn):
            raise Exception("%s does not exist" % fn)
        # Use a context manager so the file handle is closed right away
        # instead of being left for the garbage collector.
        with open(fn) as password_file:
            grpcapi_password = password_file.readline().strip()

    from xosapi.xos_grpc_client import SecureClient
    from twisted.internet import reactor

    grpcapi_client = SecureClient(
        endpoint=grpcapi_endpoint, username=grpcapi_username, password=grpcapi_password
    )
    grpcapi_client.set_reconnect_callback(
        functools.partial(grpcapi_reconnect, grpcapi_client, reactor)
    )
    grpcapi_client.start()

    # Reactor will take over SIGINT during reactor.run(), but does not return it when reactor.stop() is called.

    orig_sigint = signal.getsignal(signal.SIGINT)

    # Start reactor. This will cause the client to connect and then execute
    # grpcapi_reconnect().

    reactor.run()
284
285
def config_accessor_mock():
    """ Install the accessor from mock_modelaccessor as the module-level
        `model_accessor` and import its models into this module's globals.
    """
    global model_accessor
    from mock_modelaccessor import model_accessor as mock_model_accessor
    import mock_modelaccessor as mma

    model_accessor = mock_model_accessor

    # mock_model_accessor doesn't have an all_model_classes field, so make
    # one: every attribute of the mock module that carries a
    # "leaf_model_name" attribute is treated as a model class.
    module_members = ((attr, getattr(mma, attr)) for attr in dir(mma))
    model_accessor.all_model_classes = {
        attr: member
        for attr, member in module_members
        if hasattr(member, "leaf_model_name")
    }

    import_models_to_globals()
304
305
def config_accessor():
    """ Set up the accessor selected by the "accessor.kind" config option.

        "testframework" selects the mock accessor and "grpcapi" the gRPC
        accessor; any other value raises Exception. Afterwards each module
        named in the "wrappers" config option is imported.
    """
    accessor_kind = Config.get("accessor.kind")

    if accessor_kind not in ("testframework", "grpcapi"):
        raise Exception("Unknown accessor kind %s" % accessor_kind)

    if accessor_kind == "testframework":
        configure = config_accessor_mock
    else:
        configure = config_accessor_grpcapi
    configure()

    # now import any wrappers that the synchronizer needs to add to the ORM
    wrapper_names = Config.get("wrappers")
    if wrapper_names:
        for wrapper_name in wrapper_names:
            importlib.import_module(wrapper_name)
320
321
# Runs as a side effect of importing this module: the accessor is configured
# here so that the model classes can be placed in this module's globals for
# callers that do "from modelaccessor import *".
config_accessor()