| # Copyright 2017-present Open Networking Foundation |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """ Reaper |
| |
| The reaper implements permanent deletion of soft-deleted objects. |
| |
| It does this by polling for soft-deleted objects. For each object, the |
| reaper checks to see if its cascade set is empty. If so, the object will |
| be purged. If it is non-empty, then the reaper will skip the object under |
| the assumption that it will eventually become empty. |
| """ |
| |
| import os |
| import sys |
| import traceback |
| import threading |
| import time |
| |
| import django |
| from django.db import reset_queries, router |
| |
| if __name__ == "__main__": |
| sys.path.append("/opt/xos") |
| os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings") |
| |
| from decorators import check_db_connection |
| from xosconfig import Config |
| from multistructlog import create_logger |
| log = create_logger(Config().get("logging")) |
| |
| from core.models.xosbase import XOSCollector |
| |
| |
| class ReaperThread(threading.Thread): |
| daemon = True |
| interval = 5 |
| |
| def __init__(self, *args, **kwargs): |
| self.terminate_signal = False |
| super(ReaperThread, self).__init__(*args, **kwargs) |
| |
| def journal_object(self, o, operation, msg=None, timestamp=None): |
| # not implemented at this time |
| pass |
| |
| def get_cascade_set(self, m): |
| """ Get the set of objects that would cascade if this object was |
| deleted. |
| """ |
| collector = XOSCollector(using=router.db_for_write(m.__class__, instance=m)) |
| collector.collect([m]) |
| deps = [] |
| for (k, models) in collector.data.items(): |
| for model in models: |
| if model == m: |
| # collector will return ourself; ignore it. |
| continue |
| if issubclass(m.__class__, model.__class__): |
| # collector will return our parent classes; ignore them. |
| continue |
| # We don't actually need this check, as with multiple passes the reaper can |
| # clean up a hierarchy of objects. |
| # if getattr(model, "backend_need_reap", False): |
| # # model is already marked for reaping; ignore it. |
| # continue |
| deps.append(model) |
| return deps |
| |
| @check_db_connection |
| def run_reaper_once(self): |
| # logger.debug("REAPER: run_reaper_once()") |
| |
| # Reap non-sync'd models here |
| # models_to_reap = [Slice,Network,NetworkSlice] |
| |
| models_to_reap = django.apps.apps.get_models(include_auto_created=False) |
| for m in models_to_reap: |
| if not hasattr(m, "deleted_objects"): |
| continue |
| |
| dobjs = m.deleted_objects.all() |
| for d in dobjs: |
| if hasattr(d, "_meta") and hasattr(d._meta, "proxy") and d._meta.proxy: |
| # skip proxy objects; we'll get the base instead |
| continue |
| |
| if (not getattr(d, "backend_need_reap", False)) and getattr( |
| d, "backend_need_delete", False |
| ): |
| self.journal_object(d, "reaper.need_delete") |
| log.info("skipping because it has need_delete set", object=d) |
| continue |
| |
| if (not getattr(d, "backend_need_reap", False)) and getattr( |
| d, "backend_need_delete_policy", False |
| ): |
| log.info("skipping because it has need_delete_policy set", object=d) |
| continue |
| |
| if hasattr(d, "leaf_model"): |
| d = d.leaf_model |
| |
| cascade_set = self.get_cascade_set(d) |
| if cascade_set: |
| self.journal_object( |
| d, |
| "reaper.cascade_set", |
| msg=",".join([str(m) for m in cascade_set]), |
| ) |
| log.info( |
| "REAPER: cannot purge object because its cascade_set is nonempty", |
| object=d, |
| cascade_set=",".join([str(m) for m in cascade_set]), |
| ) |
| continue |
| |
| self.journal_object(d, "reaper.purge") |
| log.info("REAPER: purging object", object=d) |
| try: |
| d.delete(purge=True) |
| except BaseException: |
| self.journal_object(d, "reaper.purge.exception") |
| log.error("REAPER: exception purging object", object=d) |
| traceback.print_exc() |
| |
| try: |
| reset_queries() |
| except BaseException: |
| # this shouldn't happen, but in case it does, catch it... |
| log.exception("REAPER: exception in reset_queries") |
| |
| # logger.debug("REAPER: finished run_reaper_once()") |
| |
| def run(self): |
| while not self.terminate_signal: |
| start = time.time() |
| try: |
| self.run_reaper_once() |
| except BaseException: |
| log.exception("REAPER: Exception in run loop") |
| |
| telap = time.time() - start |
| if telap < self.interval: |
| time.sleep(self.interval - telap) |
| |
| def stop(self): |
| self.terminate_signal = True |
| |
| |
| if __name__ == "__main__": |
| django.setup() |
| |
| reaper = ReaperThread() |
| reaper.start() |
| |
| _ONE_DAY_IN_SECONDS = 60 * 60 * 24 |
| try: |
| while True: |
| time.sleep(_ONE_DAY_IN_SECONDS) |
| except KeyboardInterrupt: |
| reaper.stop() |