CORD-706: Replaced Ansible binary interface with Ansible native API
Change-Id: Id9e20c3b287d961279a3606bf6ebcea289266e61
diff --git a/xos/synchronizers/base/ansible_helper.py b/xos/synchronizers/base/ansible_helper.py
index cea9686..61520df 100644
--- a/xos/synchronizers/base/ansible_helper.py
+++ b/xos/synchronizers/base/ansible_helper.py
@@ -10,7 +10,8 @@
import traceback
import subprocess
from xos.config import Config, XOS_DIR
-from xos.logger import observer_logger
+from xos.logger import observer_logger as logger
+from ansible_runner import *
step_dir = Config().observer_steps_dir
sys_dir = Config().observer_sys_dir
@@ -18,76 +19,6 @@
os_template_loader = jinja2.FileSystemLoader( searchpath=[step_dir, "/opt/xos/synchronizers/shared_templates"])
os_template_env = jinja2.Environment(loader=os_template_loader)
-def parse_output(msg):
- lines = msg.splitlines()
- results = []
-
- observer_logger.info(msg)
-
- for l in lines:
- magic_str = 'ok: [127.0.0.1] => '
- magic_str2 = 'changed: [127.0.0.1] => '
- magic_str3 = 'ok: [localhost] => '
- magic_str4 = 'changed: [localhost] => '
- if (l.startswith(magic_str)):
- w = len(magic_str)
- str = l[w:]
-
- # handle ok: [127.0.0.1] => (item=org.onosproject.driver) => {...
- if str.startswith("(") and (" => {" in str):
- str = str.split("=> ",1)[1]
-
- d = json.loads(str)
- results.append(d)
- elif (l.startswith(magic_str2)):
- w = len(magic_str2)
- str = l[w:]
-
- if str.startswith("(") and (" => {" in str):
- str = str.split("=> ",1)[1]
-
- d = json.loads(str)
- results.append(d)
- elif (l.startswith(magic_str3)):
- w = len(magic_str3)
- str = l[w:]
-
- # handle ok: [127.0.0.1] => (item=org.onosproject.driver) => {...
- if str.startswith("(") and (" => {" in str):
- str = str.split("=> ",1)[1]
-
- d = json.loads(str)
- results.append(d)
- elif (l.startswith(magic_str4)):
- w = len(magic_str4)
- str = l[w:]
-
- if str.startswith("(") and (" => {" in str):
- str = str.split("=> ",1)[1]
-
- d = json.loads(str)
- results.append(d)
-
-
- return results
-
-def parse_unreachable(msg):
- total_unreachable=0
- total_failed=0
- for l in msg.splitlines():
- x = re.findall('ok=([0-9]+).*changed=([0-9]+).*unreachable=([0-9]+).*failed=([0-9]+)', l)
- if x:
- (ok, changed, unreachable, failed) = x[0]
- ok=int(ok)
- changed=int(changed)
- unreachable=int(unreachable)
- failed=int(failed)
-
- total_unreachable += unreachable
- total_failed += failed
- return {'unreachable':total_unreachable,'failed':total_failed}
-
-
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
@@ -115,7 +46,7 @@
return (opts, os.path.join(pathed_sys_dir,objname))
-def run_template(name, opts, path='', expected_num=None, ansible_config=None, ansible_hosts=None, run_ansible_script=None):
+def run_template(name, opts, path='', expected_num=None, ansible_config=None, ansible_hosts=None, run_ansible_script=None, object=None):
template = os_template_env.get_template(name)
buffer = template.render(opts)
@@ -133,54 +64,64 @@
if ansible_hosts:
env["ANSIBLE_HOSTS"] = ansible_hosts
- if (not Config().observer_pretend):
- if not run_ansible_script:
- run_ansible_script = os.path.join(XOS_DIR, "synchronizers/base/run_ansible")
+ # Dropped support for observer_pretend - to be redone
+ runner = Runner(
+ playbook=fqp,
+ run_data=opts)
+
- process = subprocess.Popen("%s %s" % (run_ansible_script, shellquote(fqp)), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
- msg = process.stdout.read()
- err_msg = process.stderr.read()
-
- if getattr(Config(), "observer_save_ansible_output", False):
- try:
- open(fqp+".out","w").write(msg)
- open(fqp+".err","w").write(err_msg)
- except:
- # fail silently
- pass
-
- else:
- msg = open(fqp+'.out').read()
+ stats,aresults = runner.run()
try:
- ok_results = parse_output(msg)
+ ok_results = []
+ total_unreachable = 0
+ failed = 0
+
+ error_msg = []
+ for x in aresults:
+ if not x.is_failed() and not x.is_unreachable() and not x.is_skipped():
+ ok_results.append(x)
+ elif x.is_unreachable():
+ total_unreachable+=1
+ try:
+ error_msg.append(x._result['msg'])
+ except:
+ pass
+ elif x.is_failed():
+ failed+=1
+ try:
+ error_msg.append(x._result['msg'])
+ except:
+ pass
+
+
if (expected_num is not None) and (len(ok_results) != expected_num):
raise ValueError('Unexpected num %s!=%d' % (str(expected_num), len(ok_results)) )
- parsed = parse_unreachable(msg)
- total_unreachable = parsed['unreachable']
- failed = parsed['failed']
+ #total_unreachable = stats.unreachable
+
if (failed):
raise ValueError('Ansible playbook failed.')
- if (total_unreachable > 0):
- raise ValueError("Unreachable results in ansible recipe")
except ValueError,e:
- all_fatal = [e.message] + re.findall(r'^msg: (.*)',msg,re.MULTILINE)
- all_fatal2 = re.findall(r'^ERROR: (.*)',msg,re.MULTILINE)
- all_fatal3 = re.findall(r'^failed:.*"msg": "(.*)"',msg,re.MULTILINE)
-
- all_fatal.extend(all_fatal2)
- all_fatal.extend(all_fatal3)
try:
- error = ' // '.join(all_fatal)
+ error = ' // '.join(error_msg)
except:
pass
raise Exception(error)
- return ok_results
+ if (object):
+ oprops = object.tologdict()
+ for i in ok_results:
+ ansible = i._result
+ ansible['ansible'] = 1
+ c = dict(oprops.items() + ansible.items())
+ logger.info(i._task, extra=c)
+
+ processed_results = map(lambda x:x._result, ok_results)
+ return processed_results[1:] # 0 is setup
-def run_template_ssh(name, opts, path='', expected_num=None):
+def run_template_ssh(name, opts, path='', expected_num=None, object=None):
instance_name = opts["instance_name"]
hostname = opts["hostname"]
private_key = opts["private_key"]
@@ -241,7 +182,7 @@
print "ANSIBLE_CONFIG=%s" % config_pathname
print "ANSIBLE_HOSTS=%s" % hosts_pathname
- return run_template(name, opts, path, ansible_config = config_pathname, ansible_hosts = hosts_pathname, run_ansible_script="/opt/xos/synchronizers/base/run_ansible_verbose")
+ return run_template(name, opts, path, ansible_config = config_pathname, ansible_hosts = hosts_pathname, run_ansible_script="/opt/xos/synchronizers/base/run_ansible_verbose", object=object)
diff --git a/xos/synchronizers/base/ansible_runner.py b/xos/synchronizers/base/ansible_runner.py
new file mode 100644
index 0000000..8089b43
--- /dev/null
+++ b/xos/synchronizers/base/ansible_runner.py
@@ -0,0 +1,158 @@
+#!/usr/bin/python
+
+import os
+import sys
+
+from tempfile import NamedTemporaryFile
+from ansible.inventory import Inventory
+from ansible.vars import VariableManager
+from ansible.parsing.dataloader import DataLoader
+from ansible.executor import playbook_executor
+from ansible.utils.display import Display
+from ansible.plugins.callback import CallbackBase
+
+
+class ResultCallback(CallbackBase):
+ def __init__(self):
+ super(ResultCallback, self).__init__()
+ self.results = []
+
+ def v2_runner_on_ok(self, result, **kwargs):
+ self.results.append(result)
+
+ def v2_runner_on_failed(self, result, **kwargs):
+ host = result._host
+ self.results.append(result)
+
+ def v2_runner_on_unreachable(self, result, **kwargs):
+ host = result._host
+ self.results.append(result)
+
+class Options(object):
+ """
+ Options class to replace Ansible OptParser
+ """
+ def __init__(self, verbosity=None, inventory=None, listhosts=None, subset=None, module_paths=None, extra_vars=None,
+ forks=None, ask_vault_pass=None, vault_password_files=None, new_vault_password_file=None,
+ output_file=None, tags=None, skip_tags=None, one_line=None, tree=None, ask_sudo_pass=None, ask_su_pass=None,
+ sudo=None, sudo_user=None, become=None, become_method=None, become_user=None, become_ask_pass=None,
+ ask_pass=None, private_key_file=None, remote_user=None, connection=None, timeout=None, ssh_common_args=None,
+ sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None, poll_interval=None, seconds=None, check=None,
+ syntax=None, diff=None, force_handlers=None, flush_cache=None, listtasks=None, listtags=None, module_path=None):
+ self.verbosity = verbosity
+ self.inventory = inventory
+ self.listhosts = listhosts
+ self.subset = subset
+ self.module_paths = module_paths
+ self.extra_vars = extra_vars
+ self.forks = forks
+ self.ask_vault_pass = ask_vault_pass
+ self.vault_password_files = vault_password_files
+ self.new_vault_password_file = new_vault_password_file
+ self.output_file = output_file
+ self.tags = tags
+ self.skip_tags = skip_tags
+ self.one_line = one_line
+ self.tree = tree
+ self.ask_sudo_pass = ask_sudo_pass
+ self.ask_su_pass = ask_su_pass
+ self.sudo = sudo
+ self.sudo_user = sudo_user
+ self.become = become
+ self.become_method = become_method
+ self.become_user = become_user
+ self.become_ask_pass = become_ask_pass
+ self.ask_pass = ask_pass
+ self.private_key_file = private_key_file
+ self.remote_user = remote_user
+ self.connection = connection
+ self.timeout = timeout
+ self.ssh_common_args = ssh_common_args
+ self.sftp_extra_args = sftp_extra_args
+ self.scp_extra_args = scp_extra_args
+ self.ssh_extra_args = ssh_extra_args
+ self.poll_interval = poll_interval
+ self.seconds = seconds
+ self.check = check
+ self.syntax = syntax
+ self.diff = diff
+ self.force_handlers = force_handlers
+ self.flush_cache = flush_cache
+ self.listtasks = listtasks
+ self.listtags = listtags
+ self.module_path = module_path
+
+
+class Runner(object):
+
+ def __init__(self, playbook, run_data, private_key_file=None, verbosity=0):
+
+ self.run_data = run_data
+
+ self.options = Options()
+ self.options.output_file = playbook + '.result'
+ self.options.private_key_file = private_key_file
+ self.options.verbosity = verbosity
+ self.options.connection = 'ssh' # Need a connection type "smart" or "ssh"
+ #self.options.become = True
+ #self.options.become_method = 'sudo'
+ #self.options.become_user = 'root'
+
+ # Set global verbosity
+ self.display = Display()
+ self.display.verbosity = self.options.verbosity
+ # Executor appears to have it's own
+ # verbosity object/setting as well
+ playbook_executor.verbosity = self.options.verbosity
+
+ # Become Pass Needed if not logging in as user root
+ #passwords = {'become_pass': become_pass}
+
+ # Gets data from YAML/JSON files
+ self.loader = DataLoader()
+ try:
+ self.loader.set_vault_password(os.environ['VAULT_PASS'])
+ except KeyError:
+ pass
+
+
+ # All the variables from all the various places
+ self.variable_manager = VariableManager()
+ self.variable_manager.extra_vars = self.run_data
+
+ # Set inventory, using most of above objects
+ self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager)
+ self.variable_manager.set_inventory(self.inventory)
+
+
+ # Setup playbook executor, but don't run until run() called
+ self.pbex = playbook_executor.PlaybookExecutor(
+ playbooks=[playbook],
+ inventory=self.inventory,
+ variable_manager=self.variable_manager,
+ loader=self.loader,
+ options=self.options,
+ passwords={})
+
+ def run(self):
+ # Results of PlaybookExecutor
+ os.environ['REQUESTS_CA_BUNDLE'] = '/usr/local/share/ca-certificates/local_certs.crt'
+ callback = ResultCallback()
+ self.pbex._tqm._stdout_callback = callback
+
+ self.pbex.run()
+ stats = self.pbex._tqm._stats
+
+ # Test if success for record_logs
+ run_success = True
+ hosts = sorted(stats.processed.keys())
+ for h in hosts:
+ t = stats.summarize(h)
+ if t['unreachable'] > 0 or t['failures'] > 0:
+ run_success = False
+
+
+ #os.remove(self.hosts.name)
+
+ return stats,callback.results
+