Implement controller client
- Probe, Manager, WorkflowRun classes are provided to interact with CORD Workflow Controller

Change-Id: I0ad8d3661864635d9701eab1cb089cb17f81cd50
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..19d1424
--- /dev/null
+++ b/test/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/test/dummy_server.py b/test/dummy_server.py
new file mode 100644
index 0000000..9c22968
--- /dev/null
+++ b/test/dummy_server.py
@@ -0,0 +1,636 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import socketio
+import psutil
+import time
+import datetime
+
+from threading import Timer
+from gevent import pywsgi
+from geventwebsocket.handler import WebSocketHandler
+from multiprocessing import Process
+from multistructlog import create_logger
+from cord_workflow_controller_client.probe import GREETING
+from cord_workflow_controller_client.manager \
+    import (WORKFLOW_KICKSTART,
+            WORKFLOW_REGISTER, WORKFLOW_REGISTER_ESSENCE, WORKFLOW_LIST, WORKFLOW_LIST_RUN,
+            WORKFLOW_CHECK, WORKFLOW_REMOVE, WORKFLOW_REMOVE_RUN, WORKFLOW_NOTIFY_NEW_RUN)
+from cord_workflow_controller_client.workflow_run \
+    import (WORKFLOW_RUN_NOTIFY_EVENT,
+            WORKFLOW_RUN_UPDATE_STATUS, WORKFLOW_RUN_COUNT_EVENTS, WORKFLOW_RUN_FETCH_EVENT)
+
+
+"""
+Run a dummy socket.io server as a separate process.
+serve_forever() blocks until the process is killed,
+so I had to use multi-process approach.
+"""
+
+log = create_logger()
+
+# Socket IO
+sio = None
+
+manager_clients = {}
+workflows = {}
+workflow_essences = {}
+workflow_runs = {}
+workflow_run_clients = {}
+seq_no = 1
+
+
+class repeatableTimer():
+    def __init__(self, time, handler, arg):
+        self.time = time
+        self.handler = handler
+        self.arg = arg
+        self.thread = Timer(self.time, self.on_tick)
+
+    def on_tick(self):
+        self.handler(self.arg)
+        self.thread = Timer(self.time, self.on_tick)
+        self.thread.start()
+
+    def start(self):
+        self.thread.start()
+
+    def cancel(self):
+        self.thread.cancel()
+
+
+def make_query_string_dict(query_string):
+    obj = {}
+    params = query_string.split('&')
+    for param in params:
+        kv = param.split('=')
+        key = kv[0]
+        val = kv[1]
+        obj[key] = val
+
+    return obj
+
+
+def _send_kickstart_event(sid):
+    global seq_no
+
+    workflow_id = 'dummy_workflow_%d' % seq_no
+    workflow_run_id = 'dummy_workflow_run_%d' % seq_no
+
+    seq_no += 1
+    log.info('sending a kickstart event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_KICKSTART,
+        data={
+            'workflow_id': workflow_id,
+            'workflow_run_id': workflow_run_id,
+            'timestamp': str(datetime.datetime.now())
+        },
+        room=sid
+    )
+
+
+def _send_notify_event(sid):
+    global seq_no
+
+    topic = 'topic_%s' % seq_no
+    message = {
+        'sample_key': 'sample_value'
+    }
+    seq_no += 1
+
+    run_client = workflow_run_clients[sid]
+    if run_client:
+        workflow_run_id = run_client['workflow_run_id']
+        workflow_run = workflow_runs[workflow_run_id]
+        if workflow_run:
+            workflow_run['queue'].append({
+                'topic': topic,
+                'message': message
+            })
+
+            log.info('sending a notify event to sid %s' % sid)
+            sio.emit(
+                event=WORKFLOW_RUN_NOTIFY_EVENT,
+                data={
+                    'topic': topic,
+                    'timestamp': str(datetime.datetime.now())
+                },
+                room=sid
+            )
+
+
+def _handle_event_connect(sid, query):
+    sio.emit(GREETING, {})
+
+    global last_client_action_time
+    last_client_action_time = datetime.datetime.now
+
+    # if the client is a manager, send kickstart events every 3 sec
+    if query['type'] == 'workflow_manager':
+        log.info('manager (%s) is connected' % sid)
+        kickstart_timer = repeatableTimer(2, _send_kickstart_event, sid)
+        manager_clients[sid] = {
+            'kickstart_timer': kickstart_timer
+        }
+
+        kickstart_timer.start()
+    elif query['type'] == 'workflow_run':
+        log.info('workflow run (%s) is connected' % sid)
+        notify_event_timer = repeatableTimer(2, _send_notify_event, sid)
+        workflow_run_clients[sid] = {
+            'workflow_id': query['workflow_id'],
+            'workflow_run_id': query['workflow_run_id'],
+            'notify_event_timer': notify_event_timer
+        }
+
+        notify_event_timer.start()
+
+
+def _handle_event_disconnect(sid):
+    if sid in manager_clients:
+        log.info('manager (%s) is disconnected' % sid)
+        if manager_clients[sid]['kickstart_timer']:
+            manager_clients[sid]['kickstart_timer'].cancel()
+
+        del manager_clients[sid]
+
+    if sid in workflow_run_clients:
+        log.info('workflow run (%s) is disconnected' % sid)
+        if workflow_run_clients[sid]['notify_event_timer']:
+            workflow_run_clients[sid]['notify_event_timer'].cancel()
+
+        del workflow_run_clients[sid]
+
+    global last_client_action_time
+    last_client_action_time = datetime.datetime.now
+
+
+def _get_req_id(body):
+    req_id = 101010
+    if 'req_id' in body:
+        req_id = int(body['req_id'])
+    return req_id
+
+
+def _handle_event_workflow_reg(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow' in body:
+        workflow = body['workflow']
+        workflow_id = workflow['id']
+
+        if workflow_id in workflows:
+            # already exist
+            data['error'] = True
+            data['result'] = False
+            data['message'] = 'workflow is already registered'
+        else:
+            log.info('manager (%s) registers a workflow (%s)' % (sid, workflow_id))
+            workflows[workflow_id] = workflow
+
+            data['error'] = False
+            data['result'] = True
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'workflow is not in the message body'
+
+    log.info('returning a result for workflow register event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_REGISTER,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_reg_essence(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'essence' in body:
+        essence = body['essence']
+        for wid in essence:
+            workflow_essence = essence[wid]
+            if 'dag' in workflow_essence and 'dag_id' in workflow_essence['dag']:
+                dag = workflow_essence['dag']
+                workflow_id = dag['dag_id']
+
+                if workflow_id in workflow_essences or workflow_id in workflows:
+                    # already exist
+                    data['error'] = True
+                    data['result'] = False
+                    data['message'] = 'workflow is already registered'
+                else:
+                    log.info('manager (%s) registers a workflow (%s)' % (sid, workflow_id))
+                    workflow_essences[workflow_id] = workflow_essence
+
+                    data['error'] = False
+                    data['result'] = True
+            else:
+                data['error'] = True
+                data['result'] = False
+                data['message'] = 'essence is not in the message body'
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'essence is not in the message body'
+
+    log.info('returning a result for workflow essence register event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_REGISTER_ESSENCE,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_list(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    workflow_ids = []
+
+    for workflow_id in workflows:
+        workflow_ids.append(workflow_id)
+
+    for workflow_id in workflow_essences:
+        workflow_ids.append(workflow_id)
+
+    data['error'] = False
+    data['result'] = workflow_ids
+
+    log.info('returning a result for workflow list event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_LIST,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_run_list(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    workflow_run_ids = []
+
+    for workflow_run_id in workflow_runs:
+        workflow_run_ids.append(workflow_run_id)
+
+    data['error'] = False
+    data['result'] = workflow_run_ids
+
+    log.info('returning a result for workflow run list event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_LIST_RUN,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_check(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow_id' in body:
+        workflow_id = body['workflow_id']
+        if workflow_id in workflows:
+            data['error'] = False
+            data['result'] = True
+        else:
+            data['error'] = False
+            data['result'] = False
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'workflow_id is not in the message body'
+
+    log.info('returning a result for workflow check event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_CHECK,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_remove(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow_id' in body:
+        workflow_id = body['workflow_id']
+        if workflow_id in workflows:
+
+            hasWorkflowRuns = False
+            for workflow_run_id in workflow_runs:
+                workflow_run = workflow_runs[workflow_run_id]
+                wid = workflow_run['workflow_id']
+                if wid == workflow_id:
+                    # there is a workflow run for the workflow id
+                    hasWorkflowRuns = True
+                    break
+
+            if hasWorkflowRuns:
+                data['error'] = False
+                data['result'] = False
+            else:
+                del workflows[workflow_id]
+
+                data['error'] = False
+                data['result'] = True
+        else:
+            data['error'] = False
+            data['result'] = False
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'workflow_id is not in the message body'
+
+    log.info('returning a result for workflow remove event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_REMOVE,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_run_remove(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow_id' in body and 'workflow_run_id' in body:
+        # workflow_id = body['workflow_id']
+        workflow_run_id = body['workflow_run_id']
+
+        if workflow_run_id in workflow_runs:
+            del workflow_runs[workflow_run_id]
+
+            data['error'] = False
+            data['result'] = True
+        else:
+            data['error'] = False
+            data['result'] = False
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'workflow_id or workflow_run_id is not in the message body'
+
+    log.info('returning a result for workflow run remove event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_REMOVE_RUN,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_new_workflow_run(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow_id' in body and 'workflow_run_id' in body:
+        workflow_id = body['workflow_id']
+        workflow_run_id = body['workflow_run_id']
+
+        log.info('manager (%s) started a new workflow (%s), workflow_run (%s)' % (sid, workflow_id, workflow_run_id))
+        workflow_runs[workflow_run_id] = {
+            'workflow_id': workflow_id,
+            'workflow_run_id': workflow_run_id,
+            'queue': []
+        }
+
+        data['error'] = False
+        data['result'] = True
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'workflow_id or workflow_run_id is not in the message body'
+
+    log.info('returning a result for a new workflow run event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_NOTIFY_NEW_RUN,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_run_update_status(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow_id' in body and 'workflow_run_id' in body and 'task_id' in body and 'status' in body:
+        # workflow_id = body['workflow_id']
+        workflow_run_id = body['workflow_run_id']
+        task_id = body['task_id']
+        status = body['status']
+
+        if workflow_run_id in workflow_runs:
+            workflow_run = workflow_runs[workflow_run_id]
+            workflow_run[task_id] = status
+
+            data['error'] = False
+            data['result'] = True
+        else:
+            data['error'] = True
+            data['result'] = False
+            data['message'] = 'cannot find workflow run'
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'workflow_id, workflow_run_id, task_id or status is not in the message body'
+
+    log.info('returning a result for workflow run update status event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_RUN_UPDATE_STATUS,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_run_count_events(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow_id' in body and 'workflow_run_id' in body:
+        # workflow_id = body['workflow_id']
+        workflow_run_id = body['workflow_run_id']
+
+        if workflow_run_id in workflow_runs:
+            workflow_run = workflow_runs[workflow_run_id]
+            queue = workflow_run['queue']
+            count = len(queue)
+
+            data['error'] = False
+            data['result'] = count
+        else:
+            data['error'] = True
+            data['result'] = 0
+            data['message'] = 'cannot find workflow run'
+    else:
+        data['error'] = True
+        data['result'] = 0
+        data['message'] = 'workflow_id, workflow_run_id, task_id or status is not in the message body'
+
+    log.info('returning a result for workflow run count events to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_RUN_COUNT_EVENTS,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_run_fetch_event(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow_id' in body and 'workflow_run_id' in body and 'task_id' in body and 'topic' in body:
+        # workflow_id = body['workflow_id']
+        workflow_run_id = body['workflow_run_id']
+        # task_id = body['task_id']
+        topic = body['topic']
+
+        if workflow_run_id in workflow_runs:
+            workflow_run = workflow_runs[workflow_run_id]
+            queue = workflow_run['queue']
+
+            event = None
+            for idx in range(len(queue)):
+                if queue[idx]['topic'] == topic:
+                    # found
+                    event = queue.pop(idx)
+                    break
+
+            if event:
+                data['error'] = False
+                data['result'] = event
+            else:
+                data['error'] = False
+                data['result'] = {}
+        else:
+            data['error'] = False
+            data['result'] = False
+            data['message'] = 'cannot find workflow run'
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'workflow_id, workflow_run_id, task_id or topic is not in the message body'
+
+    log.info('returning a result for workflow run fetch event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_RUN_FETCH_EVENT,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event(event, sid, body):
+    log.info('event %s - body %s (%s)' % (event, body, type(body)))
+
+
+class ServerEventHandler(socketio.namespace.Namespace):
+    def trigger_event(self, event, *args):
+        sid = args[0]
+        if event == 'connect':
+            querystr = args[1]['QUERY_STRING']
+            query = make_query_string_dict(querystr)
+            _handle_event_connect(sid, query)
+        elif event == 'disconnect':
+            _handle_event_disconnect(sid)
+
+        # manager
+        elif event == WORKFLOW_NOTIFY_NEW_RUN:
+            _handle_event_new_workflow_run(sid, args[1])
+        elif event == WORKFLOW_REGISTER_ESSENCE:
+            _handle_event_workflow_reg_essence(sid, args[1])
+        elif event == WORKFLOW_REGISTER:
+            _handle_event_workflow_reg(sid, args[1])
+        elif event == WORKFLOW_LIST:
+            _handle_event_workflow_list(sid, args[1])
+        elif event == WORKFLOW_LIST_RUN:
+            _handle_event_workflow_run_list(sid, args[1])
+        elif event == WORKFLOW_CHECK:
+            _handle_event_workflow_check(sid, args[1])
+        elif event == WORKFLOW_REMOVE:
+            _handle_event_workflow_remove(sid, args[1])
+        elif event == WORKFLOW_REMOVE_RUN:
+            _handle_event_workflow_run_remove(sid, args[1])
+
+        # workflow run
+        elif event == WORKFLOW_RUN_UPDATE_STATUS:
+            _handle_event_workflow_run_update_status(sid, args[1])
+        elif event == WORKFLOW_RUN_COUNT_EVENTS:
+            _handle_event_workflow_run_count_events(sid, args[1])
+        elif event == WORKFLOW_RUN_FETCH_EVENT:
+            _handle_event_workflow_run_fetch_event(sid, args[1])
+        else:
+            _handle_event(event, args[0], args[1])
+
+
+def _run(port):
+    global sio
+    sio = socketio.Server(ping_timeout=5, ping_interval=1)
+    app = socketio.WSGIApp(sio)
+    sio.register_namespace(ServerEventHandler('/'))
+
+    server = pywsgi.WSGIServer(
+        ('', port),
+        app,
+        handler_class=WebSocketHandler
+    )
+
+    server.serve_forever()
+
+
+def start(port):
+    p = Process(target=_run, args=(port, ))
+    p.start()
+    time.sleep(3)
+
+    log.info('Dummy server is started!')
+    return p
+
+
+def stop(p):
+    log.info('Stopping dummy server!')
+
+    try:
+        process = psutil.Process(p.pid)
+        for proc in process.children(recursive=True):
+            proc.kill()
+        process.kill()
+        p.join()
+    except psutil.NoSuchProcess:
+        pass
+
+    # clean-up
+    global sio, manager_clients, workflow_runs, seq_no
+    sio = None
+    manager_clients = {}
+    workflow_runs = {}
+    seq_no = 1
+
+    time.sleep(3)
+
+    log.info('Dummy server is stopped!')
diff --git a/test/dummy_server_util.py b/test/dummy_server_util.py
new file mode 100644
index 0000000..7cc60db
--- /dev/null
+++ b/test/dummy_server_util.py
@@ -0,0 +1,39 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import atexit
+from .dummy_server import stop as server_stop
+
+dummy_servers = {}
+
+
+def cleanup_dummy_servers():
+    for pid in dummy_servers:
+        s = dummy_servers[pid]
+        server_stop(s)
+        del dummy_servers[pid]
+
+
+def register_dummy_server_cleanup(s):
+    if s.pid not in dummy_servers:
+        dummy_servers[s.pid] = s
+
+
+def unregister_dummy_server_cleanup(s):
+    if s.pid in dummy_servers:
+        del dummy_servers[s.pid]
+
+
+atexit.register(cleanup_dummy_servers)
diff --git a/test/hello_workflow.json b/test/hello_workflow.json
new file mode 100644
index 0000000..9de71bc
--- /dev/null
+++ b/test/hello_workflow.json
@@ -0,0 +1,25 @@
+{
+    "hello_workflow": {
+        "dag": {
+            "dag_id": "hello_workflow",
+            "local_variable": "dag_hello"
+        },
+        "dependencies": {
+            "onu_event_handler": {}
+        },
+        "tasks": {
+            "onu_event_handler": {
+                "class": "XOSEventSensor",
+                "dag_id": "hello_workflow",
+                "dag": "dag_hello",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "ONU_event",
+                "task_id": "onu_event_handler",
+                "topic": "onu.events"
+            }
+        }
+    }
+}
diff --git a/test/test_manager.py b/test/test_manager.py
new file mode 100644
index 0000000..d35a031
--- /dev/null
+++ b/test/test_manager.py
@@ -0,0 +1,120 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import time
+import os
+import json
+from cord_workflow_controller_client.manager import Manager
+from multistructlog import create_logger
+from .dummy_server import start as server_start, stop as server_stop
+from .dummy_server_util import register_dummy_server_cleanup, unregister_dummy_server_cleanup
+
+log = create_logger()
+code_dir = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
+
+def read_json_file(filename):
+    if filename:
+        with open(filename, 'r') as f:
+            return json.load(f)
+    return None
+
+
+class TestManager(unittest.TestCase):
+    """
+    Try to connect to a local fake Controller Service as a Manager.
+    """
+
+    def setUp(self):
+        self.server = server_start(17080)
+        self.kickstarted_workflows = {}
+        register_dummy_server_cleanup(self.server)
+
+    def tearDown(self):
+        server_stop(self.server)
+        unregister_dummy_server_cleanup(self.server)
+        self.server = None
+        self.kickstarted_workflows = {}
+
+    def test_connect(self):
+        """
+        This tests if Manager client can connect to a socket.io server properly.
+        """
+        succeed = False
+        try:
+            manager = Manager(logger=log)
+            manager.connect('http://localhost:17080')
+
+            time.sleep(1)
+
+            manager.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+
+    def test_kickstart(self):
+        """
+        This tests if Manager client can receive a kickstart event.
+        """
+        succeed = False
+
+        try:
+            manager = Manager(logger=log)
+            manager.connect('http://localhost:17080')
+
+            def on_kickstart(workflow_id, workflow_run_id):
+                self.kickstarted_workflows[workflow_id] = {
+                    'workflow_id': workflow_id,
+                    'workflow_run_id': workflow_run_id
+                }
+                manager.notify_new_workflow_run(workflow_id, workflow_run_id)
+
+            manager.set_handlers({'kickstart': on_kickstart})
+
+            # dummy server sends a kickstart message for every 2 seconds
+            # we wait 6 seconds to receive at least 2 messages
+            time.sleep(6)
+
+            manager.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+            self.assertTrue(len(self.kickstarted_workflows) >= 2, 'Kickstart event is not handled')
+
+    def test_workflow_essence_register(self):
+        """
+        This tests if Manager client can register workflow essence.
+        """
+        succeed = False
+        essence_path = os.path.join(code_dir, "hello_workflow.json")
+        essence = read_json_file(essence_path)
+
+        try:
+            manager = Manager(logger=log)
+            manager.connect('http://localhost:17080')
+
+            # the command is synchronous
+            result = manager.register_workflow_essence(essence)
+
+            manager.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+            self.assertTrue(result, 'workflow essence register failed')
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/test/test_probe.py b/test/test_probe.py
new file mode 100644
index 0000000..3f6baed
--- /dev/null
+++ b/test/test_probe.py
@@ -0,0 +1,95 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import time
+from cord_workflow_controller_client.probe import Probe
+from multistructlog import create_logger
+from .dummy_server import start as server_start, stop as server_stop
+
+log = create_logger()
+
+
+class TestProbe(unittest.TestCase):
+    """
+    Try to connect to a local fake Controller Service as a Probe.
+    """
+
+    def setUp(self):
+        self.server = server_start(17080)
+
+    def tearDown(self):
+        server_stop(self.server)
+        self.server = None
+
+    def test_connect(self):
+        """
+        This tests if Probe client can connect to a socket.io server properly.
+        """
+        succeed = False
+        try:
+            probe = Probe(logger=log)
+            probe.connect('http://localhost:17080')
+
+            time.sleep(1)
+
+            probe.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+
+    def test_emit_string(self):
+        """
+        This tests if Probe client can emit an event.
+        """
+        succeed = False
+        try:
+            probe = Probe(logger=log)
+            probe.connect('http://localhost:17080')
+
+            probe.emit_event('xos.test.event', 'string message - hello')
+            time.sleep(1)
+
+            probe.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+
+    def test_emit_json(self):
+        """
+        This tests if Probe client can emit an event with a dict (json) object.
+        """
+        succeed = False
+        try:
+            probe = Probe(logger=log)
+            probe.connect('http://localhost:17080')
+
+            probe.emit_event(
+                'xos.test.event',
+                {
+                    'str_key': 'value',
+                    'int_key': 32335
+                }
+            )
+            time.sleep(1)
+
+            probe.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/test/test_workflow_run.py b/test/test_workflow_run.py
new file mode 100644
index 0000000..80d6fcf
--- /dev/null
+++ b/test/test_workflow_run.py
@@ -0,0 +1,179 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import time
+import os
+import json
+from cord_workflow_controller_client.manager import Manager
+from cord_workflow_controller_client.workflow_run import WorkflowRun
+from multistructlog import create_logger
+from .dummy_server import start as server_start, stop as server_stop
+
+log = create_logger()
+code_dir = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
+
+def read_json_file(filename):
+    if filename:
+        with open(filename, 'r') as f:
+            return json.load(f)
+    return None
+
+
+class TestWorkflowRun(unittest.TestCase):
+    """
+    Try to connect to a local fake Controller Service as a Manager.
+    """
+
+    def setUp(self):
+        self.kickstarted_workflows = {}
+        self.notifications = []
+
+        self.server = server_start(17080)
+        self.manager = Manager(logger=log)
+        self.manager.connect('http://localhost:17080')
+
+        essence_path = os.path.join(code_dir, "hello_workflow.json")
+        essence = read_json_file(essence_path)
+        self.manager.register_workflow_essence(essence)
+        self.manager.notify_new_workflow_run('hello_workflow', 'hello_workflow_123')
+
+        # wait for 2 seconds for registering a new workflow run
+        time.sleep(2)
+
+    def tearDown(self):
+        self.manager.disconnect()
+        self.manager = None
+
+        server_stop(self.server)
+        self.server = None
+
+        self.kickstarted_workflows = {}
+        self.notifications = []
+
+    def test_connect(self):
+        """
+        This tests if workflow run client can connect to a socket.io server properly.
+        """
+        succeed = False
+        try:
+            run = WorkflowRun('hello_workflow', 'hello_workflow_123')
+            run.connect('http://localhost:17080')
+
+            time.sleep(1)
+
+            run.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+
+    def test_count_events(self):
+        """
+        This tests if workflow run client can retrieve the number of events.
+        """
+        succeed = False
+        try:
+            run = WorkflowRun('hello_workflow', 'hello_workflow_123')
+            run.connect('http://localhost:17080')
+
+            # dummy server generates a message for every 2 seconds
+            # we wait 6 seconds to queue at least 2 messages
+            time.sleep(6)
+
+            count = run.count_events()
+
+            run.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+            self.assertTrue(count >= 2, 'There must be more than 2 events queued')
+
+    def test_notify_event(self):
+        """
+        This tests if workflow run client can get a noficitation for events.
+        """
+        succeed = False
+        try:
+            run = WorkflowRun('hello_workflow', 'hello_workflow_123')
+            run.connect('http://localhost:17080')
+
+            def on_notification(workflow_id, workflow_run_id, topic):
+                self.notifications.append({
+                    'workflow_id': workflow_id,
+                    'workflow_run_id': workflow_run_id,
+                    'topic': topic
+                })
+
+            run.set_handlers({'notify': on_notification})
+
+            # dummy server generates a message for every 2 seconds
+            # we wait 6 seconds to get at least 2 notifications
+            time.sleep(6)
+
+            count = len(self.notifications)
+
+            run.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+            self.assertTrue(count >= 2, 'There must be more than 2 notifications received')
+
+    def test_get_events(self):
+        """
+        This tests if workflow run client can retrieve events.
+        """
+        succeed = False
+        try:
+            run = WorkflowRun('hello_workflow', 'hello_workflow_123')
+            run.connect('http://localhost:17080')
+
+            def on_notification(workflow_id, workflow_run_id, topic):
+                self.notifications.append({
+                    'workflow_id': workflow_id,
+                    'workflow_run_id': workflow_run_id,
+                    'topic': topic
+                })
+
+            run.set_handlers({'notify': on_notification})
+
+            # dummy server generates a message for every 2 seconds
+            # we wait 6 seconds to queue at least 2 messages
+            time.sleep(6)
+
+            count_notified = len(self.notifications)
+            count_queued = run.count_events()
+
+            self.assertTrue(count_notified >= 2, 'There must be more than 2 events notified')
+            self.assertTrue(count_queued >= 2, 'There must be more than 2 events queued')
+
+            # count_notified and count_queued may not have the same number temporarily
+            for i in range(count_notified):
+                notification = self.notifications.pop(0)
+                topic = notification['topic']
+                event = run.fetch_event('task123', topic)
+
+            self.assertTrue('topic' in event, 'event should not be empty')
+            self.assertTrue(event['topic'] == topic, 'event should be retrieved by topic')
+            self.assertTrue(len(event['message']) > 0, 'there must be some messages')
+
+            run.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+
+
+if __name__ == "__main__":
+    unittest.main()