CORD-706: Replaced Ansible binary interface with Ansible native API

Change-Id: Id9e20c3b287d961279a3606bf6ebcea289266e61
diff --git a/xos/synchronizers/base/ansible_helper.py b/xos/synchronizers/base/ansible_helper.py
index cea9686..61520df 100644
--- a/xos/synchronizers/base/ansible_helper.py
+++ b/xos/synchronizers/base/ansible_helper.py
@@ -10,7 +10,8 @@
 import traceback
 import subprocess
 from xos.config import Config, XOS_DIR
-from xos.logger import observer_logger
+from xos.logger import observer_logger as logger
+from ansible_runner import *
 
 step_dir = Config().observer_steps_dir
 sys_dir = Config().observer_sys_dir
@@ -18,76 +19,6 @@
 os_template_loader = jinja2.FileSystemLoader( searchpath=[step_dir, "/opt/xos/synchronizers/shared_templates"])
 os_template_env = jinja2.Environment(loader=os_template_loader)
 
-def parse_output(msg):
-    lines = msg.splitlines()
-    results = []
-
-    observer_logger.info(msg)
-
-    for l in lines:
-        magic_str = 'ok: [127.0.0.1] => '
-        magic_str2 = 'changed: [127.0.0.1] => '
-        magic_str3 = 'ok: [localhost] => '
-        magic_str4 = 'changed: [localhost] => '
-        if (l.startswith(magic_str)):
-            w = len(magic_str)
-            str = l[w:]
-
-            # handle ok: [127.0.0.1] => (item=org.onosproject.driver) => {...
-            if str.startswith("(") and (" => {" in str):
-                str = str.split("=> ",1)[1]
-
-            d = json.loads(str)
-            results.append(d)
-        elif (l.startswith(magic_str2)):
-            w = len(magic_str2)
-            str = l[w:]
-
-            if str.startswith("(") and (" => {" in str):
-                str = str.split("=> ",1)[1]
-
-            d = json.loads(str)
-            results.append(d)
-        elif (l.startswith(magic_str3)):
-            w = len(magic_str3)
-            str = l[w:]
-
-            # handle ok: [127.0.0.1] => (item=org.onosproject.driver) => {...
-            if str.startswith("(") and (" => {" in str):
-                str = str.split("=> ",1)[1]
-
-            d = json.loads(str)
-            results.append(d)
-        elif (l.startswith(magic_str4)):
-            w = len(magic_str4)
-            str = l[w:]
-
-            if str.startswith("(") and (" => {" in str):
-                str = str.split("=> ",1)[1]
-
-            d = json.loads(str)
-            results.append(d)
-
-
-    return results
-
-def parse_unreachable(msg):
-    total_unreachable=0
-    total_failed=0
-    for l in msg.splitlines():
-        x = re.findall('ok=([0-9]+).*changed=([0-9]+).*unreachable=([0-9]+).*failed=([0-9]+)', l)
-        if x:
-            (ok, changed, unreachable, failed) = x[0]
-            ok=int(ok)
-            changed=int(changed)
-            unreachable=int(unreachable)
-            failed=int(failed)
-
-            total_unreachable += unreachable
-            total_failed += failed
-    return {'unreachable':total_unreachable,'failed':total_failed}
-
-
 def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
     return ''.join(random.choice(chars) for _ in range(size))
 
@@ -115,7 +46,7 @@
 
     return (opts, os.path.join(pathed_sys_dir,objname))
 
-def run_template(name, opts, path='', expected_num=None, ansible_config=None, ansible_hosts=None, run_ansible_script=None):
+def run_template(name, opts, path='', expected_num=None, ansible_config=None, ansible_hosts=None, run_ansible_script=None, object=None):
     template = os_template_env.get_template(name)
     buffer = template.render(opts)
 
@@ -133,54 +64,64 @@
     if ansible_hosts:
        env["ANSIBLE_HOSTS"] = ansible_hosts
 
-    if (not Config().observer_pretend):
-        if not run_ansible_script:
-            run_ansible_script = os.path.join(XOS_DIR, "synchronizers/base/run_ansible")
+    # Dropped support for observer_pretend - to be redone
+    runner = Runner(
+        playbook=fqp,
+        run_data=opts)
+        
 
-        process = subprocess.Popen("%s %s" % (run_ansible_script, shellquote(fqp)), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
-        msg = process.stdout.read()
-        err_msg = process.stderr.read()
-
-        if getattr(Config(), "observer_save_ansible_output", False):
-            try:
-                open(fqp+".out","w").write(msg)
-                open(fqp+".err","w").write(err_msg)
-            except:
-                # fail silently
-                pass
-
-    else:
-        msg = open(fqp+'.out').read()
+    stats,aresults = runner.run()
 
     try:
-        ok_results = parse_output(msg)
+        ok_results = []
+        total_unreachable = 0
+        failed = 0
+
+        error_msg = []
+        for x in aresults:
+            if not x.is_failed() and not x.is_unreachable() and not x.is_skipped():
+                ok_results.append(x)
+            elif x.is_unreachable():
+                total_unreachable+=1
+                try:
+                    error_msg.append(x._result['msg'])
+                except:
+                    pass
+            elif x.is_failed():
+                failed+=1
+                try:
+                    error_msg.append(x._result['msg'])
+                except:
+                    pass
+
+
         if (expected_num is not None) and (len(ok_results) != expected_num):
             raise ValueError('Unexpected num %s!=%d' % (str(expected_num), len(ok_results)) )
 
-        parsed = parse_unreachable(msg)
-        total_unreachable = parsed['unreachable']
-	failed = parsed['failed']
+        #total_unreachable = stats.unreachable
+
 	if (failed):
 		raise ValueError('Ansible playbook failed.')
 
-        if (total_unreachable > 0):
-            raise ValueError("Unreachable results in ansible recipe")
     except ValueError,e:
-        all_fatal = [e.message] + re.findall(r'^msg: (.*)',msg,re.MULTILINE)
-        all_fatal2 = re.findall(r'^ERROR: (.*)',msg,re.MULTILINE)
-        all_fatal3 = re.findall(r'^failed:.*"msg": "(.*)"',msg,re.MULTILINE)
-
-        all_fatal.extend(all_fatal2)
-        all_fatal.extend(all_fatal3)
         try:
-            error = ' // '.join(all_fatal)
+            error = ' // '.join(error_msg)
         except:
             pass
         raise Exception(error)
 
-    return ok_results
+    if (object):
+        oprops = object.tologdict()
+        for i in ok_results:
+            ansible = i._result
+            ansible['ansible'] = 1
+            c = dict(oprops.items() + ansible.items())
+            logger.info(i._task, extra=c)
+            
+    processed_results = map(lambda x:x._result, ok_results)
+    return processed_results[1:] # 0 is setup
 
-def run_template_ssh(name, opts, path='', expected_num=None):
+def run_template_ssh(name, opts, path='', expected_num=None, object=None):
     instance_name = opts["instance_name"]
     hostname = opts["hostname"]
     private_key = opts["private_key"]
@@ -241,7 +182,7 @@
     print "ANSIBLE_CONFIG=%s" % config_pathname
     print "ANSIBLE_HOSTS=%s" % hosts_pathname
 
-    return run_template(name, opts, path, ansible_config = config_pathname, ansible_hosts = hosts_pathname, run_ansible_script="/opt/xos/synchronizers/base/run_ansible_verbose")
+    return run_template(name, opts, path, ansible_config = config_pathname, ansible_hosts = hosts_pathname, run_ansible_script="/opt/xos/synchronizers/base/run_ansible_verbose", object=object)
 
 
 
diff --git a/xos/synchronizers/base/ansible_runner.py b/xos/synchronizers/base/ansible_runner.py
new file mode 100644
index 0000000..8089b43
--- /dev/null
+++ b/xos/synchronizers/base/ansible_runner.py
@@ -0,0 +1,158 @@
+#!/usr/bin/python
+
+import os
+import sys
+
+from tempfile import NamedTemporaryFile
+from ansible.inventory import Inventory
+from ansible.vars import VariableManager
+from ansible.parsing.dataloader import DataLoader
+from ansible.executor import playbook_executor
+from ansible.utils.display import Display
+from ansible.plugins.callback import CallbackBase
+
+
+class ResultCallback(CallbackBase):
+    def __init__(self):
+        super(ResultCallback, self).__init__()
+        self.results = []
+
+    def v2_runner_on_ok(self, result, **kwargs):
+        self.results.append(result)
+    
+    def v2_runner_on_failed(self, result, **kwargs):
+        host = result._host
+        self.results.append(result)
+
+    def v2_runner_on_unreachable(self, result, **kwargs):
+        host = result._host
+        self.results.append(result)
+
+class Options(object):
+    """
+    Options class to replace Ansible OptParser
+    """
+    def __init__(self, verbosity=None, inventory=None, listhosts=None, subset=None, module_paths=None, extra_vars=None,
+                 forks=None, ask_vault_pass=None, vault_password_files=None, new_vault_password_file=None,
+                 output_file=None, tags=None, skip_tags=None, one_line=None, tree=None, ask_sudo_pass=None, ask_su_pass=None,
+                 sudo=None, sudo_user=None, become=None, become_method=None, become_user=None, become_ask_pass=None,
+                 ask_pass=None, private_key_file=None, remote_user=None, connection=None, timeout=None, ssh_common_args=None,
+                 sftp_extra_args=None, scp_extra_args=None, ssh_extra_args=None, poll_interval=None, seconds=None, check=None,
+                 syntax=None, diff=None, force_handlers=None, flush_cache=None, listtasks=None, listtags=None, module_path=None):
+        self.verbosity = verbosity
+        self.inventory = inventory
+        self.listhosts = listhosts
+        self.subset = subset
+        self.module_paths = module_paths
+        self.extra_vars = extra_vars
+        self.forks = forks
+        self.ask_vault_pass = ask_vault_pass
+        self.vault_password_files = vault_password_files
+        self.new_vault_password_file = new_vault_password_file
+        self.output_file = output_file
+        self.tags = tags
+        self.skip_tags = skip_tags
+        self.one_line = one_line
+        self.tree = tree
+        self.ask_sudo_pass = ask_sudo_pass
+        self.ask_su_pass = ask_su_pass
+        self.sudo = sudo
+        self.sudo_user = sudo_user
+        self.become = become
+        self.become_method = become_method
+        self.become_user = become_user
+        self.become_ask_pass = become_ask_pass
+        self.ask_pass = ask_pass
+        self.private_key_file = private_key_file
+        self.remote_user = remote_user
+        self.connection = connection
+        self.timeout = timeout
+        self.ssh_common_args = ssh_common_args
+        self.sftp_extra_args = sftp_extra_args
+        self.scp_extra_args = scp_extra_args
+        self.ssh_extra_args = ssh_extra_args
+        self.poll_interval = poll_interval
+        self.seconds = seconds
+        self.check = check
+        self.syntax = syntax
+        self.diff = diff
+        self.force_handlers = force_handlers
+        self.flush_cache = flush_cache
+        self.listtasks = listtasks
+        self.listtags = listtags
+        self.module_path = module_path
+
+
+class Runner(object):
+
+    def __init__(self, playbook, run_data, private_key_file=None, verbosity=0):
+
+        self.run_data = run_data
+
+        self.options = Options()
+        self.options.output_file = playbook + '.result'
+        self.options.private_key_file = private_key_file
+        self.options.verbosity = verbosity
+        self.options.connection = 'ssh'  # Need a connection type "smart" or "ssh"
+        #self.options.become = True
+        #self.options.become_method = 'sudo'
+        #self.options.become_user = 'root'
+
+        # Set global verbosity
+        self.display = Display()
+        self.display.verbosity = self.options.verbosity
+        # Executor appears to have it's own 
+        # verbosity object/setting as well
+        playbook_executor.verbosity = self.options.verbosity
+
+        # Become Pass Needed if not logging in as user root
+        #passwords = {'become_pass': become_pass}
+
+        # Gets data from YAML/JSON files
+        self.loader = DataLoader()
+        try:
+            self.loader.set_vault_password(os.environ['VAULT_PASS'])
+        except KeyError:
+            pass
+
+        
+        # All the variables from all the various places
+        self.variable_manager = VariableManager()
+        self.variable_manager.extra_vars = self.run_data
+
+        # Set inventory, using most of above objects
+        self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager)
+        self.variable_manager.set_inventory(self.inventory)
+
+
+        # Setup playbook executor, but don't run until run() called
+        self.pbex = playbook_executor.PlaybookExecutor(
+            playbooks=[playbook], 
+            inventory=self.inventory, 
+            variable_manager=self.variable_manager,
+            loader=self.loader, 
+            options=self.options, 
+            passwords={})
+
+    def run(self):
+        # Results of PlaybookExecutor
+        os.environ['REQUESTS_CA_BUNDLE'] = '/usr/local/share/ca-certificates/local_certs.crt'
+	callback = ResultCallback()
+	self.pbex._tqm._stdout_callback = callback
+
+        self.pbex.run()
+        stats = self.pbex._tqm._stats
+
+        # Test if success for record_logs
+        run_success = True
+        hosts = sorted(stats.processed.keys())
+        for h in hosts:
+            t = stats.summarize(h)
+            if t['unreachable'] > 0 or t['failures'] > 0:
+                run_success = False
+
+
+        #os.remove(self.hosts.name)
+
+        return stats,callback.results
+