blob: 9630ecb8779d4bc33d8f552ff588b57a00ab2509 [file] [log] [blame]
from basetest import *
import logging
import StringIO
import subprocess
import sys
from synchronizers.base.event_loop import XOSObserver
from synchronizers.model_policy import run_policy_once
from xos.config import set_override
from util.logger import Logger, observer_logger
class BaseObserverToscaTest(BaseToscaTest):
hide_observer_output = True
def __init__(self):
super(BaseObserverToscaTest, self).__init__()
def get_usable_deployment(self):
return "MyDeployment"
def get_usable_controller(self):
return "CloudLab"
def ensure_observer_not_running(self):
ps_output = subprocess.Popen("ps -elfy", shell=True, stdout=subprocess.PIPE).stdout.read()
if "/opt/xos/xos-observer.py" in ps_output:
print >> sys.stderr, "an observer is still running"
print >> sys.stderr, "please stop it, for example 'supervisorctl stop observer'"
sys.exit(-1)
def log_to_memory(self):
logStream = StringIO.StringIO()
handler = logging.StreamHandler(stream=logStream)
handler.setLevel(logging.DEBUG)
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
loggername = Logger().loggername
log = logging.getLogger(loggername)
for hdlr in log.handlers[:]:
log.removeHandler(hdlr)
log.addHandler(handler)
log.propagate = False
log = observer_logger.logger
for hdlr in log.handlers[:]:
log.removeHandler(hdlr)
log.addHandler(handler)
log.propagate = False
self.logStream = logStream
def hide_output(self):
set_override("observer_console_print", False)
self.log_to_memory()
sys.stdout = self.logStream
sys.stderr = self.logStream
def restore_output(self):
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
if not self.hide_observer_output:
print self.logStream.getvalue()
def save_output(self, what, fn):
file(fn,"w").write(self.logStream.getvalue())
print >> sys.__stdout__," (%s log saved to %s)" % (what, fn)
def run_model_policy(self, save_output=None):
self.ensure_observer_not_running()
self.hide_output()
try:
print ">>>>> run model_policies"
run_policy_once()
print ">>>>> done model_policies"
if save_output:
self.save_output("model_policy",save_output)
finally:
self.restore_output()
def run_observer(self, save_output=None):
self.ensure_observer_not_running()
self.log_to_memory()
self.hide_output()
try:
print ">>>>> run observer"
observer = XOSObserver()
observer.run_once()
print ">>>>> done observer"
if save_output:
self.save_output("observer",save_output)
finally:
self.restore_output()