SEBA-570 add backup/restore feature
Change-Id: I25987d3394e9416d9a67cc495bf8f8907ffe11a7
diff --git a/Dockerfile.core b/Dockerfile.core
index f94066a..2e5dcf5 100644
--- a/Dockerfile.core
+++ b/Dockerfile.core
@@ -17,7 +17,7 @@
# Install libraries and python requirements
COPY requirements.txt /tmp/requirements.txt
-RUN apk add --no-cache bash postgresql-dev \
+RUN apk add --no-cache bash postgresql-dev postgresql-client \
&& pip install -r /tmp/requirements.txt \
&& pip freeze > /var/xos/pip_freeze_xos-core_`date -u +%Y%m%dT%H%M%S` \
&& mkdir -p /opt/xos
diff --git a/VERSION b/VERSION
index b347b11..351227f 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.2.3
+3.2.4
diff --git a/tox.ini b/tox.ini
index 7152507..a8ce26b 100644
--- a/tox.ini
+++ b/tox.ini
@@ -28,6 +28,7 @@
multistructlog
nose2
flake8
+ pyfakefs
commands =
nose2 -c tox.ini --verbose --junit-xml
diff --git a/xos/core/migrations/0011_auto_20190430_1254.py b/xos/core/migrations/0011_auto_20190430_1254.py
new file mode 100644
index 0000000..e7b71c6
--- /dev/null
+++ b/xos/core/migrations/0011_auto_20190430_1254.py
@@ -0,0 +1,125 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# -*- coding: utf-8 -*-
+# Generated by Django 1.11.20 on 2019-04-30 16:54
+from __future__ import unicode_literals
+
+import core.models.xosbase_header
+from django.db import migrations, models
+import django.db.models.deletion
+import django.utils.timezone
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('core', '0010_auto_20190408_2122'),
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name='BackupFile_decl',
+ fields=[
+ ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+ ('created', models.DateTimeField(auto_now_add=True, help_text=b'Time this model was created')),
+ ('updated', models.DateTimeField(default=django.utils.timezone.now, help_text=b'Time this model was changed by a non-synchronizer')),
+ ('enacted', models.DateTimeField(blank=True, default=None, help_text=b'When synced, set to the timestamp of the data that was synced', null=True)),
+ ('policed', models.DateTimeField(blank=True, default=None, help_text=b'When policed, set to the timestamp of the data that was policed', null=True)),
+ ('backend_register', models.CharField(blank=True, default=b'{}', max_length=1024, null=True)),
+ ('backend_need_delete', models.BooleanField(default=False)),
+ ('backend_need_reap', models.BooleanField(default=False)),
+ ('backend_status', models.CharField(default=b'Provisioning in progress', max_length=1024)),
+ ('backend_code', models.IntegerField(default=0)),
+ ('deleted', models.BooleanField(default=False)),
+ ('write_protect', models.BooleanField(default=False)),
+ ('lazy_blocked', models.BooleanField(default=False)),
+ ('no_sync', models.BooleanField(default=False)),
+ ('no_policy', models.BooleanField(default=False)),
+ ('policy_status', models.CharField(blank=True, default=b'Policy in process', max_length=1024, null=True)),
+ ('policy_code', models.IntegerField(blank=True, default=0, null=True)),
+ ('leaf_model_name', models.CharField(help_text=b'The most specialized model in this chain of inheritance, often defined by a service developer', max_length=1024)),
+ ('backend_need_delete_policy', models.BooleanField(default=False, help_text=b'True if delete model_policy must be run before object can be reaped')),
+ ('xos_managed', models.BooleanField(default=True, help_text=b'True if xos is responsible for creating/deleting this object')),
+ ('backend_handle', models.CharField(blank=True, help_text=b'Handle used by the backend to track this object', max_length=1024, null=True)),
+ ('changed_by_step', models.DateTimeField(blank=True, default=None, help_text=b'Time this model was changed by a sync step', null=True)),
+ ('changed_by_policy', models.DateTimeField(blank=True, default=None, help_text=b'Time this model was changed by a model policy', null=True)),
+ ('name', models.CharField(help_text=b'human-readable name of this backup file', max_length=256)),
+ ('uri', models.CharField(help_text=b'location of the backup file', max_length=1024)),
+ ('checksum', models.CharField(blank=True, help_text=b'checksum of backup file, formatted as algorithm:hash', max_length=1024, null=True)),
+ ('status', models.CharField(blank=True, choices=[(b'retrieved', b'retrieved'), (b'sent', b'sent'), (b'inprogress', b'inprogress')], help_text=b'status of file transfer', max_length=32, null=True)),
+ ('backend_filename', models.CharField(blank=True, help_text=b'for internal use, local filename', max_length=1024, null=True)),
+ ],
+ bases=(models.Model, core.models.xosbase_header.PlModelMixIn),
+ ),
+ migrations.CreateModel(
+ name='BackupOperation_decl',
+ fields=[
+ ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+ ('created', models.DateTimeField(auto_now_add=True, help_text=b'Time this model was created')),
+ ('updated', models.DateTimeField(default=django.utils.timezone.now, help_text=b'Time this model was changed by a non-synchronizer')),
+ ('enacted', models.DateTimeField(blank=True, default=None, help_text=b'When synced, set to the timestamp of the data that was synced', null=True)),
+ ('policed', models.DateTimeField(blank=True, default=None, help_text=b'When policed, set to the timestamp of the data that was policed', null=True)),
+ ('backend_register', models.CharField(blank=True, default=b'{}', max_length=1024, null=True)),
+ ('backend_need_delete', models.BooleanField(default=False)),
+ ('backend_need_reap', models.BooleanField(default=False)),
+ ('backend_status', models.CharField(default=b'Provisioning in progress', max_length=1024)),
+ ('backend_code', models.IntegerField(default=0)),
+ ('deleted', models.BooleanField(default=False)),
+ ('write_protect', models.BooleanField(default=False)),
+ ('lazy_blocked', models.BooleanField(default=False)),
+ ('no_sync', models.BooleanField(default=False)),
+ ('no_policy', models.BooleanField(default=False)),
+ ('policy_status', models.CharField(blank=True, default=b'Policy in process', max_length=1024, null=True)),
+ ('policy_code', models.IntegerField(blank=True, default=0, null=True)),
+ ('leaf_model_name', models.CharField(help_text=b'The most specialized model in this chain of inheritance, often defined by a service developer', max_length=1024)),
+ ('backend_need_delete_policy', models.BooleanField(default=False, help_text=b'True if delete model_policy must be run before object can be reaped')),
+ ('xos_managed', models.BooleanField(default=True, help_text=b'True if xos is responsible for creating/deleting this object')),
+ ('backend_handle', models.CharField(blank=True, help_text=b'Handle used by the backend to track this object', max_length=1024, null=True)),
+ ('changed_by_step', models.DateTimeField(blank=True, default=None, help_text=b'Time this model was changed by a sync step', null=True)),
+ ('changed_by_policy', models.DateTimeField(blank=True, default=None, help_text=b'Time this model was changed by a model policy', null=True)),
+ ('component', models.CharField(choices=[(b'xos', b'XOS')], default=b'xos', help_text=b'component that this operation applies to', max_length=32)),
+ ('operation', models.CharField(choices=[(b'create', b'create'), (b'restore', b'restore'), (b'verify', b'verify')], help_text=b'operation to perform', max_length=32)),
+ ('status', models.CharField(blank=True, choices=[(b'created', b'created'), (b'restored', b'restored'), (b'failed', b'failed'), (b'inprogress', b'in progress'), (b'orphaned', b'orphaned')], help_text=b'status of operation', max_length=32, null=True)),
+ ('error_msg', models.CharField(blank=True, help_text=b'error message from backup processor, if status is failure', max_length=4096, null=True)),
+ ('effective_date', models.DateTimeField(blank=True, help_text=b'the time and date the operation was completed', null=True)),
+ ],
+ bases=(models.Model, core.models.xosbase_header.PlModelMixIn),
+ ),
+ migrations.CreateModel(
+ name='BackupFile',
+ fields=[
+ ],
+ options={
+ 'proxy': True,
+ 'indexes': [],
+ },
+ bases=('core.backupfile_decl',),
+ ),
+ migrations.CreateModel(
+ name='BackupOperation',
+ fields=[
+ ],
+ options={
+ 'proxy': True,
+ 'indexes': [],
+ },
+ bases=('core.backupoperation_decl',),
+ ),
+ migrations.AddField(
+ model_name='backupoperation_decl',
+ name='file',
+ field=models.ForeignKey(blank=True, help_text=b'File to backup to or restore from', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='operations', to='core.BackupFile'),
+ ),
+ ]
diff --git a/xos/core/models/core.xproto b/xos/core/models/core.xproto
index 6e11557..f7e5fc5 100644
--- a/xos/core/models/core.xproto
+++ b/xos/core/models/core.xproto
@@ -120,6 +120,66 @@
optional manytoone service->Service:addresspools = 7:1001 [db_index = True, blank = True, help_text="Service this AddressPool belongs to"];
}
+message BackupFile (XOSBase) {
+ required string name = 1 [
+ help_text = "human-readable name of this backup file",
+ max_length = 256];
+ required string uri = 2 [
+ help_text = "location of the backup file",
+ max_length = 1024];
+ optional string checksum = 3 [
+ help_text = "checksum of backup file, formatted as algorithm:hash",
+ max_length = 1024];
+ // status:
+ // retrieved - file has been retrieved from URI
+ // sent - file has been sent to URI
+ // inprogress - file transfer is in progress
+ optional string status = 4 [
+ help_text = "status of file transfer",
+ choices = "(('retrieved', 'retrieved'), ('sent', 'sent'), ('inprogress', 'inprogress'))",
+ feedback_state = True,
+ max_length = 32];
+ optional string backend_filename = 5 [
+ help_text = "for internal use, local filename",
+ feedback_state = True,
+ max_length = 1024];
+}
+
+message BackupOperation (XOSBase) {
+ // `file` is required for restores.
+ // `file` is optional for backups. If file is unspecified then XOS will create a backup file using
+ // a default mechanism.
+ optional manytoone file->BackupFile:operations = 1:1001 [
+ help_text = "File to backup to or restore from"];
+ required string component = 2 [
+ help_text = "component that this operation applies to",
+ // XOS is currently the only allowed component
+ choices = "(('xos', 'XOS'), )",
+ default = "xos",
+ max_length = 32];
+ required string operation = 3 [
+ help_text = "operation to perform",
+ choices = "(('create', 'create'), ('restore', 'restore'), ('verify', 'verify'))",
+ max_length = 32];
+ optional string status = 4 [
+ help_text = "status of operation",
+ choices = "(('created', 'created'), ('restored', 'restored'), ('failed', 'failed'), ('inprogress', 'in progress'), ('orphaned', 'orphaned'))",
+ feedback_state = True,
+ max_length = 32];
+ optional string error_msg = 5 [
+ help_text = "error message from backup processor, if status is failure",
+ feedback_state = True,
+ max_length = 4096];
+ // `effective_date` may be different from `XOSBase.enacted` if a synchronizer is performing
+ // an operation on an external component. `XOSBase.enacted` is always set to the time the
+ // model is saved, which could differ from the time the backup or restore completed by
+ // a short time.
+ optional string effective_date = 6 [
+ help_text = "the time and date the operation was completed",
+ content_type = "date",
+ feedback_state = True];
+}
+
message ComputeServiceInstance (ServiceInstance) {
required manytoone slice->Slice:computeserviceinstances = 1:1001 [db_index = True, blank = False, help_text = "Slice that controls this ServiceInstance"];
required manytoone image->Image:computeserviceinstances = 2:1001 [db_index = True, blank = False, help_text = "Image used to instantiate this ServiceInstance"];
diff --git a/xos/coreapi/Makefile b/xos/coreapi/Makefile
index bf99f99..fa457ee 100644
--- a/xos/coreapi/Makefile
+++ b/xos/coreapi/Makefile
@@ -31,7 +31,7 @@
start:
bash -c "source env.sh && python ./core_main.py $(START_OPTIONS)"
-prep: unload_unwanted app_lists rebuild_protos compile_protos try_models wait_for_db makemigrations migrate
+prep: unload_unwanted app_lists rebuild_protos compile_protos try_models wait_for_db makemigrations migrate backuprestore
unload_unwanted:
python unload_unwanted_apps.py
@@ -45,6 +45,9 @@
migrate:
python /opt/xos/manage.py migrate
+backuprestore:
+ python backupprocessor.py
+
rebuild_protos:
cd protos && make rebuild-protos
diff --git a/xos/coreapi/authhelper.py b/xos/coreapi/authhelper.py
index 8ffc966..0711d4c 100644
--- a/xos/coreapi/authhelper.py
+++ b/xos/coreapi/authhelper.py
@@ -106,7 +106,6 @@
if not id:
raise XOSPermissionDenied("failed to authenticate token %s" % v)
user = User.objects.get(id=id)
- log.info("authenticated sessionid %s as %s" % (v, user))
return user
if required:
diff --git a/xos/coreapi/backuphandler.py b/xos/coreapi/backuphandler.py
new file mode 100644
index 0000000..f57127c
--- /dev/null
+++ b/xos/coreapi/backuphandler.py
@@ -0,0 +1,59 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" BackupHandler
+
+ This file contains code to interface with various backends (django, postgres, etc) for creating
+ and restoring backups. Backend-specific code is isolated here to make it easier to port and easier
+ to use.
+"""
+
+import os
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+# TODO(smbaker): Write a django BackupHandler that uses dumpdata and loaddata
+
+
+class BackupHandler_postgres(object):
+ """ This backuphandler uses postgres pg_dump and psql """
+ def __init__(self):
+ self.db_name = Config().get("database.name")
+ self.db_username = Config().get("database.username")
+ self.db_password = Config().get("database.password")
+ self.db_host = "xos-db"
+ self.db_port = "5432"
+ self.log = create_logger(Config().get("logging"))
+
+ def backup(self, filename):
+ cmd = "PGPASSWORD=\"%s\" pg_dump -h %s -p %s -U %s -c %s > %s" % \
+ (self.db_password, self.db_host, self.db_port, self.db_username, self.db_name, filename)
+ self.log.info("Shell execute: %s" % cmd)
+ result = os.system(cmd)
+ self.log.info("Shell result", result=result)
+ if result != 0:
+ raise Exception("pgdump failed")
+
+ def restore(self, filename):
+ cmd = "PGPASSWORD=\"%s\" psql -h %s -p %s -U %s %s < %s > /dev/null" % \
+ (self.db_password, self.db_host, self.db_port, self.db_username, self.db_name, filename)
+ self.log.info("Shell execute: %s" % cmd)
+ result = os.system(cmd)
+ self.log.info("Shell result", result=result)
+ if result != 0:
+ raise Exception("psql failed")
+
+
+BackupHandler = BackupHandler_postgres
diff --git a/xos/coreapi/backupprocessor.py b/xos/coreapi/backupprocessor.py
new file mode 100644
index 0000000..0f50bca
--- /dev/null
+++ b/xos/coreapi/backupprocessor.py
@@ -0,0 +1,245 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" BackupProcessor
+
+ This runs after the core process has completed, and is responsible for performing the actual
+ backup and restore operations.
+
+ JSON-encoded text files are used as a communication between the core, which requests backup/restore
+ operations, and this processor, which carries out the operations.
+"""
+
+import datetime
+import hashlib
+import json
+import os
+import traceback
+
+from xosconfig import Config
+from multistructlog import create_logger
+from backuphandler import BackupHandler
+
+
+class BackupProcessor(object):
+ def __init__(self):
+ self.backup_request_dir = "/var/run/xos/backup/requests"
+ self.backup_response_dir = "/var/run/xos/backup/responses"
+ self.backup_file_dir = "/var/run/xos/backup/local"
+ self.log = create_logger(Config().get("logging"))
+
+ def instrument_fail(self, req, where):
+ """ Check to see if the request indicates that a failure should be instrumented for testing
+ purposes. This is done by inserting special strings ("fail_before_restore", etc) into
+ the request's filename.
+ """
+
+ if where in req["file_details"]["backend_filename"]:
+ raise Exception("Instrumented Failure: %s" % where)
+
+ def compute_checksum(self, fn):
+ m = hashlib.sha1()
+ with open(fn, "rb") as f:
+ block = f.read(4096)
+ while block:
+ m.update(block)
+ block = f.read(4096)
+ return "sha1:" + m.hexdigest()
+
+ def try_models(self):
+ """ Returns True if django modeling is functional """
+ result = os.system("python try_models.py")
+ return result == 0
+
+ def emergency_rollback(self, emergency_rollback_fn):
+ self.log.warning("Performing emergency rollback")
+ BackupHandler().restore(emergency_rollback_fn)
+
+ def finalize_response(self, request, response, status, checksum=None, error_msg=None, exception=False):
+ """ Build a response dictionary, incorporating informaiton from the request, as well as information
+ generated while processing the response.
+
+ If there is an error_msg, it will be printed here and the error will also be encoded into
+            the response.
+ """
+ if error_msg:
+ # TODO(smbaker): Consider also including exception information?
+ response["error_msg"] = error_msg
+ response[""] = traceback.format_exc()
+ if exception:
+ self.log.exception(error_msg)
+ else:
+ self.log.error(error_msg)
+
+ self.log.info("Finalizing response", status=status, id=request["id"])
+ response["id"] = request["id"]
+ response["status"] = status
+ response["operation"] = request["operation"]
+ response["file_details"] = request["file_details"]
+
+ # Date formatted to be consistent with DBMS implementation
+ response["effective_date"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00")
+
+ if checksum:
+ response["file_details"]["checksum"] = checksum
+
+ fn = os.path.join(self.backup_response_dir, request["request_fn"] + "_response")
+ open(fn, "w").write(json.dumps(response))
+ os.remove(request["request_pathname"])
+ return status
+
+ def handle_backup_request(self, req):
+ resp = {}
+
+ backend_filename = req["file_details"]["backend_filename"]
+ backend_filename_dir = os.path.dirname(backend_filename)
+ if not backend_filename_dir:
+ return self.finalize_response(req, resp, "failed",
+ error_msg="backend_filename should include a directory")
+
+ # Ensure the backup directory exists
+ if not os.path.exists(backend_filename_dir):
+ os.makedirs(backend_filename_dir)
+
+ # Step 1: Run the backup
+
+ try:
+ self.instrument_fail(req, "fail_before_backup")
+ BackupHandler().backup(backend_filename)
+ except Exception:
+ return self.finalize_response(req, resp, "failed",
+ error_msg="Backup failed",
+ exception=True)
+
+ # Step 2: Generate the checksum
+
+ try:
+ checksum = self.compute_checksum(backend_filename)
+ except Exception:
+ return self.finalize_response(req, resp, "failed",
+ error_msg="checksum compute failed",
+ exception=True)
+
+ return self.finalize_response(req, resp, "created", checksum=checksum)
+
+ def handle_restore_request(self, req):
+ resp = {}
+
+ backend_filename = req["file_details"]["backend_filename"]
+ if not os.path.exists(backend_filename):
+ return self.finalize_response(req, resp, "failed",
+ error_msg="restore file %s does not exist" % backend_filename)
+
+ # Step 1: Verify checksum
+
+ checksum = req["file_details"].get("checksum")
+ if checksum:
+ computed_checksum = self.compute_checksum(backend_filename)
+ if computed_checksum != checksum:
+ return self.finalize_response(req, resp, "failed",
+ error_msg="checksum mismatch: %s != %s" % (computed_checksum, checksum),
+ exception=True)
+
+ # Step 2: Perform the emergency-rollback backup
+
+ if not os.path.exists(self.backup_file_dir):
+ os.makedirs(self.backup_file_dir)
+
+ emergency_rollback_fn = os.path.join(self.backup_file_dir, "emergency_rollback")
+
+ try:
+ self.instrument_fail(req, "fail_before_rollback")
+ BackupHandler().backup(emergency_rollback_fn)
+ except Exception:
+ return self.finalize_response(req, resp, "failed",
+ error_msg="Exception during create emergency rollback",
+ exception=True)
+
+ # Step 3: Perform the restore
+
+ try:
+ self.instrument_fail(req, "fail_before_restore")
+ BackupHandler().restore(backend_filename)
+ except Exception:
+ self.emergency_rollback(emergency_rollback_fn)
+ return self.finalize_response(req, resp, "failed",
+ error_msg="Exception during restore, emergency rollback performed",
+ exception=True)
+
+ # Step 4: Verify model integrity
+
+ if (not self.try_models()) or ("fail_try_models" in req["file_details"]["backend_filename"]):
+ self.emergency_rollback(emergency_rollback_fn)
+ return self.finalize_response(req, resp, "failed",
+ error_msg="Try_models failed, emergency rollback performed",
+ exception=True)
+
+ return self.finalize_response(req, resp, "restored")
+
+ def run(self):
+ # make sure the directory exists
+ if not os.path.exists(self.backup_request_dir):
+ os.makedirs(self.backup_request_dir)
+
+ for fn in os.listdir(self.backup_request_dir):
+ pathname = os.path.join(self.backup_request_dir, fn)
+            if os.path.isdir(pathname) or fn.startswith("."):
+ continue
+
+ try:
+ request = json.loads(open(pathname).read())
+ except Exception:
+ # If we can't read it then ignore it
+ self.log.exception("Failed to read backup request", fn=pathname)
+ continue
+
+ try:
+ id = request["id"]
+ operation = request["operation"]
+ backend_filename = request["file_details"]["backend_filename"]
+ except Exception:
+ # If we can't understand it then ignore it
+ self.log.exception("Failed to understand backup request", fn=pathname)
+ continue
+
+ self.log.info(
+ "Processing request",
+ id=id,
+ operation=operation,
+ backend_filename=backend_filename)
+
+ request["request_fn"] = fn
+ request["request_pathname"] = pathname
+
+ try:
+ if (operation == "create"):
+ self.handle_backup_request(request)
+ elif (operation == "restore"):
+ self.handle_restore_request(request)
+ elif (operation == "verify"):
+ self.finalize_response(request, {}, "failed",
+ error_msg="Verify is not implemented yet")
+ else:
+ self.finalize_response(request, {}, "failed",
+ error_msg="Operation is not backup | restore | verify")
+ except Exception:
+ # Something failed in a way we didn't expect.
+ self.finalize_response(request, {}, "failed",
+ error_msg="Uncaught exception",
+ exception=True)
+
+
+if __name__ == "__main__":
+ Config.init()
+ BackupProcessor().run()
diff --git a/xos/coreapi/backupsetwatcher.py b/xos/coreapi/backupsetwatcher.py
new file mode 100644
index 0000000..704988f
--- /dev/null
+++ b/xos/coreapi/backupsetwatcher.py
@@ -0,0 +1,324 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" Backup Set Watcher
+
+ Watches for backup/restore operations. Stops the core so they can be executed.
+
+ The basic idea is that backup/restore requests are written to a directory, and responses are read from another
+ directory. After a request has been written the core will exit. Only one request is written at a time. Responses
+ are only read when the thread is started, as it's impossible for a response to show up while the core is
+ running.
+
+    1. Operator optionally creates BackupFile in the XOS database to describe where backup shall be placed
+    2. Operator creates BackupOperation in the XOS database
+    3. BackupSetWatcher notices new BackupOperation, writes to file in /var/run/xos/backup/requests
+    4. BackupSetWatcher terminates coreapi
+    5. coreapi exits
+    6. the root core shell script executes backup, writing response to /var/run/xos/backup/responses
+    7. the root core shell script restarts coreapi
+    8. BackupSetWatcher notices a new response, and updates the appropriate BackupOperation object.
+"""
+
+import datetime
+import json
+import os
+import threading
+import time
+
+from core.models import BackupFile, BackupOperation
+from django.db.models import Q, F
+
+from xosconfig import Config
+from multistructlog import create_logger
+log = create_logger(Config().get("logging"))
+
+
+# Restrict what the user can specify as a URI for now
+ALLOWED_URI = "file:///var/run/xos/backup/local/"
+
+
+def max_non_none(*lst):
+ lst = [x for x in lst if x is not None]
+ return max(lst)
+
+
+def set_enacted(model, code, status):
+ model.enacted = max_non_none(model.updated, model.changed_by_policy)
+ model.backend_code = code
+ model.backend_status = status
+
+
+class BackupDoesNotExist(Exception):
+ pass
+
+
+class BackupUnreadable(Exception):
+ pass
+
+
+class BackupSetWatcherThread(threading.Thread):
+ daemon = True
+ interval = 5
+
+ def __init__(self, server, *args, **kwargs):
+ self.terminate_signal = False
+ super(BackupSetWatcherThread, self).__init__(*args, **kwargs)
+
+ self.server = server
+ self.exiting = False
+ self.backup_request_dir = "/var/run/xos/backup/requests"
+ self.backup_response_dir = "/var/run/xos/backup/responses"
+ self.backup_file_dir = "/var/run/xos/backup/local"
+ self.process_response_dir()
+
+ def init_request_dir(self):
+ # make sure the directory exists
+ if not os.path.exists(self.backup_request_dir):
+ os.makedirs(self.backup_request_dir)
+
+ # make sure it is empty
+ for fn in os.listdir(self.backup_request_dir):
+ fn = os.path.join(self.backup_request_dir, fn)
+            if os.path.isdir(fn) or os.path.basename(fn).startswith("."):
+ continue
+ os.remove(fn)
+
+ def process_response_create(self, id, operation, status, response):
+ file_details = response["file_details"]
+
+ backupops = BackupOperation.objects.filter(id=id)
+ if not backupops:
+ log.exception("Backup response refers to a backupop that does not exist", id=id)
+ raise BackupDoesNotExist()
+
+ backupop = backupops[0]
+
+ checksum = file_details.get("checksum")
+ if checksum:
+ backupop.file.checksum = checksum
+ backupop.file.save(allow_modify_feedback=True,
+ update_fields=["checksum"])
+
+ backupop.status = status
+ backupop.error_msg = response.get("error_msg", "")
+ backupop.effective_date = response.get("effective_date", "")
+ set_enacted(backupop, 1, "enacted")
+ backupop.save(allow_modify_feedback=True,
+ update_fields=["backend_code", "backend_status", "effective_date", "enacted", "file", "status",
+ "error_msg"])
+
+ def process_response_restore(self, id, operation, status, response):
+ file_details = response["file_details"]
+
+ # If the restore was successful, then look for any inprogress backups and mark them orphaned.
+ # There should be exactly one such inprogress backup, because there was a BackupOp sitting
+ # in the data model when the backup ran.
+ if (status == "restored"):
+ for req in BackupOperation.objects.filter(status="inprogress", operation="create"):
+ log.info("Orphaning inprogress backupop", backupop=req)
+ req.status = "orphaned"
+ req.save(allow_modify_feedback=True,
+ update_fields=["status"])
+
+ # It's likely the Restore operation doesn't exist, because it went away during the restore
+ # process. Check for the existing operation first, and if it doesn't exist, then create
+ # one to stand in its place.
+ backupops = BackupOperation.objects.filter(id=id)
+ if backupops:
+ backupop = backupops[0]
+ log.info("Resolved existing backupop model", backupop=backupop)
+ else:
+ backupfiles = BackupFile.objects.filter(id=file_details["id"])
+ if backupfiles:
+ backupfile = backupfiles[0]
+ else:
+ backupfile = BackupFile(
+ name=file_details["name"],
+ uri=file_details["uri"],
+ checksum=file_details["checksum"],
+ backend_filename=file_details["backend_filename"])
+ backupfile.save(allow_modify_feedback=True)
+ log.info("Created backupfile model", backupfile=backupfile)
+
+ backupop = BackupOperation(operation=operation,
+ file=backupfile)
+ backupop.save(allow_modify_feedback=True)
+ log.info("Created backupop model", backupop=backupop)
+
+ set_enacted(backupop, 1, "enacted")
+ backupop.error_msg = response.get("error_msg", "")
+ backupop.status = status
+ backupop.effective_date = response.get("effective_date", "")
+ backupop.save(allow_modify_feedback=True,
+ update_fields=["backend_code", "backend_status", "effective_date", "enacted", "status",
+ "error_msg"])
+
+ def process_response_dir(self):
+ # make sure the directory exists
+ if not os.path.exists(self.backup_response_dir):
+ os.makedirs(self.backup_response_dir)
+
+ log.info("Looking for backup responses")
+
+ # process the responses and delete them
+ for fn in os.listdir(self.backup_response_dir):
+ try:
+ fn = os.path.join(self.backup_response_dir, fn)
+                if os.path.isdir(fn) or os.path.basename(fn).startswith("."):
+ continue
+
+ log.info("Processing backup response", fn=fn)
+
+ try:
+ contents = json.loads(open(fn).read())
+ except Exception:
+ # If we can't read it then ignore and delete it
+ log.exception("Failed to read backup response", fn=fn)
+ raise BackupUnreadable()
+
+ try:
+ id = contents["id"]
+ operation = contents["operation"]
+ status = contents["status"]
+ _ = contents["file_details"]["backend_filename"]
+ except Exception:
+ # If we can't understand it then ignore and delete it
+ log.exception("Failed to understand backup response", fn=fn)
+ raise BackupUnreadable()
+
+ if operation == "create":
+ self.process_response_create(id, operation, status, contents)
+ elif operation == "restore":
+ self.process_response_restore(id, operation, status, contents)
+
+ # We've successfully concluded. Delete the response file
+ os.remove(fn)
+ except (BackupUnreadable, BackupDoesNotExist):
+ # Critical failures that can never be resolved, and we can never update the status in the data model,
+ # so delete the response file
+ os.remove(fn)
+
+ def save_request(self, backupop):
+ request = {"id": backupop.id,
+ "operation": backupop.operation}
+
+ request["file_details"] = {
+ "id": backupop.file.id,
+ "name": backupop.file.name,
+ "uri": backupop.file.uri,
+ "checksum": backupop.file.checksum,
+ "backend_filename": backupop.file.backend_filename}
+
+ request_fn = os.path.join(self.backup_request_dir, "request")
+ open(request_fn, "w").write(json.dumps(request))
+
+ def run_once(self):
+ # If we're exiting due to a backup request being saved, then return
+ if self.exiting:
+ return
+
+ # The standard synchronizer dirty object query
+ backupops = BackupOperation.objects.filter(
+ Q(enacted=None)
+ | Q(enacted__lt=F("updated"))
+ | Q(enacted__lt=F("changed_by_policy")))
+
+ for backupop in backupops:
+ if backupop.operation not in ["create", "restore"]:
+ # TODO(smbaker): Implement verify
+ # If it's not a create or restore then there is
+ # nothing for us to do.
+ continue
+
+ if backupop.component != "xos":
+ # We only handle XOS Backups / Restores
+ continue
+
+ if backupop.status:
+ log.info("BackupOp is already executed", backupop=backupop, status=backupop.status)
+ # The op was assigned a status. We are done with it.
+ # Set its enacted timestamp to prevent finding it again.
+ set_enacted(backupop, 1, "already_executed")
+ backupop.save(allow_modify_feedback=True,
+ update_fields=["enacted", "backend_code", "backend_status"])
+ continue
+
+ # Restores must always specify a file
+ if backupop.operation == "restore" and (not backupop.file):
+ backupop.backend_code = 2
+ backupop.backend_status = "Must specify file for restore operation"
+ backupop.save(allow_modify_feedback=True,
+ update_fields=["backend_code", "backend_status"])
+ continue
+
+ if backupop.operation == "create":
+ if (backupop.file):
+ # file was specified, check that the uri is allowed
+ if (not backupop.file.uri) or (not backupop.file.uri.startswith(ALLOWED_URI)):
+ backupop.backend_code = 2
+ backupop.backend_status = "Only backup_uri that starts with %s is supported" % ALLOWED_URI
+ backupop.save(allow_modify_feedback=True,
+ update_fields=["backend_code", "backend_status"])
+ continue
+ else:
+ # Generate a backend_filename from uri by stripping off file://
+ # This will leave one leading slash on the URL.
+ backupop.file.backend_filename = backupop.file.uri[7:]
+ backupop.file.save(allow_modify_feedback=True,
+ update_fields=["backend_filename"])
+ else:
+ # Create operation doesn't specify a file, so autogenerate one.
+ log.info("Adding autogenerated file to BackupOp", backupop=backupop)
+ current_datetime = datetime.datetime.today().strftime('%Y-%m-%d-%H-%M-%S')
+ name = "autogenerated-file-%s" % current_datetime
+ backend_filename = os.path.join(self.backup_file_dir, name)
+ backupop_file = BackupFile(name=name,
+ uri="file://" + backend_filename,
+ backend_filename=backend_filename)
+ backupop_file.save(allow_modify_feedback=True)
+ backupop.file = backupop_file
+ backupop.save(update_fields=["file"])
+
+ # There can only be one request at a time. Ensure the directory is empty.
+ self.init_request_dir()
+ self.save_request(backupop)
+ self.exiting = True
+
+ # Mark the operation as inprogress
+ backupop.status = "inprogress"
+ backupop.save(allow_modify_feedback=True,
+ update_fields=["effective_date", "status"])
+
+ log.info("Backup/Restore request saved - initiating core shutdown")
+ self.server.delayed_shutdown(0)
+
+ # Stop looping through backupops. Since we set self.exiting=True, we know the loop will
+ # not be called again.
+ return
+
+ def run(self):
+ while not self.terminate_signal:
+ start = time.time()
+ try:
+ self.run_once()
+ except BaseException:
+ log.exception("backupop_watcher: Exception in run loop")
+
+ telap = time.time() - start
+ if telap < self.interval:
+ time.sleep(self.interval - telap)
+
+ def stop(self):
+ self.terminate_signal = True
diff --git a/xos/coreapi/core_main.py b/xos/coreapi/core_main.py
index 5392534..619ffef 100644
--- a/xos/coreapi/core_main.py
+++ b/xos/coreapi/core_main.py
@@ -14,6 +14,7 @@
import argparse
import prometheus_client
+import sys
# FIXME: should grpc_server initialize the Config?
from grpc_server import XOSGrpcServer
@@ -45,6 +46,34 @@
default=None,
help="file containing output of model prep step",
)
+ parser.add_argument(
+ "--no-backupwatcher",
+ dest="enable_backupwatcher",
+ action="store_false",
+ default=True,
+ help="disable the backupwatcher thread",
+ )
+ parser.add_argument(
+ "--no-reaper",
+ dest="enable_reaper",
+ action="store_false",
+ default=True,
+ help="disable the reaper thread",
+ )
+ parser.add_argument(
+ "--no-server",
+ dest="enable_server",
+ action="store_false",
+ default=True,
+ help="disable the grpc server thread",
+ )
+ parser.add_argument(
+ "--no-prometheus",
+ dest="enable_prometheus",
+ action="store_false",
+ default=True,
+ help="disable the prometheus thread",
+ )
args = parser.parse_args()
if args.model_output:
@@ -68,23 +97,50 @@
return reaper
+def init_backupset_watcher(server):
+ backupset_watcher = None
+ try:
+ from backupsetwatcher import BackupSetWatcherThread
+
+ backupset_watcher = BackupSetWatcherThread(server)
+ backupset_watcher.start()
+ except BaseException:
+ log.exception("Failed to initialize backupset watcher")
+
+ return backupset_watcher
+
+
if __name__ == "__main__":
args = parse_args()
# start the prometheus server
# TODO (teone) consider moving this in a separate process so that it won't die when we load services
- prometheus_client.start_http_server(8000)
+ if args.enable_prometheus:
+ prometheus_client.start_http_server(8000)
server = XOSGrpcServer(
model_status=args.model_status, model_output=args.model_output
)
- server.start()
- if server.django_initialized:
- reaper = init_reaper()
+ if args.enable_server:
+ server.start()
else:
- log.warning("Skipping reaper as django is not initialized")
- reaper = None
+ # This is primarily to facilitate development, running the reaper and/or the backupwatcher without
+ # actually starting the grpc server.
+ server.init_django()
+ if not server.django_initialized:
+ log.error("Django is broken. Please remove the synchronizer causing the problem and restart the core.")
+ sys.exit(-1)
+
+ reaper = None
+ backup_watcher = None
+ if server.django_initialized:
+ if args.enable_reaper:
+ reaper = init_reaper()
+ if args.enable_backupwatcher:
+ backup_watcher = init_backupset_watcher(server)
+ else:
+ log.warning("Skipping reaper and backupset_watcher as django is not initialized")
log.info("XOS core entering wait loop")
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
@@ -99,3 +155,6 @@
if reaper:
reaper.stop()
+
+ if backup_watcher:
+ backup_watcher.stop()
diff --git a/xos/coreapi/test_backupsetwatcher.py b/xos/coreapi/test_backupsetwatcher.py
new file mode 100644
index 0000000..62acff3
--- /dev/null
+++ b/xos/coreapi/test_backupsetwatcher.py
@@ -0,0 +1,106 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import pdb
+import sys
+import unittest
+from mock import MagicMock, Mock
+from pyfakefs import fake_filesystem_unittest
+from io import open
+
+from xosconfig import Config
+
+
+class MockServer(object):
+ def __init__(self):
+ super(MockServer, self).__init__()
+
+ def delayed_shutdown(self, seconds):
+ pass
+
+
+class TestBackupWatcher(fake_filesystem_unittest.TestCase):
+ def setUp(self):
+ config = os.path.abspath(
+ os.path.dirname(os.path.realpath(__file__)) + "/test_config.yaml"
+ )
+ Config.clear() # in case left unclean by a previous test case
+ Config.init(config)
+
+ self.mock_backup_operation = MagicMock()
+ self.mock_backup_file = MagicMock()
+
+ sys.modules["core"] = Mock()
+ sys.modules["core.models"] = Mock(BackupOperation=self.mock_backup_operation,
+ BackupFile=self.mock_backup_file)
+
+ import backupsetwatcher
+ self.backupsetwatcher = backupsetwatcher
+
+ self.setUpPyfakefs()
+
+ self.server = MockServer()
+ self.watcher = backupsetwatcher.BackupSetWatcherThread(self.server)
+
+ def tearDown(self):
+ pass
+
+ def test_init_request_dir(self):
+ self.assertFalse(os.path.exists(self.watcher.backup_request_dir))
+
+ # Should create the directory
+ self.watcher.init_request_dir()
+ self.assertTrue(os.path.exists(self.watcher.backup_request_dir))
+
+ # Should remove any existing files
+ fn = os.path.join(self.watcher.backup_request_dir, "foo")
+ open(fn, "w").write("foo")
+ self.watcher.init_request_dir()
+ self.assertFalse(os.path.exists(fn))
+
+ def test_process_response_create_noexist(self):
+ file_details = {"checksum": "1234"}
+ response = {"file_details": file_details}
+
+ self.backupsetwatcher.BackupOperation.objects.filter.return_value = []
+
+ with self.assertRaises(self.backupsetwatcher.BackupDoesNotExist):
+ self.watcher.process_response_create(id=1, operation="create", status="created", response=response)
+
+ def test_process_response_create(self):
+ file_details = {"checksum": "1234"}
+ response = {"file_details": file_details}
+
+ file = MagicMock()
+ op = MagicMock(file=file)
+
+ self.backupsetwatcher.BackupOperation.objects.filter.return_value = [op]
+
+ self.watcher.process_response_create(id=1, operation="create", status="created", response=response)
+
+ self.assertEqual(op.file.checksum, "1234")
+ op.file.save.assert_called()
+
+ self.assertEqual(op.status, "created")
+ self.assertEqual(op.error_msg, "")
+ op.save.assert_called()
+
+
+def main():
+ unittest.main()
+
+
+if __name__ == "__main__":
+ main()