SEBA-462 Service unload can be initiated by synchronizer
Change-Id: Idb8b924a6b048b16cdb6b04e91058026932d827d
diff --git a/lib/xos-synchronizer/xossynchronizer/backend.py b/lib/xos-synchronizer/xossynchronizer/backend.py
index d4b7e67..55977b2 100644
--- a/lib/xos-synchronizer/xossynchronizer/backend.py
+++ b/lib/xos-synchronizer/xossynchronizer/backend.py
@@ -111,7 +111,11 @@
self.log.info("Skipping observer thread due to no steps dir.")
pull_steps_dir = Config.get("pull_steps_dir")
- if pull_steps_dir:
+ if not pull_steps_dir:
+ self.log.info("Skipping pull step engine due to no pull_steps_dir dir.")
+ elif Config.get("desired_state") == "unload":
+ self.log.info("Skipping pull steps engine due to synchronizer unloading.")
+ else:
self.log.info("Starting XOSPullStepEngine", pull_steps_dir=pull_steps_dir)
pull_steps_engine = XOSPullStepEngine(model_accessor=self.model_accessor)
pull_steps_engine.load_pull_step_modules(pull_steps_dir)
@@ -119,17 +123,17 @@
target=pull_steps_engine.start, name="pull_step_engine"
)
pull_steps_thread.start()
- else:
- self.log.info("Skipping pull step engine due to no pull_steps_dir dir.")
event_steps_dir = Config.get("event_steps_dir")
- if event_steps_dir:
+ if not event_steps_dir:
+ self.log.info("Skipping event engine due to no event_steps dir.")
+ elif Config.get("desired_state") == "unload":
+ self.log.info("Skipping event engine due to synchronizer unloading.")
+ else:
self.log.info("Starting XOSEventEngine", event_steps_dir=event_steps_dir)
event_engine = XOSEventEngine(model_accessor=self.model_accessor, log=self.log)
event_engine.load_event_step_modules(event_steps_dir)
event_engine.start()
- else:
- self.log.info("Skipping event engine due to no event_steps dir.")
# start model policies thread
policies_dir = Config.get("model_policies_dir")
diff --git a/lib/xos-synchronizer/xossynchronizer/loadmodels.py b/lib/xos-synchronizer/xossynchronizer/loadmodels.py
index 78fa1a6..7acce41 100644
--- a/lib/xos-synchronizer/xossynchronizer/loadmodels.py
+++ b/lib/xos-synchronizer/xossynchronizer/loadmodels.py
@@ -20,6 +20,10 @@
class ModelLoadClient(object):
+ REQUIRE_CLEAN = 0
+ AUTOMATICALLY_CLEAN = 1
+ PURGE = 2
+
def __init__(self, api):
self.api = api
@@ -67,3 +71,15 @@
item.contents = open(os.path.join(migrations_dir, fn)).read()
result = self.api.dynamicload.LoadModels(request)
+
+ return result
+
+ def unload_models(self, name, version="unknown", cleanup_behavior=REQUIRE_CLEAN):
+ request = self.api.dynamicload_pb2.UnloadModelsRequest(
+ name=name,
+ version=version,
+ cleanup_behavior=cleanup_behavior)
+ result = self.api.dynamicload.UnloadModels(request)
+
+ return result
+
diff --git a/lib/xos-synchronizer/xossynchronizer/modelaccessor.py b/lib/xos-synchronizer/xossynchronizer/modelaccessor.py
index 1418cc6..c2d5114 100644
--- a/lib/xos-synchronizer/xossynchronizer/modelaccessor.py
+++ b/lib/xos-synchronizer/xossynchronizer/modelaccessor.py
@@ -28,7 +28,7 @@
import os
import signal
import sys
-import time
+from threading import Timer
from loadmodels import ModelLoadClient
from xosconfig import Config
@@ -37,6 +37,7 @@
log = create_logger(Config().get("logging"))
+after_reactor_exit_code = None
orig_sigint = None
model_accessor = None
@@ -161,6 +162,43 @@
reactor.callLater(1, functools.partial(keep_trying, client, reactor))
+def unload_models(client, reactor, version):
+ # This function is called by a timer until it succeeds.
+ log.info("unload_models initiated by timer")
+
+ try:
+ result = ModelLoadClient(client).unload_models(
+ Config.get("name"),
+ version=version,
+ cleanup_behavior=ModelLoadClient.AUTOMATICALLY_CLEAN)
+
+ log.debug("Unload response", result=result)
+
+ if result.status in [result.SUCCESS, result.SUCCESS_NOTHING_CHANGED]:
+ log.info("Models successfully unloaded. Exiting with status", code=0)
+ sys.exit(0)
+
+ if result.status == result.TRYAGAIN:
+ log.info("TRYAGAIN received. Expect to try again in 30 seconds.")
+
+ except Exception as e:
+ # If the synchronizer is operational, then assume the ORM's restart_on_disconnect will deal with the
+ # connection being lost.
+ log.exception("Error while unloading. Expect to try again in 30 seconds.")
+
+ Timer(30, functools.partial(unload_models, client, reactor, version)).start()
+
+def exit_while_inside_reactor(reactor, code):
+ """ Calling sys.exit() while inside reactor ends up trapped by reactor.
+
+ So what we'll do is set a flag indicating we want to exit, then stop reactor, then return
+ """
+ global after_reactor_exit_code
+
+ reactor.stop()
+ signal.signal(signal.SIGINT, orig_sigint)
+ after_reactor_exit_code = code
+
def grpcapi_reconnect(client, reactor):
global model_accessor
@@ -172,9 +210,29 @@
version = autodiscover_version_of_main(max_parent_depth=0) or "unknown"
log.info("Service version is %s" % version)
try:
- ModelLoadClient(client).upload_models(
- Config.get("name"), Config.get("models_dir"), version=version
- )
+ if Config.get("desired_state") == "load":
+ ModelLoadClient(client).upload_models(
+ Config.get("name"), Config.get("models_dir"), version=version
+ )
+ elif Config.get("desired_state") == "unload":
+ # Try for an easy unload. If there's no dirty models, then unload will succeed without
+ # requiring us to setup the synchronizer.
+ log.info("Trying for an easy unload_models")
+ result = ModelLoadClient(client).unload_models(
+ Config.get("name"),
+ version=version,
+ cleanup_behavior=1) # FIXME: hardcoded value for automatic delete
+ if result.status in [result.SUCCESS, result.SUCCESS_NOTHING_CHANGED]:
+ log.info("Models successfully unloaded. Synchronizer exiting")
+ exit_while_inside_reactor(reactor, 0)
+ return
+
+ # We couldn't unload the easy way, so we'll have to do it the hard way. Fall through and
+ # setup the synchronizer.
+ else:
+ log.error("Misconfigured", desired_state=Config.get("desired_state"))
+ exit_while_inside_reactor(reactor, -1)
+ return
except Exception as e: # TODO: narrow exception scope
if (
hasattr(e, "code")
@@ -242,6 +300,10 @@
# Restore the sigint handler
signal.signal(signal.SIGINT, orig_sigint)
+ # Check to see if we still want to unload
+ if Config.get("desired_state") == "unload":
+ Timer(30, functools.partial(unload_models, client, reactor, version)).start()
+
def config_accessor_grpcapi():
global orig_sigint
@@ -282,6 +344,11 @@
reactor.run()
+ # Catch if we wanted to stop while inside of a reactor callback
+ if after_reactor_exit_code is not None:
+ log.info("exiting with status", code=after_reactor_exit_code)
+ sys.exit(after_reactor_exit_code)
+
def config_accessor_mock():
global model_accessor