Implement controller client
- Probe, Manager, and WorkflowRun classes are provided to interact with the CORD Workflow Controller

Change-Id: I0ad8d3661864635d9701eab1cb089cb17f81cd50
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..92a524d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,12 @@
+.noseids
+.vscode
+build
+dist
+.coverage
+coverage.xml
+cover
+.tox
+.DS_Store
+nose2-results.xml
+venv-cordworkflowcontrollerclient
+*.pyc
\ No newline at end of file
diff --git a/.gitreview b/.gitreview
new file mode 100644
index 0000000..12fa340
--- /dev/null
+++ b/.gitreview
@@ -0,0 +1,5 @@
+[gerrit]
+host=gerrit.opencord.org
+port=29418
+project=cord-workflow-controller-client.git
+defaultremote=origin
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644
index 0000000..261eeb9
--- /dev/null
+++ b/LICENSE.txt
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..45c47fc
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,3 @@
+include README.rst
+include requirements.txt
+include VERSION
\ No newline at end of file
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..aeaa95f
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,49 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# set default shell
+SHELL = bash -e -o pipefail
+
+# Variables
+VERSION                  ?= $(shell cat ./VERSION)
+
+# Targets
+all: test
+
+# Create a virtualenv and install all the libraries
+venv-cordworkflowcontrollerclient:
+	virtualenv $@;\
+    source ./$@/bin/activate ; set -u ;\
+    pip install -r requirements.txt nose2 ;\
+    pip install -e ./
+
+# tests
+test: unit-test
+
+unit-test:
+	tox
+
+clean:
+	find . -name '*.pyc' | xargs rm -f
+	find . -name '__pycache__' | xargs rm -rf
+	rm -rf \
+    .coverage \
+    coverage.xml \
+    nose2-results.xml \
+    venv-cordworkflowcontrollerclient \
+    .tox \
+    build \
+    dist \
+    *.egg-info \
+    *results.xml
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..1f968c9
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,20 @@
+CORD Workflow Controller Client
+===============================
+A CORD Workflow Controller Client Library.
+This library allows users to communicate with the CORD Workflow Controller.
+Three classes are provided: Manager, Probe, and WorkflowRun.
+
+Manager
+-------
+This class is used to act as a workflow manager.
+Workflow Managers manage **workflow registration** and handle **kickstart** events.
+
+Probe
+-----
+This class is used to act as a probe.
+Probes emit events to CORD Workflow Controller.
+
+WorkflowRun
+-----------
+This class is used to act as a workflow run.
+Workflow Runs are used by workflow instances to receive events emitted by Probes.
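
A minimal quick-start sketch for the Probe class described above (illustrative only, not part of this change; the controller URL, event topic, and payload are placeholders):

```python
# Quick-start sketch: emit an event to the controller as a Probe.
from cord_workflow_controller_client.probe import Probe

probe = Probe(name='my_probe')
probe.connect('http://localhost:3030')  # hypothetical controller endpoint
try:
    # the topic and body below are placeholders; any JSON-serializable body works
    probe.emit_event('sample.topic', {'serial_number': 'abc123'})
finally:
    probe.disconnect()
```
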
diff --git a/VERSION b/VERSION
new file mode 100644
index 0000000..49ffebc
--- /dev/null
+++ b/VERSION
@@ -0,0 +1 @@
+0.1.0-dev
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..83d1b7f
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+python-socketio[client]~=4.1.0
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..1e84c5c
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,53 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from setuptools import setup
+
+
+def readme():
+    with open("README.rst") as f:
+        return f.read()
+
+
+def version():
+    with open("VERSION") as f:
+        return f.read().strip()
+
+
+def parse_requirements(filename):
+    # parse a requirements.txt file, skipping blank lines and comments
+    requirements = []
+    for line in open(filename):
+        line = line.strip()
+        if line and not line.startswith("#"):
+            requirements.append(line)
+    return requirements
+
+
+setup(
+    name="cord_workflow_controller_client",
+    version=version(),
+    description="A client library for CORD Workflow Controller",
+    url="https://gerrit.opencord.org/gitweb?p=cord-workflow-controller-client.git",
+    long_description=readme(),
+    author="Illyoung Choi",
+    author_email="iychoi@opennetworking.org",
+    classifiers=["License :: OSI Approved :: Apache Software License"],
+    license="Apache v2",
+    packages=["cord_workflow_controller_client"],
+    package_dir={"cord_workflow_controller_client": "src/cord_workflow_controller_client"},
+    install_requires=parse_requirements("requirements.txt"),
+    include_package_data=True,
+)
diff --git a/src/cord_workflow_controller_client/__init__.py b/src/cord_workflow_controller_client/__init__.py
new file mode 100644
index 0000000..19d1424
--- /dev/null
+++ b/src/cord_workflow_controller_client/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/cord_workflow_controller_client/countdown_latch.py b/src/cord_workflow_controller_client/countdown_latch.py
new file mode 100644
index 0000000..5d3c398
--- /dev/null
+++ b/src/cord_workflow_controller_client/countdown_latch.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Count-down latch
+"""
+
+import threading
+import time
+
+
+class CountDownLatch(object):
+    def __init__(self, count=1):
+        self.count = count
+        self.condition = threading.Condition()
+
+    def count_down(self, count=1):
+        self.condition.acquire()
+        self.count -= count
+        if self.count <= 0:
+            self.condition.notify_all()
+        self.condition.release()
+
+    def wait(self, timeout=0):
+        self.condition.acquire()
+        start_time = time.time()
+
+        while self.count > 0:
+            self.condition.wait(timeout)
+            cur_time = time.time()
+            if cur_time - start_time >= timeout:
+                break
+
+        self.condition.release()
+        if self.count <= 0:
+            return True
+        else:
+            # timeout
+            return False
+
+    def get_count(self):
+        return self.count
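
The latch above is what the Manager and WorkflowRun classes use to block a caller until the matching response arrives. A standalone sketch of the pattern (illustrative only, not part of this change):

```python
# Sketch: one thread waits on the latch while another releases it.
import threading
import time

from cord_workflow_controller_client.countdown_latch import CountDownLatch

latch = CountDownLatch(count=1)

def responder():
    time.sleep(0.5)      # simulate a response arriving later
    latch.count_down()   # release the waiting thread

threading.Thread(target=responder).start()

# wait() returns True if the count reached zero, False on timeout
completed = latch.wait(timeout=2)
print('completed:', completed, 'remaining count:', latch.get_count())
```
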
diff --git a/src/cord_workflow_controller_client/errors.py b/src/cord_workflow_controller_client/errors.py
new file mode 100644
index 0000000..3e42283
--- /dev/null
+++ b/src/cord_workflow_controller_client/errors.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Errors
+"""
+
+
+class ClientError(Exception):
+    """
+    Base class for exceptions in this module.
+    """
+    pass
+
+
+class ClientRPCError(ClientError):
+    """
+    Raised when an RPC call failed.
+
+    Attributes:
+        req_id -- request id
+        message -- explanation of why the RPC call failed
+    """
+    def __init__(self, req_id, message):
+        self.req_id = req_id
+        self.message = message
+
+
+class ClientInputError(ClientError):
+    """
+    Raised when input parameters are missing or wrong.
+
+    Attributes:
+        message -- explanation of why the input parameters are invalid
+    """
+    def __init__(self, message):
+        self.message = message
+
+
+class ClientResponseError(ClientError):
+    """
+    Raised when the controller returns an error response.
+
+    Attributes:
+        message -- explanation of why the request failed
+    """
+    def __init__(self, message):
+        self.message = message
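
A sketch of how a caller is expected to handle this error hierarchy (illustrative only; the controller URL and workflow id are placeholders):

```python
# Sketch: handling the client error hierarchy around a blocking controller call.
from cord_workflow_controller_client.manager import Manager
from cord_workflow_controller_client.errors import (
    ClientInputError, ClientRPCError, ClientResponseError
)

manager = Manager(name='error_demo')
manager.connect('http://localhost:3030')  # hypothetical controller endpoint
try:
    manager.check_workflow('sample_workflow')
except ClientInputError as e:
    print('bad input:', e.message)               # missing or invalid arguments
except ClientRPCError as e:
    print('rpc failure:', e.req_id, e.message)   # timed-out or lost request
except ClientResponseError as e:
    print('controller error:', e.message)        # controller returned an error
finally:
    manager.disconnect()
```
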
diff --git a/src/cord_workflow_controller_client/manager.py b/src/cord_workflow_controller_client/manager.py
new file mode 100644
index 0000000..d788fe9
--- /dev/null
+++ b/src/cord_workflow_controller_client/manager.py
@@ -0,0 +1,523 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow Manager
+
+This module implements the Workflow Manager interface.
+"""
+
+import json
+import socketio
+
+from .countdown_latch import CountDownLatch
+from .utils import get_noop_logger, gen_id, gen_seq_id
+from .errors import ClientRPCError, ClientInputError, ClientResponseError
+
+WAIT_TIMEOUT = 10  # 10 seconds
+
+# controller -> manager
+GREETING = 'cord.workflow.ctlsvc.greeting'
+WORKFLOW_KICKSTART = 'cord.workflow.ctlsvc.workflow.kickstart'
+
+# manager -> controller -> manager
+WORKFLOW_REGISTER = 'cord.workflow.ctlsvc.workflow.register'
+WORKFLOW_REGISTER_ESSENCE = 'cord.workflow.ctlsvc.workflow.register_essence'
+WORKFLOW_LIST = 'cord.workflow.ctlsvc.workflow.list'
+WORKFLOW_LIST_RUN = 'cord.workflow.ctlsvc.workflow.run.list'
+WORKFLOW_CHECK = 'cord.workflow.ctlsvc.workflow.check'
+WORKFLOW_REMOVE = 'cord.workflow.ctlsvc.workflow.remove'
+WORKFLOW_REMOVE_RUN = 'cord.workflow.ctlsvc.workflow.run.remove'
+WORKFLOW_NOTIFY_NEW_RUN = 'cord.workflow.ctlsvc.workflow.notify_new_run'
+
+
+class Manager(object):
+    def __init__(self, logger=None, name=None):
+        self.sio = socketio.Client()
+
+        if logger:
+            self.logger = logger
+        else:
+            self.logger = get_noop_logger()
+
+        if name:
+            self.name = name
+        else:
+            self.name = 'manager_%s' % gen_id()
+
+        self.req_id = gen_seq_id()
+
+        # set sio handlers
+        self.logger.debug('Setting event handlers to Socket.IO')
+        self.sio.on('connect', self.__on_sio_connect)
+        self.sio.on('disconnect', self.__on_sio_disconnect)
+        self.sio.on(WORKFLOW_KICKSTART, self.__on_kickstart_message)
+        self.sio.on(GREETING, self.__on_greeting_message)
+        self.sio.on(WORKFLOW_REGISTER, self.__on_workflow_reg_message)
+        self.sio.on(WORKFLOW_REGISTER_ESSENCE, self.__on_workflow_reg_essence_message)
+        self.sio.on(WORKFLOW_LIST, self.__on_workflow_list_message)
+        self.sio.on(WORKFLOW_LIST_RUN, self.__on_workflow_list_run_message)
+        self.sio.on(WORKFLOW_CHECK, self.__on_workflow_check_message)
+        self.sio.on(WORKFLOW_REMOVE, self.__on_workflow_remove_message)
+        self.sio.on(WORKFLOW_REMOVE_RUN, self.__on_workflow_remove_run_message)
+        self.sio.on(WORKFLOW_NOTIFY_NEW_RUN, self.__on_workflow_notify_new_run_message)
+
+        self.handlers = {
+            'connect': self.__noop_connect_handler,
+            'disconnect': self.__noop_disconnect_handler,
+            'kickstart': self.__noop_kickstart_handler
+        }
+
+        # key is req_id
+        self.pending_requests = {}
+
+    def set_logger(self, logger):
+        self.logger = logger
+
+    def get_logger(self):
+        return self.logger
+
+    def __on_sio_connect(self):
+        self.logger.debug('connected to the server')
+        handler = self.handlers['connect']
+        if callable(handler):
+            handler()
+
+    def __noop_connect_handler(self):
+        self.logger.debug('no-op connect handler')
+
+    def __on_sio_disconnect(self):
+        self.logger.debug('disconnected from the server')
+        handler = self.handlers['disconnect']
+        if callable(handler):
+            handler()
+
+    def __noop_disconnect_handler(self):
+        self.logger.debug('no-op disconnect handler')
+
+    def __noop_kickstart_handler(self, workflow_id, workflow_run_id):
+        self.logger.debug('no-op kickstart handler')
+
+    def __get_next_req_id(self):
+        req_id = self.req_id
+        self.req_id += 1
+        return req_id
+
+    def __on_greeting_message(self, data):
+        self.logger.debug('received a greeting message from the server')
+
+    def __on_kickstart_message(self, data):
+        """
+        Handler for a kickstart event
+        REQ = {
+            'workflow_id': <workflow_id>,
+            'workflow_run_id': <workflow_run_id>
+        }
+        """
+        self.logger.info('received a kickstart message from the server')
+        workflow_id = data['workflow_id']
+        workflow_run_id = data['workflow_run_id']
+
+        self.logger.info(
+            'a kickstart message - workflow_id (%s), workflow_run_id (%s)' %
+            (workflow_id, workflow_run_id)
+        )
+        if workflow_id and workflow_run_id:
+            handler = self.handlers['kickstart']
+            if callable(handler):
+                self.logger.info('calling a kickstart handler - %s' % handler)
+                handler(workflow_id, workflow_run_id)
+
+    def __on_workflow_reg_message(self, data):
+        self.__on_response(WORKFLOW_REGISTER, data)
+
+    def __on_workflow_reg_essence_message(self, data):
+        self.__on_response(WORKFLOW_REGISTER_ESSENCE, data)
+
+    def __on_workflow_list_message(self, data):
+        self.__on_response(WORKFLOW_LIST, data)
+
+    def __on_workflow_list_run_message(self, data):
+        self.__on_response(WORKFLOW_LIST_RUN, data)
+
+    def __on_workflow_check_message(self, data):
+        self.__on_response(WORKFLOW_CHECK, data)
+
+    def __on_workflow_remove_message(self, data):
+        self.__on_response(WORKFLOW_REMOVE, data)
+
+    def __on_workflow_remove_run_message(self, data):
+        self.__on_response(WORKFLOW_REMOVE_RUN, data)
+
+    def __on_workflow_notify_new_run_message(self, data):
+        self.__on_response(WORKFLOW_NOTIFY_NEW_RUN, data)
+
+    def __check_pending_request(self, req_id):
+        """
+        Check a pending request
+        """
+        if req_id in self.pending_requests:
+            return True
+        return False
+
+    def __put_pending_request(self, api, params):
+        """
+        Put a pending request to a queue
+        """
+        req_id = self.__get_next_req_id()
+        latch = CountDownLatch()
+        params['req_id'] = req_id  # inject req_id
+        # register the pending request before emitting so a fast response
+        # cannot arrive before the request is tracked
+        self.pending_requests[req_id] = {
+            'req_id': req_id,
+            'latch': latch,
+            'api': api,
+            'params': params,
+            'result': None
+        }
+        self.sio.emit(api, params)
+        return req_id
+
+    def __wait_response(self, req_id):
+        """
+        Wait for completion of a request
+        """
+        if req_id in self.pending_requests:
+            req = self.pending_requests[req_id]
+            # Condition.wait() on Python older than 3.2 does not return
+            # a value indicating whether it timed out
+            return req['latch'].wait(WAIT_TIMEOUT)
+        else:
+            self.logger.error(
+                'cannot find a pending request (%s) from a queue' % req_id
+            )
+            raise ClientRPCError(
+                req_id,
+                'cannot find a pending request (%s) from a queue' % req_id
+            )
+
+    def __complete_request(self, req_id, result):
+        """
+        Complete a pending request
+        """
+        if req_id in self.pending_requests:
+            req = self.pending_requests[req_id]
+            req['latch'].count_down()
+            req['result'] = result
+            return
+
+        self.logger.error(
+            'cannot find a pending request (%s) from a queue' % req_id
+        )
+        raise ClientRPCError(
+            req_id,
+            'cannot find a pending request (%s) from a queue' % req_id
+        )
+
+    def __pop_pending_request(self, req_name):
+        """
+        Pop a pending request from a queue
+        """
+        return self.pending_requests.pop(req_name, None)
+
+    def connect(self, url):
+        """
+        Connect to the given url
+        """
+        query_string = 'id=%s&type=workflow_manager&name=%s' % (self.name, self.name)
+        connect_url = '%s?%s' % (url, query_string)
+
+        if not (connect_url.startswith('http://') or connect_url.startswith('https://')):
+            connect_url = 'http://%s' % connect_url
+
+        self.logger.debug('Connecting to a Socket.IO server (%s)' % connect_url)
+        self.sio.connect(url=connect_url, transports=['websocket'])
+
+    def disconnect(self):
+        """
+        Disconnect from the server
+        """
+        self.sio.disconnect()
+
+    def wait(self):
+        self.sio.wait()
+
+    def sleep(self, sec):
+        self.sio.sleep(sec)
+
+    def get_handlers(self):
+        return self.handlers
+
+    def set_handlers(self, new_handlers):
+        for k in self.handlers:
+            if k in new_handlers:
+                self.handlers[k] = new_handlers[k]
+
+    def __request(self, api, params=None):
+        # avoid a shared mutable default and allow empty params
+        # (list requests send only the injected req_id)
+        if params is None:
+            params = {}
+        if api:
+            req_id = self.__put_pending_request(api, params)
+            self.logger.debug('waiting for a response for req_id (%s)' % req_id)
+            self.__wait_response(req_id)  # wait for completion
+            req = self.__pop_pending_request(req_id)
+            if req:
+                if req['latch'].get_count() > 0:
+                    # timed out
+                    self.logger.error('request (%s) timed out' % req_id)
+                    raise ClientRPCError(
+                        req_id,
+                        'request (%s) timed out' % req_id
+                    )
+                else:
+                    return req['result']
+            else:
+                self.logger.error('cannot find a pending request (%s) from a queue' % req_id)
+                raise ClientRPCError(
+                    req_id,
+                    'cannot find a pending request (%s) from a queue' % req_id
+                )
+        else:
+            self.logger.error(
+                'invalid arguments api (%s), params (%s)' %
+                (api, json.dumps(params))
+            )
+            raise ClientInputError(
+                'invalid arguments api (%s), params (%s)' %
+                (api, json.dumps(params))
+            )
+
+    def __on_response(self, api, result):
+        if result and 'req_id' in result:
+            self.logger.debug('completing a request (%s)' % result['req_id'])
+            self.__complete_request(result['req_id'], result)
+        else:
+            self.logger.error(
+                'invalid arguments api (%s), result (%s)' %
+                (api, json.dumps(result))
+            )
+            raise ClientInputError(
+                'invalid arguments api (%s), result (%s)' %
+                (api, json.dumps(result))
+            )
+
+    def register_workflow(self, workflow):
+        """
+        Register a workflow.
+        The workflow parameter is a workflow definition object.
+        """
+        if workflow:
+            result = self.__request(WORKFLOW_REGISTER, {
+                'workflow': workflow
+            })
+            if result['error']:
+                self.logger.error(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+                raise ClientResponseError(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+            else:
+                return result['result']
+        else:
+            self.logger.error(
+                'invalid arguments workflow (%s)' %
+                json.dumps(workflow)
+            )
+            raise ClientInputError(
+                'invalid arguments workflow (%s)' %
+                json.dumps(workflow)
+            )
+
+    def register_workflow_essence(self, essence):
+        """
+        Register a workflow by essence.
+        """
+        if essence:
+            result = self.__request(WORKFLOW_REGISTER_ESSENCE, {
+                'essence': essence
+            })
+            if result['error']:
+                self.logger.error(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+                raise ClientResponseError(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+            else:
+                return result['result']
+        else:
+            self.logger.error(
+                'invalid arguments workflow essence (%s)' %
+                json.dumps(essence)
+            )
+            raise ClientInputError(
+                'invalid arguments workflow essence (%s)' %
+                json.dumps(essence)
+            )
+
+    def list_workflows(self):
+        """
+        List workflows.
+        """
+        result = self.__request(WORKFLOW_LIST, {})
+        if result['error']:
+            self.logger.error(
+                'request (%s) failed with an error - %s' %
+                (result['req_id'], result['message'])
+            )
+            raise ClientResponseError(
+                'request (%s) failed with an error - %s' %
+                (result['req_id'], result['message'])
+            )
+        else:
+            return result['result']
+
+    def list_workflow_runs(self):
+        """
+        List workflow runs.
+        """
+        result = self.__request(WORKFLOW_LIST_RUN, {})
+        if result['error']:
+            self.logger.error(
+                'request (%s) failed with an error - %s' %
+                (result['req_id'], result['message'])
+            )
+            raise ClientResponseError(
+                'request (%s) failed with an error - %s' %
+                (result['req_id'], result['message'])
+            )
+        else:
+            return result['result']
+
+    def check_workflow(self, workflow_id):
+        """
+        Check a workflow.
+        """
+        if workflow_id:
+            result = self.__request(WORKFLOW_CHECK, {
+                'workflow_id': workflow_id
+            })
+            if result['error']:
+                self.logger.error(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+                raise ClientResponseError(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+            else:
+                return result['result']
+        else:
+            self.logger.error(
+                'invalid arguments workflow_id (%s)' %
+                workflow_id
+            )
+            raise ClientInputError(
+                'invalid arguments workflow_id (%s)' %
+                workflow_id
+            )
+
+    def remove_workflow(self, workflow_id):
+        """
+        Remove a workflow.
+        """
+        if workflow_id:
+            result = self.__request(WORKFLOW_REMOVE, {
+                'workflow_id': workflow_id
+            })
+            if result['error']:
+                self.logger.error(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+                raise ClientResponseError(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+            else:
+                return result['result']
+        else:
+            self.logger.error(
+                'invalid arguments workflow_id (%s)' %
+                workflow_id
+            )
+            raise ClientInputError(
+                'invalid arguments workflow_id (%s)' %
+                workflow_id
+            )
+
+    def remove_workflow_run(self, workflow_id, workflow_run_id):
+        """
+        Remove a workflow run.
+        """
+        if workflow_id and workflow_run_id:
+            result = self.__request(WORKFLOW_REMOVE_RUN, {
+                'workflow_id': workflow_id,
+                'workflow_run_id': workflow_run_id
+            })
+            if result['error']:
+                self.logger.error(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+                raise ClientResponseError(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+            else:
+                return result['result']
+        else:
+            self.logger.error(
+                'invalid arguments workflow_id (%s) workflow_run_id (%s)' %
+                (workflow_id, workflow_run_id)
+            )
+            raise ClientInputError(
+                'invalid arguments workflow_id (%s) workflow_run_id (%s)' %
+                (workflow_id, workflow_run_id)
+            )
+
+    def notify_new_workflow_run(self, workflow_id, workflow_run_id):
+        """
+        Notify the controller of a new workflow run.
+        """
+        if workflow_id and workflow_run_id:
+            result = self.__request(WORKFLOW_NOTIFY_NEW_RUN, {
+                'workflow_id': workflow_id,
+                'workflow_run_id': workflow_run_id
+            })
+            if result['error']:
+                self.logger.error(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+                raise ClientResponseError(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+            else:
+                return result['result']
+        else:
+            self.logger.error(
+                'invalid arguments workflow_id (%s), workflow_run_id (%s)' %
+                (workflow_id, workflow_run_id)
+            )
+            raise ClientInputError(
+                'invalid arguments workflow_id (%s), workflow_run_id (%s)' %
+                (workflow_id, workflow_run_id)
+            )
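
A usage sketch for the Manager class above: register a workflow essence and acknowledge kickstart requests. The controller URL, essence document, and workflow ids are placeholders, and blocking calls are kept out of the Socket.IO handler thread:

```python
# Sketch: acting as a workflow manager.
from cord_workflow_controller_client.manager import Manager

kickstarted = []

def on_kickstart(workflow_id, workflow_run_id):
    # record the request; blocking calls are made from the main loop below
    kickstarted.append((workflow_id, workflow_run_id))

manager = Manager(name='my_manager')
manager.set_handlers({'kickstart': on_kickstart})
manager.connect('http://localhost:3030')  # hypothetical controller endpoint

# a hypothetical essence document describing one workflow
essence = {'sample_workflow': {'dag': {'dag_id': 'sample_workflow'}, 'tasks': {}}}
manager.register_workflow_essence(essence)
print(manager.list_workflows())

# poll for kickstart requests and acknowledge them to the controller
while True:
    manager.sleep(1)
    while kickstarted:
        workflow_id, workflow_run_id = kickstarted.pop(0)
        manager.notify_new_workflow_run(workflow_id, workflow_run_id)
```
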
diff --git a/src/cord_workflow_controller_client/probe.py b/src/cord_workflow_controller_client/probe.py
new file mode 100644
index 0000000..0ef5288
--- /dev/null
+++ b/src/cord_workflow_controller_client/probe.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow Probe
+
+This module implements the Workflow Probe interface.
+"""
+
+import json
+import socketio
+
+from .utils import get_noop_logger, gen_id
+from .errors import ClientInputError
+
+GREETING = 'cord.workflow.ctlsvc.greeting'
+
+
+class Probe(object):
+    def __init__(self, logger=None, name=None):
+        self.sio = socketio.Client()
+
+        if logger:
+            self.logger = logger
+        else:
+            self.logger = get_noop_logger()
+
+        if name:
+            self.name = name
+        else:
+            self.name = 'probe_%s' % gen_id()
+
+        # set sio handlers
+        self.logger.debug('Setting event handlers to Socket.IO')
+        self.sio.on('connect', self.__on_sio_connect)
+        self.sio.on('disconnect', self.__on_sio_disconnect)
+        self.sio.on(GREETING, self.__on_greeting_message)
+
+        self.handlers = {
+            'connect': self.__noop_connect_handler,
+            'disconnect': self.__noop_disconnect_handler
+        }
+
+    def set_logger(self, logger):
+        self.logger = logger
+
+    def get_logger(self):
+        return self.logger
+
+    def __on_sio_connect(self):
+        self.logger.debug('connected to the server')
+        handler = self.handlers['connect']
+        if callable(handler):
+            handler()
+
+    def __noop_connect_handler(self):
+        self.logger.debug('no-op connect handler')
+
+    def __on_sio_disconnect(self):
+        self.logger.debug('disconnected from the server')
+        handler = self.handlers['disconnect']
+        if callable(handler):
+            handler()
+
+    def __noop_disconnect_handler(self):
+        self.logger.debug('no-op disconnect handler')
+
+    def __on_greeting_message(self, data):
+        self.logger.debug('received a greeting message from the server')
+
+    def connect(self, url):
+        """
+        Connect to the given url
+        """
+        query_string = 'id=%s&type=probe&name=%s' % (self.name, self.name)
+        connect_url = '%s?%s' % (url, query_string)
+
+        if not (connect_url.startswith('http://') or connect_url.startswith('https://')):
+            connect_url = 'http://%s' % connect_url
+
+        self.logger.debug('Connecting to a Socket.IO server (%s)' % connect_url)
+        self.sio.connect(url=connect_url, transports=['websocket'])
+
+    def disconnect(self):
+        """
+        Disconnect from the server
+        """
+        self.sio.disconnect()
+
+    def get_handlers(self):
+        return self.handlers
+
+    def set_handlers(self, new_handlers):
+        for k in self.handlers:
+            if k in new_handlers:
+                self.handlers[k] = new_handlers[k]
+
+    def emit_event(self, event, body):
+        """
+        Emit event to Workflow Controller
+        """
+        if event and body:
+            self.sio.emit(event, body)
+        else:
+            self.logger.error(
+                'invalid arguments event(%s), body(%s)' %
+                (event, json.dumps(body))
+            )
+            raise ClientInputError(
+                'invalid arguments event(%s), body(%s)' %
+                (event, json.dumps(body))
+            )
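
A sketch showing how a Probe's default connect/disconnect handlers can be replaced (illustrative only; the controller URL and topic are placeholders):

```python
# Sketch: a Probe with custom connect/disconnect handlers.
from cord_workflow_controller_client.probe import Probe

def on_connect():
    print('probe connected')

def on_disconnect():
    print('probe disconnected')

probe = Probe(name='handler_demo')
probe.set_handlers({'connect': on_connect, 'disconnect': on_disconnect})
probe.connect('http://localhost:3030')  # hypothetical controller endpoint
probe.emit_event('sample.topic', {'message': 'hello'})
probe.disconnect()
```
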
diff --git a/src/cord_workflow_controller_client/utils.py b/src/cord_workflow_controller_client/utils.py
new file mode 100644
index 0000000..008d81f
--- /dev/null
+++ b/src/cord_workflow_controller_client/utils.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Utils
+"""
+
+import string
+import random
+
+
+class NoopLogger(object):
+    def __init__(self):
+        pass
+
+    def info(self, *args):
+        pass
+
+    def debug(self, *args):
+        pass
+
+    def error(self, *args):
+        pass
+
+    def warn(self, *args):
+        pass
+
+
+def get_noop_logger():
+    return NoopLogger()
+
+
+def gen_id(size=6, chars=string.ascii_uppercase + string.digits):
+    return ''.join(random.choice(chars) for _ in range(size))
+
+
+def gen_seq_id():
+    return random.randint(1010, 101010)
diff --git a/src/cord_workflow_controller_client/workflow_run.py b/src/cord_workflow_controller_client/workflow_run.py
new file mode 100644
index 0000000..9d3cf99
--- /dev/null
+++ b/src/cord_workflow_controller_client/workflow_run.py
@@ -0,0 +1,366 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow Run
+
+This module implements the Workflow Run interface.
+"""
+
+import json
+import socketio
+
+from .countdown_latch import CountDownLatch
+from .utils import get_noop_logger, gen_id, gen_seq_id
+from .errors import ClientRPCError, ClientInputError, ClientResponseError
+
+WAIT_TIMEOUT = 10  # 10 seconds
+
+GREETING = 'cord.workflow.ctlsvc.greeting'
+WORKFLOW_RUN_UPDATE_STATUS = 'cord.workflow.ctlsvc.workflow.run.status'
+WORKFLOW_RUN_COUNT_EVENTS = 'cord.workflow.ctlsvc.workflow.run.count'
+WORKFLOW_RUN_FETCH_EVENT = 'cord.workflow.ctlsvc.workflow.run.fetch'
+WORKFLOW_RUN_NOTIFY_EVENT = 'cord.workflow.ctlsvc.workflow.run.notify'
+
+
+class WorkflowRun(object):
+    def __init__(self, workflow_id, workflow_run_id, logger=None, name=None):
+        self.sio = socketio.Client()
+        self.workflow_id = workflow_id
+        self.workflow_run_id = workflow_run_id
+
+        if logger:
+            self.logger = logger
+        else:
+            self.logger = get_noop_logger()
+
+        if name:
+            self.name = name
+        else:
+            self.name = 'workflow_run_%s' % gen_id()
+
+        self.req_id = gen_seq_id()
+
+        # set sio handlers
+        self.logger.debug('Setting event handlers to Socket.IO')
+        self.sio.on('connect', self.__on_sio_connect)
+        self.sio.on('disconnect', self.__on_sio_disconnect)
+        self.sio.on(GREETING, self.__on_greeting_message)
+        self.sio.on(WORKFLOW_RUN_UPDATE_STATUS, self.__on_update_status_message)
+        self.sio.on(WORKFLOW_RUN_COUNT_EVENTS, self.__on_count_events_message)
+        self.sio.on(WORKFLOW_RUN_FETCH_EVENT, self.__on_fetch_event_message)
+        self.sio.on(WORKFLOW_RUN_NOTIFY_EVENT, self.__on_notify_event_message)
+
+        self.handlers = {
+            'connect': self.__noop_connect_handler,
+            'disconnect': self.__noop_disconnect_handler,
+            'notify': self.__noop_notify_handler
+        }
+
+        # key is req_id
+        self.pending_requests = {}
+
+    def set_logger(self, logger):
+        self.logger = logger
+
+    def get_logger(self):
+        return self.logger
+
+    def __on_sio_connect(self):
+        self.logger.debug('connected to the server')
+        handler = self.handlers['connect']
+        if callable(handler):
+            handler()
+
+    def __noop_connect_handler(self):
+        self.logger.debug('no-op connect handler')
+
+    def __on_sio_disconnect(self):
+        self.logger.debug('disconnected from the server')
+        handler = self.handlers['disconnect']
+        if callable(handler):
+            handler()
+
+    def __noop_disconnect_handler(self):
+        self.logger.debug('no-op disconnect handler')
+
+    def __noop_notify_handler(self, workflow_id, workflow_run_id, topic):
+        self.logger.debug('no-op notify handler')
+
+    def __get_next_req_id(self):
+        req_id = self.req_id
+        self.req_id += 1
+        return req_id
+
+    def __on_greeting_message(self, data):
+        self.logger.debug('received a greeting message from the server')
+
+    def __on_notify_event_message(self, data):
+        """
+        Handler for a notify event
+        REQ = {
+            'topic': <topic>
+        }
+        """
+        self.logger.info('received a notify event message from the server')
+        topic = data['topic']
+
+        self.logger.info('a notify event message - topic (%s)' % topic)
+        if topic:
+            handler = self.handlers['notify']
+            if callable(handler):
+                self.logger.info('calling a notify event handler - %s' % handler)
+                handler(self.workflow_id, self.workflow_run_id, topic)
+
+    def __on_update_status_message(self, data):
+        self.__on_response(WORKFLOW_RUN_UPDATE_STATUS, data)
+
+    def __on_count_events_message(self, data):
+        self.__on_response(WORKFLOW_RUN_COUNT_EVENTS, data)
+
+    def __on_fetch_event_message(self, data):
+        self.__on_response(WORKFLOW_RUN_FETCH_EVENT, data)
+
+    def __check_pending_request(self, req_id):
+        """
+        Check a pending request
+        """
+        if req_id in self.pending_requests:
+            return True
+        return False
+
+    def __put_pending_request(self, api, params):
+        """
+        Put a pending request to a queue
+        """
+        req_id = self.__get_next_req_id()
+        latch = CountDownLatch()
+        params['req_id'] = req_id  # inject req_id
+        # register the pending request before emitting so a fast response
+        # cannot arrive before the request is tracked
+        self.pending_requests[req_id] = {
+            'req_id': req_id,
+            'latch': latch,
+            'api': api,
+            'params': params,
+            'result': None
+        }
+        self.sio.emit(api, params)
+        return req_id
+
+    def __wait_response(self, req_id):
+        """
+        Wait for completion of a request
+        """
+        if req_id in self.pending_requests:
+            req = self.pending_requests[req_id]
+            # Condition.wait() on Python older than 3.2 does not return
+            # a value indicating whether it timed out
+            return req['latch'].wait(WAIT_TIMEOUT)
+        else:
+            self.logger.error(
+                'cannot find a pending request (%s) from a queue' % req_id
+            )
+            raise ClientRPCError(
+                req_id,
+                'cannot find a pending request (%s) from a queue' % req_id
+            )
+
+    def __complete_request(self, req_id, result):
+        """
+        Compelete a pending request
+        """
+        if req_id in self.pending_requests:
+            req = self.pending_requests[req_id]
+            req['latch'].count_down()
+            req['result'] = result
+            return
+
+        self.logger.error(
+            'cannot find a pending request (%s) from a queue' % req_id
+        )
+        raise ClientRPCError(
+            req_id,
+            'cannot find a pending request (%s) from a queue' % req_id
+        )
+
+    def __pop_pending_request(self, req_name):
+        """
+        Pop a pending request from a queue
+        """
+        return self.pending_requests.pop(req_name, None)
+
+    def connect(self, url):
+        """
+        Connect to the given url
+        """
+        query_string = 'id=%s&type=workflow_run&name=%s&workflow_id=%s&workflow_run_id=%s' % \
+            (self.name, self.name, self.workflow_id, self.workflow_run_id)
+        connect_url = '%s?%s' % (url, query_string)
+
+        if not (connect_url.startswith('http://') or connect_url.startswith('https://')):
+            connect_url = 'http://%s' % connect_url
+
+        self.logger.debug('Connecting to a Socket.IO server (%s)' % connect_url)
+        self.sio.connect(url=connect_url, transports=['websocket'])
+
+    def disconnect(self):
+        """
+        Disconnect from the server
+        """
+        self.sio.disconnect()
+
+    def wait(self):
+        self.sio.wait()
+
+    def sleep(self, sec):
+        self.sio.sleep(sec)
+
+    def get_handlers(self):
+        return self.handlers
+
+    def set_handlers(self, new_handlers):
+        for k in self.handlers:
+            if k in new_handlers:
+                self.handlers[k] = new_handlers[k]
+
+    def __request(self, api, params=None):
+        # avoid a shared mutable default and allow empty params
+        # (requests that need no arguments send only the injected req_id)
+        if params is None:
+            params = {}
+        if api:
+            req_id = self.__put_pending_request(api, params)
+            self.logger.debug('waiting for a response for req_id (%s)' % req_id)
+            self.__wait_response(req_id)  # wait for completion
+            req = self.__pop_pending_request(req_id)
+            if req:
+                if req['latch'].get_count() > 0:
+                    # timed out
+                    self.logger.error('request (%s) timed out' % req_id)
+                    raise ClientRPCError(
+                        req_id,
+                        'request (%s) timed out' % req_id
+                    )
+                else:
+                    return req['result']
+            else:
+                self.logger.error('cannot find a pending request (%s) from a queue' % req_id)
+                raise ClientRPCError(
+                    req_id,
+                    'cannot find a pending request (%s) from a queue' % req_id
+                )
+        else:
+            self.logger.error(
+                'invalid arguments api (%s), params (%s)' %
+                (api, json.dumps(params))
+            )
+            raise ClientInputError(
+                'invalid arguments api (%s), params (%s)' %
+                (api, json.dumps(params))
+            )
+
+    def __on_response(self, api, result):
+        if result and 'req_id' in result:
+            self.logger.debug('completing a request (%s)' % result['req_id'])
+            self.__complete_request(result['req_id'], result)
+        else:
+            self.logger.error(
+                'invalid arguments api (%s), result (%s)' %
+                (api, json.dumps(result))
+            )
+            raise ClientInputError(
+                'invalid arguments api (%s), result (%s)' %
+                (api, json.dumps(result))
+            )
+
+    def update_status(self, task_id, status):
+        """
+        Update status of a workflow run.
+        """
+        if task_id and status:
+            result = self.__request(WORKFLOW_RUN_UPDATE_STATUS, {
+                'workflow_id': self.workflow_id,
+                'workflow_run_id': self.workflow_run_id,
+                'task_id': task_id,
+                'status': status
+            })
+            if result['error']:
+                self.logger.error(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+                raise ClientResponseError(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+            else:
+                return result['result']
+        else:
+            self.logger.error(
+                'invalid arguments task_id (%s) and status (%s)' %
+                (task_id, status)
+            )
+            raise ClientInputError(
+                'invalid arguments task_id (%s) and status (%s)' %
+                (task_id, status)
+            )
+
+    def count_events(self):
+        """
+        Count the events queued for this workflow run.
+        """
+        result = self.__request(WORKFLOW_RUN_COUNT_EVENTS, {
+            'workflow_id': self.workflow_id,
+            'workflow_run_id': self.workflow_run_id
+        })
+        if result['error']:
+            self.logger.error(
+                'request (%s) failed with an error - %s' %
+                (result['req_id'], result['message'])
+            )
+            raise ClientResponseError(
+                'request (%s) failed with an error - %s' %
+                (result['req_id'], result['message'])
+            )
+        else:
+            return result['result']
+
+    def fetch_event(self, task_id, topic):
+        """
+        Fetch a queued event by topic.
+        """
+        if task_id and topic:
+            result = self.__request(WORKFLOW_RUN_FETCH_EVENT, {
+                'workflow_id': self.workflow_id,
+                'workflow_run_id': self.workflow_run_id,
+                'task_id': task_id,
+                'topic': topic
+            })
+            if result['error']:
+                self.logger.error(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+                raise ClientResponseError(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+            else:
+                return result['result']
+        else:
+            self.logger.error(
+                'invalid arguments task_id (%s), topic (%s)' %
+                (task_id, topic)
+            )
+            raise ClientInputError(
+                'invalid arguments task_id (%s), topic (%s)' %
+                (task_id, topic)
+            )
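A minimal usage sketch of the WorkflowRun client above, following the calls exercised by the tests added later in this change; the controller URL, the task id ('onu_event_handler'), and the status value ('done') are placeholders rather than values fixed by the API:

    import time
    from cord_workflow_controller_client.workflow_run import WorkflowRun

    topics = []

    def on_notify(workflow_id, workflow_run_id, topic):
        # record the topic; fetch the event outside of the handler
        topics.append(topic)

    run = WorkflowRun('hello_workflow', 'hello_workflow_123')
    run.connect('http://localhost:17080')  # placeholder controller URL
    run.set_handlers({'notify': on_notify})

    time.sleep(5)  # let some notifications arrive

    for topic in topics:
        event = run.fetch_event('onu_event_handler', topic)  # example task id
        print(event)

    run.update_status('onu_event_handler', 'done')  # 'done' is a placeholder status
    run.disconnect()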
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..19d1424
--- /dev/null
+++ b/test/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/test/dummy_server.py b/test/dummy_server.py
new file mode 100644
index 0000000..9c22968
--- /dev/null
+++ b/test/dummy_server.py
@@ -0,0 +1,636 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import socketio
+import psutil
+import time
+import datetime
+
+from threading import Timer
+from gevent import pywsgi
+from geventwebsocket.handler import WebSocketHandler
+from multiprocessing import Process
+from multistructlog import create_logger
+from cord_workflow_controller_client.probe import GREETING
+from cord_workflow_controller_client.manager \
+    import (WORKFLOW_KICKSTART,
+            WORKFLOW_REGISTER, WORKFLOW_REGISTER_ESSENCE, WORKFLOW_LIST, WORKFLOW_LIST_RUN,
+            WORKFLOW_CHECK, WORKFLOW_REMOVE, WORKFLOW_REMOVE_RUN, WORKFLOW_NOTIFY_NEW_RUN)
+from cord_workflow_controller_client.workflow_run \
+    import (WORKFLOW_RUN_NOTIFY_EVENT,
+            WORKFLOW_RUN_UPDATE_STATUS, WORKFLOW_RUN_COUNT_EVENTS, WORKFLOW_RUN_FETCH_EVENT)
+
+
+"""
+Run a dummy socket.io server as a separate process.
+serve_forever() blocks until the process is killed,
+so a multi-process approach is used.
+"""
+
+log = create_logger()
+
+# Socket IO
+sio = None
+
+manager_clients = {}
+workflows = {}
+workflow_essences = {}
+workflow_runs = {}
+workflow_run_clients = {}
+seq_no = 1
+
+
+class repeatableTimer():
+    def __init__(self, time, handler, arg):
+        self.time = time
+        self.handler = handler
+        self.arg = arg
+        self.thread = Timer(self.time, self.on_tick)
+
+    def on_tick(self):
+        self.handler(self.arg)
+        self.thread = Timer(self.time, self.on_tick)
+        self.thread.start()
+
+    def start(self):
+        self.thread.start()
+
+    def cancel(self):
+        self.thread.cancel()
+
+
+def make_query_string_dict(query_string):
+    obj = {}
+    params = query_string.split('&')
+    for param in params:
+        kv = param.split('=')
+        key = kv[0]
+        val = kv[1]
+        obj[key] = val
+
+    return obj
+
+
+def _send_kickstart_event(sid):
+    global seq_no
+
+    workflow_id = 'dummy_workflow_%d' % seq_no
+    workflow_run_id = 'dummy_workflow_run_%d' % seq_no
+
+    seq_no += 1
+    log.info('sending a kickstart event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_KICKSTART,
+        data={
+            'workflow_id': workflow_id,
+            'workflow_run_id': workflow_run_id,
+            'timestamp': str(datetime.datetime.now())
+        },
+        room=sid
+    )
+
+
+def _send_notify_event(sid):
+    global seq_no
+
+    topic = 'topic_%s' % seq_no
+    message = {
+        'sample_key': 'sample_value'
+    }
+    seq_no += 1
+
+    run_client = workflow_run_clients.get(sid)
+    if run_client:
+        workflow_run_id = run_client['workflow_run_id']
+        workflow_run = workflow_runs.get(workflow_run_id)
+        if workflow_run:
+            workflow_run['queue'].append({
+                'topic': topic,
+                'message': message
+            })
+
+            log.info('sending a notify event to sid %s' % sid)
+            sio.emit(
+                event=WORKFLOW_RUN_NOTIFY_EVENT,
+                data={
+                    'topic': topic,
+                    'timestamp': str(datetime.datetime.now())
+                },
+                room=sid
+            )
+
+
+def _handle_event_connect(sid, query):
+    sio.emit(GREETING, {})
+
+    global last_client_action_time
+    last_client_action_time = datetime.datetime.now()
+
+    # if the client is a manager, send kickstart events every 2 sec
+    if query['type'] == 'workflow_manager':
+        log.info('manager (%s) is connected' % sid)
+        kickstart_timer = repeatableTimer(2, _send_kickstart_event, sid)
+        manager_clients[sid] = {
+            'kickstart_timer': kickstart_timer
+        }
+
+        kickstart_timer.start()
+    elif query['type'] == 'workflow_run':
+        log.info('workflow run (%s) is connected' % sid)
+        notify_event_timer = repeatableTimer(2, _send_notify_event, sid)
+        workflow_run_clients[sid] = {
+            'workflow_id': query['workflow_id'],
+            'workflow_run_id': query['workflow_run_id'],
+            'notify_event_timer': notify_event_timer
+        }
+
+        notify_event_timer.start()
+
+
+def _handle_event_disconnect(sid):
+    if sid in manager_clients:
+        log.info('manager (%s) is disconnected' % sid)
+        if manager_clients[sid]['kickstart_timer']:
+            manager_clients[sid]['kickstart_timer'].cancel()
+
+        del manager_clients[sid]
+
+    if sid in workflow_run_clients:
+        log.info('workflow run (%s) is disconnected' % sid)
+        if workflow_run_clients[sid]['notify_event_timer']:
+            workflow_run_clients[sid]['notify_event_timer'].cancel()
+
+        del workflow_run_clients[sid]
+
+    global last_client_action_time
+    last_client_action_time = datetime.datetime.now()
+
+
+def _get_req_id(body):
+    req_id = 101010
+    if 'req_id' in body:
+        req_id = int(body['req_id'])
+    return req_id
+
+
+def _handle_event_workflow_reg(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow' in body:
+        workflow = body['workflow']
+        workflow_id = workflow['id']
+
+        if workflow_id in workflows:
+            # already exist
+            data['error'] = True
+            data['result'] = False
+            data['message'] = 'workflow is already registered'
+        else:
+            log.info('manager (%s) registers a workflow (%s)' % (sid, workflow_id))
+            workflows[workflow_id] = workflow
+
+            data['error'] = False
+            data['result'] = True
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'workflow is not in the message body'
+
+    log.info('returning a result for workflow register event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_REGISTER,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_reg_essence(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'essence' in body:
+        essence = body['essence']
+        for wid in essence:
+            workflow_essence = essence[wid]
+            if 'dag' in workflow_essence and 'dag_id' in workflow_essence['dag']:
+                dag = workflow_essence['dag']
+                workflow_id = dag['dag_id']
+
+                if workflow_id in workflow_essences or workflow_id in workflows:
+                    # already exist
+                    data['error'] = True
+                    data['result'] = False
+                    data['message'] = 'workflow is already registered'
+                else:
+                    log.info('manager (%s) registers a workflow (%s)' % (sid, workflow_id))
+                    workflow_essences[workflow_id] = workflow_essence
+
+                    data['error'] = False
+                    data['result'] = True
+            else:
+                data['error'] = True
+                data['result'] = False
+                data['message'] = 'dag or dag_id is not found in the essence'
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'essence is not in the message body'
+
+    log.info('returning a result for workflow essence register event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_REGISTER_ESSENCE,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_list(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    workflow_ids = []
+
+    for workflow_id in workflows:
+        workflow_ids.append(workflow_id)
+
+    for workflow_id in workflow_essences:
+        workflow_ids.append(workflow_id)
+
+    data['error'] = False
+    data['result'] = workflow_ids
+
+    log.info('returning a result for workflow list event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_LIST,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_run_list(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    workflow_run_ids = []
+
+    for workflow_run_id in workflow_runs:
+        workflow_run_ids.append(workflow_run_id)
+
+    data['error'] = False
+    data['result'] = workflow_run_ids
+
+    log.info('returning a result for workflow run list event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_LIST_RUN,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_check(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow_id' in body:
+        workflow_id = body['workflow_id']
+        if workflow_id in workflows:
+            data['error'] = False
+            data['result'] = True
+        else:
+            data['error'] = False
+            data['result'] = False
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'workflow_id is not in the message body'
+
+    log.info('returning a result for workflow check event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_CHECK,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_remove(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow_id' in body:
+        workflow_id = body['workflow_id']
+        if workflow_id in workflows:
+
+            hasWorkflowRuns = False
+            for workflow_run_id in workflow_runs:
+                workflow_run = workflow_runs[workflow_run_id]
+                wid = workflow_run['workflow_id']
+                if wid == workflow_id:
+                    # there is a workflow run for the workflow id
+                    hasWorkflowRuns = True
+                    break
+
+            if hasWorkflowRuns:
+                data['error'] = False
+                data['result'] = False
+            else:
+                del workflows[workflow_id]
+
+                data['error'] = False
+                data['result'] = True
+        else:
+            data['error'] = False
+            data['result'] = False
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'workflow_id is not in the message body'
+
+    log.info('returning a result for workflow remove event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_REMOVE,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_run_remove(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow_id' in body and 'workflow_run_id' in body:
+        # workflow_id = body['workflow_id']
+        workflow_run_id = body['workflow_run_id']
+
+        if workflow_run_id in workflow_runs:
+            del workflow_runs[workflow_run_id]
+
+            data['error'] = False
+            data['result'] = True
+        else:
+            data['error'] = False
+            data['result'] = False
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'workflow_id or workflow_run_id is not in the message body'
+
+    log.info('returning a result for workflow run remove event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_REMOVE_RUN,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_new_workflow_run(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow_id' in body and 'workflow_run_id' in body:
+        workflow_id = body['workflow_id']
+        workflow_run_id = body['workflow_run_id']
+
+        log.info('manager (%s) started a new workflow (%s), workflow_run (%s)' % (sid, workflow_id, workflow_run_id))
+        workflow_runs[workflow_run_id] = {
+            'workflow_id': workflow_id,
+            'workflow_run_id': workflow_run_id,
+            'queue': []
+        }
+
+        data['error'] = False
+        data['result'] = True
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'workflow_id or workflow_run_id is not in the message body'
+
+    log.info('returning a result for a new workflow run event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_NOTIFY_NEW_RUN,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_run_update_status(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow_id' in body and 'workflow_run_id' in body and 'task_id' in body and 'status' in body:
+        # workflow_id = body['workflow_id']
+        workflow_run_id = body['workflow_run_id']
+        task_id = body['task_id']
+        status = body['status']
+
+        if workflow_run_id in workflow_runs:
+            workflow_run = workflow_runs[workflow_run_id]
+            workflow_run[task_id] = status
+
+            data['error'] = False
+            data['result'] = True
+        else:
+            data['error'] = True
+            data['result'] = False
+            data['message'] = 'cannot find workflow run'
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'workflow_id, workflow_run_id, task_id or status is not in the message body'
+
+    log.info('returning a result for workflow run update status event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_RUN_UPDATE_STATUS,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_run_count_events(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow_id' in body and 'workflow_run_id' in body:
+        # workflow_id = body['workflow_id']
+        workflow_run_id = body['workflow_run_id']
+
+        if workflow_run_id in workflow_runs:
+            workflow_run = workflow_runs[workflow_run_id]
+            queue = workflow_run['queue']
+            count = len(queue)
+
+            data['error'] = False
+            data['result'] = count
+        else:
+            data['error'] = True
+            data['result'] = 0
+            data['message'] = 'cannot find workflow run'
+    else:
+        data['error'] = True
+        data['result'] = 0
+        data['message'] = 'workflow_id or workflow_run_id is not in the message body'
+
+    log.info('returning a result for workflow run count events to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_RUN_COUNT_EVENTS,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event_workflow_run_fetch_event(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'workflow_id' in body and 'workflow_run_id' in body and 'task_id' in body and 'topic' in body:
+        # workflow_id = body['workflow_id']
+        workflow_run_id = body['workflow_run_id']
+        # task_id = body['task_id']
+        topic = body['topic']
+
+        if workflow_run_id in workflow_runs:
+            workflow_run = workflow_runs[workflow_run_id]
+            queue = workflow_run['queue']
+
+            event = None
+            for idx in range(len(queue)):
+                if queue[idx]['topic'] == topic:
+                    # found
+                    event = queue.pop(idx)
+                    break
+
+            if event:
+                data['error'] = False
+                data['result'] = event
+            else:
+                data['error'] = False
+                data['result'] = {}
+        else:
+            data['error'] = True
+            data['result'] = False
+            data['message'] = 'cannot find workflow run'
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'workflow_id, workflow_run_id, task_id or topic is not in the message body'
+
+    log.info('returning a result for workflow run fetch event to sid %s' % sid)
+    sio.emit(
+        event=WORKFLOW_RUN_FETCH_EVENT,
+        data=data,
+        room=sid
+    )
+
+
+def _handle_event(event, sid, body):
+    log.info('event %s - body %s (%s)' % (event, body, type(body)))
+
+
+class ServerEventHandler(socketio.namespace.Namespace):
+    def trigger_event(self, event, *args):
+        sid = args[0]
+        if event == 'connect':
+            querystr = args[1]['QUERY_STRING']
+            query = make_query_string_dict(querystr)
+            _handle_event_connect(sid, query)
+        elif event == 'disconnect':
+            _handle_event_disconnect(sid)
+
+        # manager
+        elif event == WORKFLOW_NOTIFY_NEW_RUN:
+            _handle_event_new_workflow_run(sid, args[1])
+        elif event == WORKFLOW_REGISTER_ESSENCE:
+            _handle_event_workflow_reg_essence(sid, args[1])
+        elif event == WORKFLOW_REGISTER:
+            _handle_event_workflow_reg(sid, args[1])
+        elif event == WORKFLOW_LIST:
+            _handle_event_workflow_list(sid, args[1])
+        elif event == WORKFLOW_LIST_RUN:
+            _handle_event_workflow_run_list(sid, args[1])
+        elif event == WORKFLOW_CHECK:
+            _handle_event_workflow_check(sid, args[1])
+        elif event == WORKFLOW_REMOVE:
+            _handle_event_workflow_remove(sid, args[1])
+        elif event == WORKFLOW_REMOVE_RUN:
+            _handle_event_workflow_run_remove(sid, args[1])
+
+        # workflow run
+        elif event == WORKFLOW_RUN_UPDATE_STATUS:
+            _handle_event_workflow_run_update_status(sid, args[1])
+        elif event == WORKFLOW_RUN_COUNT_EVENTS:
+            _handle_event_workflow_run_count_events(sid, args[1])
+        elif event == WORKFLOW_RUN_FETCH_EVENT:
+            _handle_event_workflow_run_fetch_event(sid, args[1])
+        else:
+            _handle_event(event, args[0], args[1])
+
+
+def _run(port):
+    global sio
+    sio = socketio.Server(ping_timeout=5, ping_interval=1)
+    app = socketio.WSGIApp(sio)
+    sio.register_namespace(ServerEventHandler('/'))
+
+    server = pywsgi.WSGIServer(
+        ('', port),
+        app,
+        handler_class=WebSocketHandler
+    )
+
+    server.serve_forever()
+
+
+def start(port):
+    p = Process(target=_run, args=(port, ))
+    p.start()
+    time.sleep(3)
+
+    log.info('Dummy server is started!')
+    return p
+
+
+def stop(p):
+    log.info('Stopping dummy server!')
+
+    try:
+        process = psutil.Process(p.pid)
+        for proc in process.children(recursive=True):
+            proc.kill()
+        process.kill()
+        p.join()
+    except psutil.NoSuchProcess:
+        pass
+
+    # clean-up
+    global sio, manager_clients, workflow_runs, seq_no
+    sio = None
+    manager_clients = {}
+    workflow_runs = {}
+    seq_no = 1
+
+    time.sleep(3)
+
+    log.info('Dummy server is stopped!')
diff --git a/test/dummy_server_util.py b/test/dummy_server_util.py
new file mode 100644
index 0000000..7cc60db
--- /dev/null
+++ b/test/dummy_server_util.py
@@ -0,0 +1,39 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import atexit
+from .dummy_server import stop as server_stop
+
+dummy_servers = {}
+
+
+def cleanup_dummy_servers():
+    for pid in list(dummy_servers.keys()):
+        s = dummy_servers.pop(pid)
+        server_stop(s)
+
+
+def register_dummy_server_cleanup(s):
+    if s.pid not in dummy_servers:
+        dummy_servers[s.pid] = s
+
+
+def unregister_dummy_server_cleanup(s):
+    if s.pid in dummy_servers:
+        del dummy_servers[s.pid]
+
+
+atexit.register(cleanup_dummy_servers)
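A sketch of the pattern the test cases below use to drive the dummy server; port 17080 is simply the arbitrary port the tests pick:

    from .dummy_server import start as server_start, stop as server_stop
    from .dummy_server_util import register_dummy_server_cleanup, unregister_dummy_server_cleanup

    server = server_start(17080)            # spawn the Socket.IO server in a child process
    register_dummy_server_cleanup(server)   # make sure it is killed even if a test crashes
    try:
        pass  # connect Manager / Probe / WorkflowRun clients to http://localhost:17080
    finally:
        server_stop(server)
        unregister_dummy_server_cleanup(server)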
diff --git a/test/hello_workflow.json b/test/hello_workflow.json
new file mode 100644
index 0000000..9de71bc
--- /dev/null
+++ b/test/hello_workflow.json
@@ -0,0 +1,25 @@
+{
+    "hello_workflow": {
+        "dag": {
+            "dag_id": "hello_workflow",
+            "local_variable": "dag_hello"
+        },
+        "dependencies": {
+            "onu_event_handler": {}
+        },
+        "tasks": {
+            "onu_event_handler": {
+                "class": "XOSEventSensor",
+                "dag_id": "hello_workflow",
+                "dag": "dag_hello",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "ONU_event",
+                "task_id": "onu_event_handler",
+                "topic": "onu.events"
+            }
+        }
+    }
+}
diff --git a/test/test_manager.py b/test/test_manager.py
new file mode 100644
index 0000000..d35a031
--- /dev/null
+++ b/test/test_manager.py
@@ -0,0 +1,120 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import time
+import os
+import json
+from cord_workflow_controller_client.manager import Manager
+from multistructlog import create_logger
+from .dummy_server import start as server_start, stop as server_stop
+from .dummy_server_util import register_dummy_server_cleanup, unregister_dummy_server_cleanup
+
+log = create_logger()
+code_dir = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
+
+def read_json_file(filename):
+    if filename:
+        with open(filename, 'r') as f:
+            return json.load(f)
+    return None
+
+
+class TestManager(unittest.TestCase):
+    """
+    Try to connect to a local fake Controller Service as a Manager.
+    """
+
+    def setUp(self):
+        self.server = server_start(17080)
+        self.kickstarted_workflows = {}
+        register_dummy_server_cleanup(self.server)
+
+    def tearDown(self):
+        server_stop(self.server)
+        unregister_dummy_server_cleanup(self.server)
+        self.server = None
+        self.kickstarted_workflows = {}
+
+    def test_connect(self):
+        """
+        This tests if the Manager client can connect to a socket.io server properly.
+        """
+        succeed = False
+        try:
+            manager = Manager(logger=log)
+            manager.connect('http://localhost:17080')
+
+            time.sleep(1)
+
+            manager.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+
+    def test_kickstart(self):
+        """
+        This tests if the Manager client can receive a kickstart event.
+        """
+        succeed = False
+
+        try:
+            manager = Manager(logger=log)
+            manager.connect('http://localhost:17080')
+
+            def on_kickstart(workflow_id, workflow_run_id):
+                self.kickstarted_workflows[workflow_id] = {
+                    'workflow_id': workflow_id,
+                    'workflow_run_id': workflow_run_id
+                }
+                manager.notify_new_workflow_run(workflow_id, workflow_run_id)
+
+            manager.set_handlers({'kickstart': on_kickstart})
+
+            # dummy server sends a kickstart message every 2 seconds
+            # we wait 6 seconds to receive at least 2 messages
+            time.sleep(6)
+
+            manager.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+            self.assertTrue(len(self.kickstarted_workflows) >= 2, 'Kickstart event is not handled')
+
+    def test_workflow_essence_register(self):
+        """
+        This tests if the Manager client can register a workflow essence.
+        """
+        succeed = False
+        essence_path = os.path.join(code_dir, "hello_workflow.json")
+        essence = read_json_file(essence_path)
+
+        try:
+            manager = Manager(logger=log)
+            manager.connect('http://localhost:17080')
+
+            # the command is synchronous
+            result = manager.register_workflow_essence(essence)
+
+            manager.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+            self.assertTrue(result, 'workflow essence registration failed')
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/test/test_probe.py b/test/test_probe.py
new file mode 100644
index 0000000..3f6baed
--- /dev/null
+++ b/test/test_probe.py
@@ -0,0 +1,95 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import time
+from cord_workflow_controller_client.probe import Probe
+from multistructlog import create_logger
+from .dummy_server import start as server_start, stop as server_stop
+
+log = create_logger()
+
+
+class TestProbe(unittest.TestCase):
+    """
+    Try to connect to a local fake Controller Service as a Probe.
+    """
+
+    def setUp(self):
+        self.server = server_start(17080)
+
+    def tearDown(self):
+        server_stop(self.server)
+        self.server = None
+
+    def test_connect(self):
+        """
+        This tests if the Probe client can connect to a socket.io server properly.
+        """
+        succeed = False
+        try:
+            probe = Probe(logger=log)
+            probe.connect('http://localhost:17080')
+
+            time.sleep(1)
+
+            probe.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+
+    def test_emit_string(self):
+        """
+        This tests if the Probe client can emit an event.
+        """
+        succeed = False
+        try:
+            probe = Probe(logger=log)
+            probe.connect('http://localhost:17080')
+
+            probe.emit_event('xos.test.event', 'string message - hello')
+            time.sleep(1)
+
+            probe.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+
+    def test_emit_json(self):
+        """
+        This tests if the Probe client can emit an event with a dict (JSON) object.
+        """
+        succeed = False
+        try:
+            probe = Probe(logger=log)
+            probe.connect('http://localhost:17080')
+
+            probe.emit_event(
+                'xos.test.event',
+                {
+                    'str_key': 'value',
+                    'int_key': 32335
+                }
+            )
+            time.sleep(1)
+
+            probe.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/test/test_workflow_run.py b/test/test_workflow_run.py
new file mode 100644
index 0000000..80d6fcf
--- /dev/null
+++ b/test/test_workflow_run.py
@@ -0,0 +1,179 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import time
+import os
+import json
+from cord_workflow_controller_client.manager import Manager
+from cord_workflow_controller_client.workflow_run import WorkflowRun
+from multistructlog import create_logger
+from .dummy_server import start as server_start, stop as server_stop
+
+log = create_logger()
+code_dir = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
+
+def read_json_file(filename):
+    if filename:
+        with open(filename, 'r') as f:
+            return json.load(f)
+    return None
+
+
+class TestWorkflowRun(unittest.TestCase):
+    """
+    Try to connect to a local fake Controller Service as a WorkflowRun
+    (a Manager is used in setUp to register a workflow and its run).
+    """
+
+    def setUp(self):
+        self.kickstarted_workflows = {}
+        self.notifications = []
+
+        self.server = server_start(17080)
+        self.manager = Manager(logger=log)
+        self.manager.connect('http://localhost:17080')
+
+        essence_path = os.path.join(code_dir, "hello_workflow.json")
+        essence = read_json_file(essence_path)
+        self.manager.register_workflow_essence(essence)
+        self.manager.notify_new_workflow_run('hello_workflow', 'hello_workflow_123')
+
+        # wait for 2 seconds for registering a new workflow run
+        time.sleep(2)
+
+    def tearDown(self):
+        self.manager.disconnect()
+        self.manager = None
+
+        server_stop(self.server)
+        self.server = None
+
+        self.kickstarted_workflows = {}
+        self.notifications = []
+
+    def test_connect(self):
+        """
+        This tests if the workflow run client can connect to a socket.io server properly.
+        """
+        succeed = False
+        try:
+            run = WorkflowRun('hello_workflow', 'hello_workflow_123')
+            run.connect('http://localhost:17080')
+
+            time.sleep(1)
+
+            run.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+
+    def test_count_events(self):
+        """
+        This tests if the workflow run client can retrieve the number of events.
+        """
+        succeed = False
+        try:
+            run = WorkflowRun('hello_workflow', 'hello_workflow_123')
+            run.connect('http://localhost:17080')
+
+            # dummy server generates a message every 2 seconds
+            # we wait 6 seconds to queue at least 2 messages
+            time.sleep(6)
+
+            count = run.count_events()
+
+            run.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+            self.assertTrue(count >= 2, 'There must be at least 2 events queued')
+
+    def test_notify_event(self):
+        """
+        This tests if the workflow run client can get notifications for events.
+        """
+        succeed = False
+        try:
+            run = WorkflowRun('hello_workflow', 'hello_workflow_123')
+            run.connect('http://localhost:17080')
+
+            def on_notification(workflow_id, workflow_run_id, topic):
+                self.notifications.append({
+                    'workflow_id': workflow_id,
+                    'workflow_run_id': workflow_run_id,
+                    'topic': topic
+                })
+
+            run.set_handlers({'notify': on_notification})
+
+            # dummy server generates a message every 2 seconds
+            # we wait 6 seconds to get at least 2 notifications
+            time.sleep(6)
+
+            count = len(self.notifications)
+
+            run.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+            self.assertTrue(count >= 2, 'There must be at least 2 notifications received')
+
+    def test_get_events(self):
+        """
+        This tests if the workflow run client can retrieve events.
+        """
+        succeed = False
+        try:
+            run = WorkflowRun('hello_workflow', 'hello_workflow_123')
+            run.connect('http://localhost:17080')
+
+            def on_notification(workflow_id, workflow_run_id, topic):
+                self.notifications.append({
+                    'workflow_id': workflow_id,
+                    'workflow_run_id': workflow_run_id,
+                    'topic': topic
+                })
+
+            run.set_handlers({'notify': on_notification})
+
+            # dummy server generates a message every 2 seconds
+            # we wait 6 seconds to queue at least 2 messages
+            time.sleep(6)
+
+            count_notified = len(self.notifications)
+            count_queued = run.count_events()
+
+            self.assertTrue(count_notified >= 2, 'There must be at least 2 events notified')
+            self.assertTrue(count_queued >= 2, 'There must be at least 2 events queued')
+
+            # count_notified and count_queued may temporarily differ
+            for i in range(count_notified):
+                notification = self.notifications.pop(0)
+                topic = notification['topic']
+                event = run.fetch_event('task123', topic)
+
+            self.assertTrue('topic' in event, 'event should not be empty')
+            self.assertTrue(event['topic'] == topic, 'event should be retrieved by topic')
+            self.assertTrue(len(event['message']) > 0, 'there must be some messages')
+
+            run.disconnect()
+            succeed = True
+        finally:
+            self.assertTrue(succeed, 'Finished with error')
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..124ec43
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,53 @@
+; Copyright 2019-present Open Networking Foundation
+;
+; Licensed under the Apache License, Version 2.0 (the "License");
+; you may not use this file except in compliance with the License.
+; You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+
+[tox]
+envlist = py27,py35,py36,py37
+skip_missing_interpreters = true
+skipsdist = True
+
+[testenv]
+deps =
+  -r requirements.txt
+  nose2
+  flake8
+  gevent
+  gevent-websocket
+  multistructlog
+  psutil
+
+commands =
+  nose2 -c tox.ini --verbose --junit-xml
+  flake8
+
+[flake8]
+max-line-length = 119
+exclude =
+  .tox
+  build
+
+[unittest]
+plugins = nose2.plugins.junitxml
+
+[junit-xml]
+path = nose2-results.xml
+
+[coverage]
+always-on = True
+coverage =
+  src
+  test
+coverage-report =
+  term
+  xml
\ No newline at end of file
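With this configuration, the full test-and-lint matrix runs with a plain tox invocation from the repository root (assuming tox and the listed interpreters are installed); a single environment can be selected with, for example, tox -e py36, which runs nose2 with JUnit XML output and then flake8.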