Read Airflow connection parameters properly
Read the configuration JSON file properly even if it doesn't define all parameters
Add an emit command to workflow_ctl
Add example commands for emitting messages & registering essences
Update docker-compose configuration files to allow communication between containers
Rework kickstarter, as Airflow's api_client does not work properly
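
Example commands (illustrative; assumes workflow_ctl's existing
cmd/cmd_args argparse layout):

    workflow_ctl.py --config config.json register my_essence.json
    workflow_ctl.py --config config.json emit some.topic '{"key": "value"}'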

Change-Id: I59d00bfe17027a7ab367e6acde6a9eaaed3b6937
diff --git a/src/cord_workflow_airflow_extensions/config.yml b/src/cord_workflow_airflow_extensions/config.yml
new file mode 100644
index 0000000..a137ecd
--- /dev/null
+++ b/src/cord_workflow_airflow_extensions/config.yml
@@ -0,0 +1,29 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: cord-workflow-airflow-extensions
+accessor:
+  username: "admin@opencord.org"
+  password: "letmein"
+  endpoint: "xos-core:50051"
+logging:
+  version: 1
+  handlers:
+    console:
+      class: logging.StreamHandler
+  loggers:
+    '':
+      handlers:
+        - console
+      level: DEBUG
diff --git a/src/cord_workflow_airflow_extensions/cord_workflow_plugin.py b/src/cord_workflow_airflow_extensions/cord_workflow_plugin.py
index 152862f..2532477 100644
--- a/src/cord_workflow_airflow_extensions/cord_workflow_plugin.py
+++ b/src/cord_workflow_airflow_extensions/cord_workflow_plugin.py
@@ -65,13 +65,28 @@
             # find connection info from database or environment
             # ENV: AIRFLOW_CONN_CORD_CONTROLLER_DEFAULT
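+            # e.g. AIRFLOW_CONN_CORD_CONTROLLER_DEFAULT=http://controller:3030
+            # (illustrative value, using Airflow's connection-URI convention)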
             connection_params = self.get_connection(self.controller_conn_id)
-            # connection_params have three fields
+            # 'connection_params' has the following fields:
+            # schema
             # host
+            # port
             # login - we don't use this yet
             # password - we don't use this yet
             try:
                 self.workflow_run_client = WorkflowRun(self.workflow_id, self.workflow_run_id)
-                self.workflow_run_client.connect(connection_params.host)
+                schema = connection_params.schema
+                if not schema:
+                    schema = 'http'
+
+                host = connection_params.host
+                if not host:
+                    host = 'localhost'
+
+                port = connection_params.port
+                if (not port) or (port <= 0):
+                    port = 3030
+
+                url = '%s://%s:%s' % (schema, host, port)
+                self.workflow_run_client.connect(url)
             except BaseException as ex:
                 raise CORDWorkflowControllerException(ex)
 
@@ -89,17 +104,6 @@
 
         self.workflow_run_client = None
 
-    def update_status(self, task_id, status):
-        """
-        Update status of the workflow run.
-        'state' should be one of ['begin', 'end']
-        """
-        client = self.get_conn()
-        try:
-            return client.update_status(task_id, status)
-        except BaseException as ex:
-            raise CORDWorkflowControllerException(ex)
-
     def count_events(self):
         """
         Count queued events for the workflow run.
@@ -191,6 +195,8 @@
             **kwargs):
         super().__init__(*args, **kwargs)
 
+        self.log.debug('Initializing CORD EventSensor for topic %s' % topic)
+
         self.topic = topic
         self.key_field = key_field
         self.controller_conn_id = controller_conn_id
@@ -201,20 +207,20 @@
         """
         Return connection hook.
         """
+        self.log.debug('Creating a hook for run_id %s' % context['dag_run'].run_id)
         return CORDWorkflowControllerHook(self.dag_id, context['dag_run'].run_id, self.controller_conn_id)
 
     def execute(self, context):
         """
         Overridden to allow messages to be passed to next tasks via XCOM
         """
+        self.log.debug('Executing a task %s for run_id %s' % (self.task_id, context['dag_run'].run_id))
+
         if self.hook is None:
             self.hook = self.__create_hook(context)
 
-        self.hook.update_status(self.task_id, 'begin')
-
         super().execute(context)
 
-        self.hook.update_status(self.task_id, 'end')
         self.hook.close_conn()
         self.hook = None
         return self.message
@@ -223,6 +229,7 @@
         # we need to use notification to immediately react at event
         # https://github.com/apache/airflow/blob/master/airflow/sensors/base_sensor_operator.py#L122
         self.log.info('Poking : trying to fetch a message with a topic %s', self.topic)
+
         event = self.hook.fetch_event(self.task_id, self.topic)
         if event:
             self.message = event
@@ -244,7 +251,7 @@
             *args,
             **kwargs):
         topic = 'datamodel.%s' % model_name
-        super().__init__(topic=topic, *args, **kwargs)
+        super().__init__(topic=topic, key_field=key_field, controller_conn_id=controller_conn_id, *args, **kwargs)
 
 
 """
@@ -254,7 +261,7 @@
 
 # Defining the plugin class
 class CORD_Workflow_Airflow_Plugin(AirflowPlugin):
-    name = "CORD_Workflow_Airflow_Plugin"
+    name = "cord_workflow_plugin"
     operators = [CORDModelOperator]
     sensors = [CORDEventSensor, CORDModelSensor]
     hooks = [CORDWorkflowControllerHook]
diff --git a/src/cord_workflow_airflow_extensions/kickstarter.py b/src/cord_workflow_airflow_extensions/kickstarter.py
deleted file mode 100644
index 6f9924e..0000000
--- a/src/cord_workflow_airflow_extensions/kickstarter.py
+++ /dev/null
@@ -1,236 +0,0 @@
-#!/usr/bin/env python3
-
-# Copyright 2019-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Workflow Kickstarter
-
-This module kickstarts Airflow workflows for requests from Workflow Controller
-"""
-
-import json
-import os.path
-import argparse
-import pyfiglet
-import traceback
-import socket
-import time
-
-from multistructlog import create_logger
-from cord_workflow_controller_client.manager import Manager
-from importlib import import_module
-from urlparse import urlparse
-from airflow import configuration as AirflowConf
-from airflow import api
-from airflow.models import DagRun
-
-
-log = create_logger()
-manager = None
-airflow_client = None
-
-progargs = {
-    'controller_url': 'http://localhost:3030',
-    'logging': None
-}
-
-DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
-SOCKET_CONNECTION_TEST_TIMEOUT = 5
-DEFAULT_CONNECTION_TEST_DELAY = 5
-DEFAULT_CONNECTION_TEST_RETRY = 999999
-
-
-def print_graffiti():
-    result = pyfiglet.figlet_format("CORD\nWorkflow\nKickstarter", font="graffiti")
-    print(result)
-
-
-def get_arg_parser():
-    parser = argparse.ArgumentParser(description='CORD Workflow Kickstarter Daemon.', prog='kickstarter')
-    parser.add_argument('--config', help='locate a configuration file')
-    parser.add_argument('--controller', help='CORD Workflow Controller URL')
-    return parser
-
-
-def read_config(path):
-    if os.path.exists(path):
-        with open(path) as json_config_file:
-            data = json.load(json_config_file)
-            return data
-    return {}
-
-
-def pretty_format_json(j):
-    dumps = json.dumps(j, sort_keys=True, indent=4, separators=(',', ': '))
-    return dumps
-
-
-def is_port_open(url, timeout):
-    o = urlparse(url)
-    hostname = o.hostname
-    port = o.port
-
-    if (not port) or port <= 0:
-        if o.scheme.lower() == 'http':
-            port = 80
-        elif o.scheme.lower() == 'https':
-            port = 443
-
-    succeed = False
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    sock.settimeout(timeout)
-    try:
-        sock.connect((hostname, int(port)))
-        sock.shutdown(socket.SHUT_RDWR)
-        succeed = True
-    except BaseException:
-        pass
-    finally:
-        sock.close()
-
-    return succeed
-
-
-def check_web_live(url,
-                   retry=DEFAULT_CONNECTION_TEST_RETRY,
-                   delay=DEFAULT_CONNECTION_TEST_DELAY,
-                   timeout=SOCKET_CONNECTION_TEST_TIMEOUT):
-    ipup = False
-    for _ in range(retry):
-        if is_port_open(url, timeout):
-            ipup = True
-            break
-        else:
-            time.sleep(delay)
-    return ipup
-
-
-def on_kickstart(workflow_id, workflow_run_id):
-    if manager and airflow_client:
-        try:
-            log.info('> Kickstarting a workflow (%s) => workflow run (%s)' % (workflow_id, workflow_run_id))
-
-            airflow_client.trigger_dag(dag_id=workflow_id, run_id=workflow_run_id)
-            message = airflow_client.trigger_dag(
-                dag_id=workflow_id,
-                run_id=workflow_run_id
-            )
-            log.info('> Airflow Response: %s' % message)
-
-            # let controller know that the new workflow run is created
-            log.info('> Notifying a workflow (%s), a workflow run (%s)' % (workflow_id, workflow_run_id))
-            manager.notify_new_workflow_run(workflow_id, workflow_run_id)
-        except Exception as e:
-            log.error('> Error : %s' % e)
-            log.debug(traceback.format_exc())
-
-
-def on_check_state(workflow_id, workflow_run_id):
-    if manager and airflow_client:
-        try:
-            log.info('> Checking state of a workflow (%s) => workflow run (%s)' % (workflow_id, workflow_run_id))
-
-            run = DagRun.find(dag_id=workflow_id, run_id=workflow_run_id)
-            state = 'unknown'
-            if run:
-                # run is an array
-                # this should be one of ['success', 'running', 'failed']
-                state = run[0].state
-            else:
-                log.error(
-                    'Cannot retrieve state of a workflow run (%s, %s)' %
-                    (workflow_id, workflow_run_id)
-                )
-                state = 'unknown'
-
-            log.info('> state : %s' % state)
-
-            # let controller know the state of the workflow run
-            log.info(
-                '> Notifying update of state of a workflow (%s), a workflow run (%s) - state : %s' %
-                (workflow_id, workflow_run_id, state)
-            )
-            manager.report_workflow_run_state(workflow_id, workflow_run_id, state)
-        except Exception as e:
-            log.error('> Error : %s' % e)
-            log.debug(traceback.format_exc())
-
-
-# for command-line execution
-def main(args):
-    print_graffiti()
-
-    # check if config path is set
-    config_file_path = DEFAULT_CONFIG_FILE_PATH
-    if args.config:
-        config_file_path = args.config
-
-    if os.path.exists(config_file_path):
-        # read config
-        config = read_config(config_file_path)
-        if config:
-            global progargs
-            for k in progargs:
-                # overwrite
-                progargs[k] = config[k]
-
-    global log
-    log = create_logger(progargs["logging"])
-
-    if args.controller:
-        progargs['controller_url'] = args.controller
-
-    print('=CONFIG=')
-    config_json_string = pretty_format_json(progargs)
-    print(config_json_string)
-    print('\n')
-
-    # checking controller and airflow web interface
-    log.info('Checking if Workflow Controller (%s) is live...' % progargs['controller_url'])
-    controller_live = check_web_live(progargs['controller_url'])
-    if not controller_live:
-        log.error('Controller (%s) appears to be down' % progargs['controller_url'])
-        raise IOError('Controller (%s) appears to be down' % progargs['controller_url'])
-
-    # connect to workflow controller
-    log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
-    global manager
-    manager = Manager(logger=log)
-    manager.connect(progargs['controller_url'])
-    manager.set_handlers({'kickstart': on_kickstart})
-
-    # connect to airflow
-    global airflow_client
-    log.info('Connecting to Airflow...')
-
-    api.load_auth()
-    api_module = import_module(AirflowConf.get('cli', 'api_client'))
-    airflow_client = api_module.Client(
-        api_base_url=AirflowConf.get('cli', 'endpoint_url'),
-        auth=api.api_auth.client_auth
-    )
-
-    log.info('Waiting for kickstart events from Workflow Controller...')
-    try:
-        manager.wait()
-    finally:
-        log.info('Terminating the program...')
-        manager.disconnect()
-
-
-if __name__ == "__main__":
-    parser = get_arg_parser()
-    args = parser.parse_args()
-    main(args)
diff --git a/src/tools/__init__.py b/src/tools/__init__.py
new file mode 100644
index 0000000..19d1424
--- /dev/null
+++ b/src/tools/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/tools/config.json b/src/tools/config.json
new file mode 100644
index 0000000..601d686
--- /dev/null
+++ b/src/tools/config.json
@@ -0,0 +1,4 @@
+{
+    "controller_url": "http://controller:3030",
+    "airflow_bin": "/usr/local/bin"
+}
diff --git a/src/cord_workflow_airflow_extensions/essence_extractor.py b/src/tools/essence_extractor.py
similarity index 99%
rename from src/cord_workflow_airflow_extensions/essence_extractor.py
rename to src/tools/essence_extractor.py
index 72c8fdb..6335dc8 100644
--- a/src/cord_workflow_airflow_extensions/essence_extractor.py
+++ b/src/tools/essence_extractor.py
@@ -611,7 +611,8 @@
             global progargs
             for k in progargs:
                 # overwrite
-                progargs[k] = config[k]
+                if k in config:
+                    progargs[k] = config[k]
 
     log = create_logger(progargs["logging"])
 
diff --git a/src/tools/kickstarter.py b/src/tools/kickstarter.py
new file mode 100644
index 0000000..7c5a9de
--- /dev/null
+++ b/src/tools/kickstarter.py
@@ -0,0 +1,377 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow Kickstarter
+
+This module kickstarts Airflow workflows for requests from Workflow Controller
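+
+Example (flags per get_arg_parser below; values are illustrative):
+    kickstarter.py --config /etc/cord_workflow_airflow_extensions/config.json \
+                   --controller http://controller:3030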
+"""
+
+import json
+import os
+import os.path
+import argparse
+import pyfiglet
+import traceback
+import socket
+import time
+import subprocess
+
+from multistructlog import create_logger
+from cord_workflow_controller_client.manager import Manager
+
+# We can't use Airflow's experimental APIs to manage workflows/workflow runs
+# - the REST API does not provide sufficient features in this version
+# - api_client does not work when the caller is not in the main thread
+
+# from importlib import import_module
+# from airflow import configuration as AirflowConf
+# from airflow import api
+# from airflow.models import DagRun
+
+try:
+    from urllib.parse import urlparse
+except ImportError:
+    from urlparse import urlparse
+
+log = create_logger()
+manager = None
+# airflow_client = None
+
+airflow_bin = os.getenv('AIRFLOW_BIN', '/usr/local/bin')
+
+progargs = {
+    'controller_url': 'http://localhost:3030',
+    'airflow_bin': airflow_bin,
+    'logging': None
+}
+
+DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
+SOCKET_CONNECTION_TEST_TIMEOUT = 5
+DEFAULT_CONNECTION_TEST_DELAY = 5
+DEFAULT_CONNECTION_TEST_RETRY = 999999
+
+
+def print_graffiti():
+    result = pyfiglet.figlet_format("CORD\nWorkflow\nKickstarter", font="graffiti")
+    print(result)
+
+
+def get_arg_parser():
+    parser = argparse.ArgumentParser(description='CORD Workflow Kickstarter Daemon.', prog='kickstarter')
+    parser.add_argument('--config', help='locate a configuration file')
+    parser.add_argument('--controller', help='CORD Workflow Controller URL')
+    return parser
+
+
+def read_config(path):
+    if os.path.exists(path):
+        with open(path) as json_config_file:
+            data = json.load(json_config_file)
+            return data
+    return {}
+
+
+def pretty_format_json(j):
+    dumps = json.dumps(j, sort_keys=True, indent=4, separators=(',', ': '))
+    return dumps
+
+
+def is_port_open(url, timeout):
+    o = urlparse(url)
+    hostname = o.hostname
+    port = o.port
+
+    if (not port) or port <= 0:
+        if o.scheme.lower() == 'http':
+            port = 80
+        elif o.scheme.lower() == 'https':
+            port = 443
+
+    succeed = False
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    sock.settimeout(timeout)
+    try:
+        sock.connect((hostname, int(port)))
+        sock.shutdown(socket.SHUT_RDWR)
+        succeed = True
+    except BaseException:
+        pass
+    finally:
+        sock.close()
+
+    return succeed
+
+
+def check_web_live(url,
+                   retry=DEFAULT_CONNECTION_TEST_RETRY,
+                   delay=DEFAULT_CONNECTION_TEST_DELAY,
+                   timeout=SOCKET_CONNECTION_TEST_TIMEOUT):
+    ipup = False
+    for _ in range(retry):
+        if is_port_open(url, timeout):
+            ipup = True
+            break
+        else:
+            time.sleep(delay)
+    return ipup
+
+
+def get_airflow_cli():
+    return os.path.join(progargs['airflow_bin'], 'airflow')
+
+
+def check_airflow_live():
+    try:
+        subprocess.check_call([get_airflow_cli(), 'list_dags'])
+        return True
+    except subprocess.CalledProcessError as e:
+        log.error(e)
+        return False
+
+
+def on_kickstart(workflow_id, workflow_run_id):
+    # if manager and airflow_client:
+    if manager:
+        try:
+            created = False
+            log.info('> Kickstarting a workflow (%s) => workflow run (%s)' % (workflow_id, workflow_run_id))
+            # message = airflow_client.trigger_dag(
+            #     dag_id=workflow_id,
+            #     run_id=workflow_run_id
+            # )
+            # log.info('> Airflow Response: %s' % message)
+
+            output = subprocess.Popen(
+                [get_airflow_cli(), 'trigger_dag', '-r', workflow_run_id, workflow_id],
+                stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+                encoding='utf8'
+            )
+
+            for line in output.stdout.readlines():
+                if 'Created <DagRun ' in line:
+                    created = True
+                    break
+
+            if created:
+                # let controller know that the new workflow run is created
+                log.info('> Notifying a new workflow run (%s) for a workflow (%s)' % (workflow_run_id, workflow_id))
+                manager.report_new_workflow_run(workflow_id, workflow_run_id)
+        except subprocess.CalledProcessError as e:
+            # when shell exited with non-zero code
+            log.error('> Error : %s' % e)
+        except Exception as e:
+            log.error('> Error : %s' % e)
+            log.debug(traceback.format_exc())
+
+
+def on_check_status(workflow_id, workflow_run_id):
+    # if manager and airflow_client:
+    if manager:
+        try:
+            status = 'unknown'
+            log.info('> Checking status of workflow run (%s)' % (workflow_run_id))
+
+            # run = DagRun.find(dag_id=workflow_id, run_id=workflow_run_id)
+            # status = 'unknown'
+            # if run:
+            #     # run is an array
+            #     # this should be one of ['success', 'running', 'failed']
+            #     status = run[0].state
+            # else:
+            #     log.error(
+            #         'Cannot retrieve status of a workflow run (%s, %s)' %
+            #         (workflow_id, workflow_run_id)
+            #     )
+            #     status = 'unknown'
+
+            output = subprocess.Popen(
+                [get_airflow_cli(), 'list_dag_runs', workflow_id],
+                stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+                encoding='utf8'
+            )
+
+            title = False
+            body = False
+            for line in output.stdout.readlines():
+                if 'DAG RUNS' in line:
+                    title = True
+                elif title and ('--------' in line):
+                    body = True
+                elif body:
+                    # id  | run_id | state | execution_date | state_date |
+                    if workflow_run_id in line:
+                        # found the line
+                        # 1 | ssss | running | 2019-07-25T21:35:06+00:00 |
+                        # 2019-07-25T21:35:06.242130+00:00 |
+                        fields = line.split('|')
+                        status = fields[2].strip()
+                        break
+
+            log.info('> status : %s' % status)
+
+            # let controller know the status of the workflow run
+            log.info(
+                '> Updating status of a workflow run (%s) - status : %s' %
+                (workflow_run_id, status)
+            )
+            manager.report_workflow_run_status(workflow_id, workflow_run_id, status)
+        except subprocess.CalledProcessError as e:
+            # when shell exited with non-zero code
+            log.error('> Error : %s' % e)
+        except Exception as e:
+            log.error('> Error : %s' % e)
+            log.debug(traceback.format_exc())
+
+
+def on_check_status_bulk(requests):
+    # if manager and airflow_client:
+    if requests:
+        # group requested workflow_run_ids by workflow_id
+        req_map = {}
+        for req in requests:
+            workflow_id = req['workflow_id']
+            workflow_run_id = req['workflow_run_id']
+
+            if workflow_id not in req_map:
+                req_map[workflow_id] = []
+
+            req_map[workflow_id].append(workflow_run_id)
+
+        if manager:
+            try:
+                log.info('> Checking status of workflow runs')
+
+                statuses = []
+                for workflow_id in req_map:
+                    workflow_run_ids = req_map[workflow_id]
+
+                    output = subprocess.Popen(
+                        [get_airflow_cli(), 'list_dag_runs', workflow_id],
+                        stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+                        encoding='utf8'
+                    )
+
+                    title = False
+                    body = False
+                    for line in output.stdout.readlines():
+                        if 'DAG RUNS' in line:
+                            title = True
+                        elif title and ('--------' in line):
+                            body = True
+                        elif body:
+                            # id  | run_id | state | execution_date | state_date |
+                            for workflow_run_id in workflow_run_ids:
+                                if workflow_run_id in line:
+                                    # found the line
+                                    # 1 | ssss | running | 2019-07-25T21:35:06+00:00 |
+                                    # 2019-07-25T21:35:06.242130+00:00 |
+                                    fields = line.split('|')
+                                    status = fields[2].strip()
+
+                                    log.info('> status of a workflow run (%s) : %s' % (workflow_run_id, status))
+                                    statuses.append({
+                                        'workflow_id': workflow_id,
+                                        'workflow_run_id': workflow_run_id,
+                                        'status': status
+                                    })
+
+                # let controller know statuses of workflow runs
+                log.info('> Updating status of workflow runs')
+                manager.report_workflow_run_status_bulk(statuses)
+            except subprocess.CalledProcessError as e:
+                # when shell exited with non-zero code
+                log.error('> Error : %s' % e)
+            except Exception as e:
+                log.error('> Error : %s' % e)
+                log.debug(traceback.format_exc())
+
+
+# for command-line execution
+def main(args):
+    print_graffiti()
+
+    # check if config path is set
+    config_file_path = DEFAULT_CONFIG_FILE_PATH
+    if args.config:
+        config_file_path = args.config
+
+    if os.path.exists(config_file_path):
+        # read config
+        config = read_config(config_file_path)
+        if config:
+            global progargs
+            for k in progargs:
+                # overwrite
+                if k in config:
+                    progargs[k] = config[k]
+
+    global log
+    log = create_logger(progargs["logging"])
+
+    if args.controller:
+        progargs['controller_url'] = args.controller
+
+    print('=CONFIG=')
+    config_json_string = pretty_format_json(progargs)
+    print(config_json_string)
+    print('\n')
+
+    # checking controller and airflow web interface
+    log.info('Checking if Workflow Controller (%s) is live...' % progargs['controller_url'])
+    controller_live = check_web_live(progargs['controller_url'])
+    if not controller_live:
+        log.error('Controller (%s) appears to be down' % progargs['controller_url'])
+        raise IOError('Controller (%s) appears to be down' % progargs['controller_url'])
+
+    airflow_live = check_airflow_live()
+    if not airflow_live:
+        log.error('Airflow appears to be down')
+        raise IOError('Airflow appears to be down')
+
+    # connect to workflow controller
+    log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
+    global manager
+    manager = Manager(logger=log)
+    manager.connect(progargs['controller_url'])
+    manager.set_handlers({
+        'kickstart': on_kickstart,
+        'check_status': on_check_status,
+        'check_status_bulk': on_check_status_bulk
+    })
+
+    # connect to airflow
+    # global airflow_client
+    # log.info('Connecting to Airflow...')
+
+    # api.load_auth()
+    # api_module = import_module(AirflowConf.get('cli', 'api_client'))
+    # airflow_client = api_module.Client(
+    #     api_base_url=AirflowConf.get('cli', 'endpoint_url'),
+    #     auth=api.api_auth.client_auth
+    # )
+
+    log.info('Waiting for kickstart events from Workflow Controller...')
+    try:
+        manager.wait()
+    finally:
+        log.info('Terminating the program...')
+        manager.disconnect()
+
+
+if __name__ == "__main__":
+    parser = get_arg_parser()
+    args = parser.parse_args()
+    main(args)
diff --git a/src/cord_workflow_airflow_extensions/workflow_ctl.py b/src/tools/workflow_ctl.py
similarity index 70%
rename from src/cord_workflow_airflow_extensions/workflow_ctl.py
rename to src/tools/workflow_ctl.py
index 563d966..751356a 100644
--- a/src/cord_workflow_airflow_extensions/workflow_ctl.py
+++ b/src/tools/workflow_ctl.py
@@ -23,9 +23,11 @@
 import json
 import os.path
 import argparse
+import re
 
 from multistructlog import create_logger
 from cord_workflow_controller_client.manager import Manager
+from cord_workflow_controller_client.probe import Probe
 
 
 log = create_logger()
@@ -72,6 +74,27 @@
     return None
 
 
+def read_json_string(json_str):
+    if json_str:
+        try:
+            return json.loads(json_str)
+        except json.decoder.JSONDecodeError:
+            return load_dirty_json(json_str)
+    return None
+
+
+def load_dirty_json(dirty_json):
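+    # best-effort cleanup of Python-literal style "JSON" before strict parsing,
+    # e.g. "{'enabled': True}" -> '{"enabled": true}' (illustrative input)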
+    regex_replace = [
+        (r"([ \{,:\[])(u)?'([^']+)'", r'\1"\3"'),
+        (r" False([, \}\]])", r' false\1'),
+        (r" True([, \}\]])", r' true\1')
+    ]
+    for r, s in regex_replace:
+        dirty_json = re.sub(r, s, dirty_json)
+    clean_json = json.loads(dirty_json)
+    return clean_json
+
+
 def register_workflow(args):
     # expect args should be a list of essence files
     if not args:
@@ -95,7 +118,7 @@
             log.info('Registering an essence file (%s)...' % essence_file)
             result = manager.register_workflow_essence(essence)
             if result:
-                log.inof('registered an essence file (%s)' % essence_file)
+                log.info('registered an essence file (%s)' % essence_file)
             else:
                 log.error('cannot register an essence file (%s)' % essence_file)
 
@@ -107,6 +130,33 @@
     return results
 
 
+def emit_event(args):
+    # expect args to be <topic> <message>, where message is a JSON string
+    if not args or len(args) != 2:
+        raise InputError('parameters should be <topic> <message>')
+
+    log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
+    probe = Probe(logger=log)
+    connected = False
+
+    try:
+        probe.connect(progargs['controller_url'])
+        connected = True
+
+        topic = args[0]
+        message = read_json_string(args[1])
+
+        log.info('Emitting an event (%s - %s)...' % (topic, message))
+        probe.emit_event(topic, message)
+        log.info('Emitted an event (%s - %s)' % (topic, message))
+        return True
+    finally:
+        if connected:
+            probe.disconnect()
+
+
 # for command-line execution
 def main(args):
     # check if config path is set
@@ -121,7 +171,8 @@
             global progargs
             for k in progargs:
                 # overwrite
-                progargs[k] = config[k]
+                if k in config:
+                    progargs[k] = config[k]
 
     global log
     log = create_logger(progargs["logging"])
@@ -133,6 +184,9 @@
         if args.cmd.strip().lower() in ['reg', 'register', 'register_workflow']:
             results = register_workflow(args.cmd_args)
             print(results)
+        elif args.cmd.strip().lower() in ['emit', 'send', 'event', 'message']:
+            results = emit_event(args.cmd_args)
+            print(results)
         else:
             log.error('unknown command %s' % args.cmd)
             raise InputError('unknown command %s' % args.cmd)