Implement controller client
- Add Probe, Manager, and WorkflowRun classes for interacting with the CORD Workflow Controller (usage sketch below)
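
A minimal usage sketch (assumes a controller endpoint at
http://localhost:8080; the calls shown are the ones exercised by the
tests below):

    from multistructlog import create_logger
    from cord_workflow_controller_client.manager import Manager

    manager = Manager(logger=create_logger())
    manager.connect('http://localhost:8080')
    # essence: a workflow-essence dict, e.g. test/hello_workflow.json
    result = manager.register_workflow_essence(essence)
    manager.disconnect()
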
Change-Id: I0ad8d3661864635d9701eab1cb089cb17f81cd50
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..19d1424
--- /dev/null
+++ b/test/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/test/dummy_server.py b/test/dummy_server.py
new file mode 100644
index 0000000..9c22968
--- /dev/null
+++ b/test/dummy_server.py
@@ -0,0 +1,636 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import socketio
+import psutil
+import time
+import datetime
+
+from threading import Timer
+from gevent import pywsgi
+from geventwebsocket.handler import WebSocketHandler
+from multiprocessing import Process
+from multistructlog import create_logger
+from cord_workflow_controller_client.probe import GREETING
+from cord_workflow_controller_client.manager \
+ import (WORKFLOW_KICKSTART,
+ WORKFLOW_REGISTER, WORKFLOW_REGISTER_ESSENCE, WORKFLOW_LIST, WORKFLOW_LIST_RUN,
+ WORKFLOW_CHECK, WORKFLOW_REMOVE, WORKFLOW_REMOVE_RUN, WORKFLOW_NOTIFY_NEW_RUN)
+from cord_workflow_controller_client.workflow_run \
+ import (WORKFLOW_RUN_NOTIFY_EVENT,
+ WORKFLOW_RUN_UPDATE_STATUS, WORKFLOW_RUN_COUNT_EVENTS, WORKFLOW_RUN_FETCH_EVENT)
+
+
+"""
+Run a dummy socket.io server as a separate process.
+serve_forever() blocks until the process is killed,
+so I had to use multi-process approach.
+"""
+
+log = create_logger()
+
+# Socket IO
+sio = None
+
+manager_clients = {}
+workflows = {}
+workflow_essences = {}
+workflow_runs = {}
+workflow_run_clients = {}
+seq_no = 1
+
+
+class RepeatableTimer(object):
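+    """A timer that re-arms itself after each tick so the handler fires repeatedly."""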
+ def __init__(self, time, handler, arg):
+ self.time = time
+ self.handler = handler
+ self.arg = arg
+ self.thread = Timer(self.time, self.on_tick)
+
+ def on_tick(self):
+ self.handler(self.arg)
+ self.thread = Timer(self.time, self.on_tick)
+ self.thread.start()
+
+ def start(self):
+ self.thread.start()
+
+ def cancel(self):
+ self.thread.cancel()
+
+
+def make_query_string_dict(query_string):
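+    # e.g. 'type=workflow_run&workflow_id=foo' -> {'type': 'workflow_run', 'workflow_id': 'foo'}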
+ obj = {}
+ params = query_string.split('&')
+ for param in params:
+ kv = param.split('=')
+ key = kv[0]
+ val = kv[1]
+ obj[key] = val
+
+ return obj
+
+
+def _send_kickstart_event(sid):
+ global seq_no
+
+ workflow_id = 'dummy_workflow_%d' % seq_no
+ workflow_run_id = 'dummy_workflow_run_%d' % seq_no
+
+ seq_no += 1
+ log.info('sending a kickstart event to sid %s' % sid)
+ sio.emit(
+ event=WORKFLOW_KICKSTART,
+ data={
+ 'workflow_id': workflow_id,
+ 'workflow_run_id': workflow_run_id,
+ 'timestamp': str(datetime.datetime.now())
+ },
+ room=sid
+ )
+
+
+def _send_notify_event(sid):
+ global seq_no
+
+ topic = 'topic_%s' % seq_no
+ message = {
+ 'sample_key': 'sample_value'
+ }
+ seq_no += 1
+
+    run_client = workflow_run_clients.get(sid)
+    if run_client:
+        workflow_run_id = run_client['workflow_run_id']
+        workflow_run = workflow_runs.get(workflow_run_id)
+        if workflow_run:
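+            # queue the event so a later WORKFLOW_RUN_FETCH_EVENT request can pop it by topic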
+ workflow_run['queue'].append({
+ 'topic': topic,
+ 'message': message
+ })
+
+ log.info('sending a notify event to sid %s' % sid)
+ sio.emit(
+ event=WORKFLOW_RUN_NOTIFY_EVENT,
+ data={
+ 'topic': topic,
+ 'timestamp': str(datetime.datetime.now())
+ },
+ room=sid
+ )
+
+
+def _handle_event_connect(sid, query):
+    sio.emit(GREETING, {}, room=sid)
+
+    global last_client_action_time
+    last_client_action_time = datetime.datetime.now()
+
+    # if the client is a manager, send kickstart events every 2 sec
+    if query['type'] == 'workflow_manager':
+        log.info('manager (%s) is connected' % sid)
+        kickstart_timer = RepeatableTimer(2, _send_kickstart_event, sid)
+ manager_clients[sid] = {
+ 'kickstart_timer': kickstart_timer
+ }
+
+ kickstart_timer.start()
+ elif query['type'] == 'workflow_run':
+ log.info('workflow run (%s) is connected' % sid)
+        notify_event_timer = RepeatableTimer(2, _send_notify_event, sid)
+ workflow_run_clients[sid] = {
+ 'workflow_id': query['workflow_id'],
+ 'workflow_run_id': query['workflow_run_id'],
+ 'notify_event_timer': notify_event_timer
+ }
+
+ notify_event_timer.start()
+
+
+def _handle_event_disconnect(sid):
+ if sid in manager_clients:
+ log.info('manager (%s) is disconnected' % sid)
+ if manager_clients[sid]['kickstart_timer']:
+ manager_clients[sid]['kickstart_timer'].cancel()
+
+ del manager_clients[sid]
+
+ if sid in workflow_run_clients:
+ log.info('workflow run (%s) is disconnected' % sid)
+ if workflow_run_clients[sid]['notify_event_timer']:
+ workflow_run_clients[sid]['notify_event_timer'].cancel()
+
+ del workflow_run_clients[sid]
+
+ global last_client_action_time
+    last_client_action_time = datetime.datetime.now()
+
+
+def _get_req_id(body):
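+    # arbitrary default req_id, used when the client did not supply one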
+ req_id = 101010
+ if 'req_id' in body:
+ req_id = int(body['req_id'])
+ return req_id
+
+
+def _handle_event_workflow_reg(sid, body):
+ data = {
+ 'req_id': _get_req_id(body)
+ }
+
+ if 'workflow' in body:
+ workflow = body['workflow']
+ workflow_id = workflow['id']
+
+ if workflow_id in workflows:
+ # already exist
+ data['error'] = True
+ data['result'] = False
+ data['message'] = 'workflow is already registered'
+ else:
+ log.info('manager (%s) registers a workflow (%s)' % (sid, workflow_id))
+ workflows[workflow_id] = workflow
+
+ data['error'] = False
+ data['result'] = True
+ else:
+ data['error'] = True
+ data['result'] = False
+ data['message'] = 'workflow is not in the message body'
+
+ log.info('returning a result for workflow register event to sid %s' % sid)
+ sio.emit(
+ event=WORKFLOW_REGISTER,
+ data=data,
+ room=sid
+ )
+
+
+def _handle_event_workflow_reg_essence(sid, body):
+ data = {
+ 'req_id': _get_req_id(body)
+ }
+
+ if 'essence' in body:
+ essence = body['essence']
+ for wid in essence:
+ workflow_essence = essence[wid]
+ if 'dag' in workflow_essence and 'dag_id' in workflow_essence['dag']:
+ dag = workflow_essence['dag']
+ workflow_id = dag['dag_id']
+
+ if workflow_id in workflow_essences or workflow_id in workflows:
+ # already exist
+ data['error'] = True
+ data['result'] = False
+ data['message'] = 'workflow is already registered'
+ else:
+ log.info('manager (%s) registers a workflow (%s)' % (sid, workflow_id))
+ workflow_essences[workflow_id] = workflow_essence
+
+ data['error'] = False
+ data['result'] = True
+ else:
+ data['error'] = True
+ data['result'] = False
+ data['message'] = 'essence is not in the message body'
+ else:
+ data['error'] = True
+ data['result'] = False
+ data['message'] = 'essence is not in the message body'
+
+ log.info('returning a result for workflow essence register event to sid %s' % sid)
+ sio.emit(
+ event=WORKFLOW_REGISTER_ESSENCE,
+ data=data,
+ room=sid
+ )
+
+
+def _handle_event_workflow_list(sid, body):
+ data = {
+ 'req_id': _get_req_id(body)
+ }
+
+ workflow_ids = []
+
+ for workflow_id in workflows:
+ workflow_ids.append(workflow_id)
+
+ for workflow_id in workflow_essences:
+ workflow_ids.append(workflow_id)
+
+ data['error'] = False
+ data['result'] = workflow_ids
+
+ log.info('returning a result for workflow list event to sid %s' % sid)
+ sio.emit(
+ event=WORKFLOW_LIST,
+ data=data,
+ room=sid
+ )
+
+
+def _handle_event_workflow_run_list(sid, body):
+ data = {
+ 'req_id': _get_req_id(body)
+ }
+
+ workflow_run_ids = []
+
+ for workflow_run_id in workflow_runs:
+ workflow_run_ids.append(workflow_run_id)
+
+ data['error'] = False
+ data['result'] = workflow_run_ids
+
+ log.info('returning a result for workflow run list event to sid %s' % sid)
+ sio.emit(
+ event=WORKFLOW_LIST_RUN,
+ data=data,
+ room=sid
+ )
+
+
+def _handle_event_workflow_check(sid, body):
+ data = {
+ 'req_id': _get_req_id(body)
+ }
+
+ if 'workflow_id' in body:
+ workflow_id = body['workflow_id']
+ if workflow_id in workflows:
+ data['error'] = False
+ data['result'] = True
+ else:
+ data['error'] = False
+ data['result'] = False
+ else:
+ data['error'] = True
+ data['result'] = False
+ data['message'] = 'workflow_id is not in the message body'
+
+ log.info('returning a result for workflow check event to sid %s' % sid)
+ sio.emit(
+ event=WORKFLOW_CHECK,
+ data=data,
+ room=sid
+ )
+
+
+def _handle_event_workflow_remove(sid, body):
+ data = {
+ 'req_id': _get_req_id(body)
+ }
+
+ if 'workflow_id' in body:
+ workflow_id = body['workflow_id']
+ if workflow_id in workflows:
+
+            has_workflow_runs = False
+            for workflow_run_id in workflow_runs:
+                workflow_run = workflow_runs[workflow_run_id]
+                wid = workflow_run['workflow_id']
+                if wid == workflow_id:
+                    # there is a workflow run for the workflow id
+                    has_workflow_runs = True
+                    break
+
+            if has_workflow_runs:
+ data['error'] = False
+ data['result'] = False
+ else:
+ del workflows[workflow_id]
+
+ data['error'] = False
+ data['result'] = True
+ else:
+ data['error'] = False
+ data['result'] = False
+ else:
+ data['error'] = True
+ data['result'] = False
+ data['message'] = 'workflow_id is not in the message body'
+
+ log.info('returning a result for workflow remove event to sid %s' % sid)
+ sio.emit(
+ event=WORKFLOW_REMOVE,
+ data=data,
+ room=sid
+ )
+
+
+def _handle_event_workflow_run_remove(sid, body):
+ data = {
+ 'req_id': _get_req_id(body)
+ }
+
+ if 'workflow_id' in body and 'workflow_run_id' in body:
+ # workflow_id = body['workflow_id']
+ workflow_run_id = body['workflow_run_id']
+
+ if workflow_run_id in workflow_runs:
+ del workflow_runs[workflow_run_id]
+
+ data['error'] = False
+ data['result'] = True
+ else:
+ data['error'] = False
+ data['result'] = False
+ else:
+ data['error'] = True
+ data['result'] = False
+ data['message'] = 'workflow_id or workflow_run_id is not in the message body'
+
+ log.info('returning a result for workflow run remove event to sid %s' % sid)
+ sio.emit(
+ event=WORKFLOW_REMOVE_RUN,
+ data=data,
+ room=sid
+ )
+
+
+def _handle_event_new_workflow_run(sid, body):
+ data = {
+ 'req_id': _get_req_id(body)
+ }
+
+ if 'workflow_id' in body and 'workflow_run_id' in body:
+ workflow_id = body['workflow_id']
+ workflow_run_id = body['workflow_run_id']
+
+ log.info('manager (%s) started a new workflow (%s), workflow_run (%s)' % (sid, workflow_id, workflow_run_id))
+ workflow_runs[workflow_run_id] = {
+ 'workflow_id': workflow_id,
+ 'workflow_run_id': workflow_run_id,
+ 'queue': []
+ }
+
+ data['error'] = False
+ data['result'] = True
+ else:
+ data['error'] = True
+ data['result'] = False
+ data['message'] = 'workflow_id or workflow_run_id is not in the message body'
+
+ log.info('returning a result for a new workflow run event to sid %s' % sid)
+ sio.emit(
+ event=WORKFLOW_NOTIFY_NEW_RUN,
+ data=data,
+ room=sid
+ )
+
+
+def _handle_event_workflow_run_update_status(sid, body):
+ data = {
+ 'req_id': _get_req_id(body)
+ }
+
+ if 'workflow_id' in body and 'workflow_run_id' in body and 'task_id' in body and 'status' in body:
+ # workflow_id = body['workflow_id']
+ workflow_run_id = body['workflow_run_id']
+ task_id = body['task_id']
+ status = body['status']
+
+ if workflow_run_id in workflow_runs:
+ workflow_run = workflow_runs[workflow_run_id]
+ workflow_run[task_id] = status
+
+ data['error'] = False
+ data['result'] = True
+ else:
+ data['error'] = True
+ data['result'] = False
+ data['message'] = 'cannot find workflow run'
+ else:
+ data['error'] = True
+ data['result'] = False
+ data['message'] = 'workflow_id, workflow_run_id, task_id or status is not in the message body'
+
+ log.info('returning a result for workflow run update status event to sid %s' % sid)
+ sio.emit(
+ event=WORKFLOW_RUN_UPDATE_STATUS,
+ data=data,
+ room=sid
+ )
+
+
+def _handle_event_workflow_run_count_events(sid, body):
+ data = {
+ 'req_id': _get_req_id(body)
+ }
+
+ if 'workflow_id' in body and 'workflow_run_id' in body:
+ # workflow_id = body['workflow_id']
+ workflow_run_id = body['workflow_run_id']
+
+ if workflow_run_id in workflow_runs:
+ workflow_run = workflow_runs[workflow_run_id]
+ queue = workflow_run['queue']
+ count = len(queue)
+
+ data['error'] = False
+ data['result'] = count
+ else:
+ data['error'] = True
+ data['result'] = 0
+ data['message'] = 'cannot find workflow run'
+ else:
+ data['error'] = True
+ data['result'] = 0
+        data['message'] = 'workflow_id or workflow_run_id is not in the message body'
+
+ log.info('returning a result for workflow run count events to sid %s' % sid)
+ sio.emit(
+ event=WORKFLOW_RUN_COUNT_EVENTS,
+ data=data,
+ room=sid
+ )
+
+
+def _handle_event_workflow_run_fetch_event(sid, body):
+ data = {
+ 'req_id': _get_req_id(body)
+ }
+
+ if 'workflow_id' in body and 'workflow_run_id' in body and 'task_id' in body and 'topic' in body:
+ # workflow_id = body['workflow_id']
+ workflow_run_id = body['workflow_run_id']
+ # task_id = body['task_id']
+ topic = body['topic']
+
+ if workflow_run_id in workflow_runs:
+ workflow_run = workflow_runs[workflow_run_id]
+ queue = workflow_run['queue']
+
+ event = None
+ for idx in range(len(queue)):
+ if queue[idx]['topic'] == topic:
+ # found
+ event = queue.pop(idx)
+ break
+
+ if event:
+ data['error'] = False
+ data['result'] = event
+ else:
+ data['error'] = False
+ data['result'] = {}
+ else:
+            data['error'] = True
+ data['result'] = False
+ data['message'] = 'cannot find workflow run'
+ else:
+ data['error'] = True
+ data['result'] = False
+ data['message'] = 'workflow_id, workflow_run_id, task_id or topic is not in the message body'
+
+ log.info('returning a result for workflow run fetch event to sid %s' % sid)
+ sio.emit(
+ event=WORKFLOW_RUN_FETCH_EVENT,
+ data=data,
+ room=sid
+ )
+
+
+def _handle_event(event, sid, body):
+ log.info('event %s - body %s (%s)' % (event, body, type(body)))
+
+
+class ServerEventHandler(socketio.namespace.Namespace):
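+    """Catch-all namespace that routes raw socket.io events to their handlers."""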
+ def trigger_event(self, event, *args):
+ sid = args[0]
+ if event == 'connect':
+ querystr = args[1]['QUERY_STRING']
+ query = make_query_string_dict(querystr)
+ _handle_event_connect(sid, query)
+ elif event == 'disconnect':
+ _handle_event_disconnect(sid)
+
+ # manager
+ elif event == WORKFLOW_NOTIFY_NEW_RUN:
+ _handle_event_new_workflow_run(sid, args[1])
+ elif event == WORKFLOW_REGISTER_ESSENCE:
+ _handle_event_workflow_reg_essence(sid, args[1])
+ elif event == WORKFLOW_REGISTER:
+ _handle_event_workflow_reg(sid, args[1])
+ elif event == WORKFLOW_LIST:
+ _handle_event_workflow_list(sid, args[1])
+ elif event == WORKFLOW_LIST_RUN:
+ _handle_event_workflow_run_list(sid, args[1])
+ elif event == WORKFLOW_CHECK:
+ _handle_event_workflow_check(sid, args[1])
+ elif event == WORKFLOW_REMOVE:
+ _handle_event_workflow_remove(sid, args[1])
+ elif event == WORKFLOW_REMOVE_RUN:
+ _handle_event_workflow_run_remove(sid, args[1])
+
+ # workflow run
+ elif event == WORKFLOW_RUN_UPDATE_STATUS:
+ _handle_event_workflow_run_update_status(sid, args[1])
+ elif event == WORKFLOW_RUN_COUNT_EVENTS:
+ _handle_event_workflow_run_count_events(sid, args[1])
+ elif event == WORKFLOW_RUN_FETCH_EVENT:
+ _handle_event_workflow_run_fetch_event(sid, args[1])
+ else:
+ _handle_event(event, args[0], args[1])
+
+
+def _run(port):
+ global sio
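+    # aggressive ping settings, presumably so test-client disconnects are noticed quickly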
+ sio = socketio.Server(ping_timeout=5, ping_interval=1)
+ app = socketio.WSGIApp(sio)
+ sio.register_namespace(ServerEventHandler('/'))
+
+ server = pywsgi.WSGIServer(
+ ('', port),
+ app,
+ handler_class=WebSocketHandler
+ )
+
+ server.serve_forever()
+
+
+def start(port):
+ p = Process(target=_run, args=(port, ))
+ p.start()
+ time.sleep(3)
+
+ log.info('Dummy server is started!')
+ return p
+
+
+def stop(p):
+ log.info('Stopping dummy server!')
+
+ try:
+ process = psutil.Process(p.pid)
+ for proc in process.children(recursive=True):
+ proc.kill()
+ process.kill()
+ p.join()
+ except psutil.NoSuchProcess:
+ pass
+
+ # clean-up
+ global sio, manager_clients, workflow_runs, seq_no
+ sio = None
+ manager_clients = {}
+ workflow_runs = {}
+ seq_no = 1
+
+ time.sleep(3)
+
+ log.info('Dummy server is stopped!')
diff --git a/test/dummy_server_util.py b/test/dummy_server_util.py
new file mode 100644
index 0000000..7cc60db
--- /dev/null
+++ b/test/dummy_server_util.py
@@ -0,0 +1,39 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import atexit
+from .dummy_server import stop as server_stop
+
+dummy_servers = {}
+
+
+def cleanup_dummy_servers():
+    # iterate over a copy; entries are deleted during iteration
+    for pid in list(dummy_servers):
+        s = dummy_servers[pid]
+        server_stop(s)
+        del dummy_servers[pid]
+
+
+def register_dummy_server_cleanup(s):
+ if s.pid not in dummy_servers:
+ dummy_servers[s.pid] = s
+
+
+def unregister_dummy_server_cleanup(s):
+ if s.pid in dummy_servers:
+ del dummy_servers[s.pid]
+
+
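+# reap any dummy servers that are still alive when the interpreter exits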
+atexit.register(cleanup_dummy_servers)
diff --git a/test/hello_workflow.json b/test/hello_workflow.json
new file mode 100644
index 0000000..9de71bc
--- /dev/null
+++ b/test/hello_workflow.json
@@ -0,0 +1,25 @@
+{
+ "hello_workflow": {
+ "dag": {
+ "dag_id": "hello_workflow",
+ "local_variable": "dag_hello"
+ },
+ "dependencies": {
+ "onu_event_handler": {}
+ },
+ "tasks": {
+ "onu_event_handler": {
+ "class": "XOSEventSensor",
+ "dag_id": "hello_workflow",
+ "dag": "dag_hello",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler",
+ "topic": "onu.events"
+ }
+ }
+ }
+}
diff --git a/test/test_manager.py b/test/test_manager.py
new file mode 100644
index 0000000..d35a031
--- /dev/null
+++ b/test/test_manager.py
@@ -0,0 +1,120 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import time
+import os
+import json
+from cord_workflow_controller_client.manager import Manager
+from multistructlog import create_logger
+from .dummy_server import start as server_start, stop as server_stop
+from .dummy_server_util import register_dummy_server_cleanup, unregister_dummy_server_cleanup
+
+log = create_logger()
+code_dir = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
+
+def read_json_file(filename):
+ if filename:
+ with open(filename, 'r') as f:
+ return json.load(f)
+ return None
+
+
+class TestManager(unittest.TestCase):
+ """
+ Try to connect to a local fake Controller Service as a Manager.
+ """
+
+ def setUp(self):
+ self.server = server_start(17080)
+ self.kickstarted_workflows = {}
+ register_dummy_server_cleanup(self.server)
+
+ def tearDown(self):
+ server_stop(self.server)
+ unregister_dummy_server_cleanup(self.server)
+ self.server = None
+ self.kickstarted_workflows = {}
+
+ def test_connect(self):
+ """
+ This tests if Manager client can connect to a socket.io server properly.
+ """
+ succeed = False
+ try:
+ manager = Manager(logger=log)
+ manager.connect('http://localhost:17080')
+
+ time.sleep(1)
+
+ manager.disconnect()
+ succeed = True
+ finally:
+ self.assertTrue(succeed, 'Finished with error')
+
+ def test_kickstart(self):
+ """
+ This tests if Manager client can receive a kickstart event.
+ """
+ succeed = False
+
+ try:
+ manager = Manager(logger=log)
+ manager.connect('http://localhost:17080')
+
+ def on_kickstart(workflow_id, workflow_run_id):
+ self.kickstarted_workflows[workflow_id] = {
+ 'workflow_id': workflow_id,
+ 'workflow_run_id': workflow_run_id
+ }
+ manager.notify_new_workflow_run(workflow_id, workflow_run_id)
+
+ manager.set_handlers({'kickstart': on_kickstart})
+
+            # the dummy server sends a kickstart message every 2 seconds
+ # we wait 6 seconds to receive at least 2 messages
+ time.sleep(6)
+
+ manager.disconnect()
+ succeed = True
+ finally:
+ self.assertTrue(succeed, 'Finished with error')
+            self.assertTrue(len(self.kickstarted_workflows) >= 2, 'at least 2 kickstart events must be handled')
+
+ def test_workflow_essence_register(self):
+ """
+ This tests if Manager client can register workflow essence.
+ """
+        succeed = False
+        result = False
+ essence_path = os.path.join(code_dir, "hello_workflow.json")
+ essence = read_json_file(essence_path)
+
+ try:
+ manager = Manager(logger=log)
+ manager.connect('http://localhost:17080')
+
+ # the command is synchronous
+ result = manager.register_workflow_essence(essence)
+
+ manager.disconnect()
+ succeed = True
+ finally:
+ self.assertTrue(succeed, 'Finished with error')
+ self.assertTrue(result, 'workflow essence register failed')
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/test_probe.py b/test/test_probe.py
new file mode 100644
index 0000000..3f6baed
--- /dev/null
+++ b/test/test_probe.py
@@ -0,0 +1,95 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import time
+from cord_workflow_controller_client.probe import Probe
+from multistructlog import create_logger
+from .dummy_server import start as server_start, stop as server_stop
+
+log = create_logger()
+
+
+class TestProbe(unittest.TestCase):
+ """
+ Try to connect to a local fake Controller Service as a Probe.
+ """
+
+ def setUp(self):
+ self.server = server_start(17080)
+
+ def tearDown(self):
+ server_stop(self.server)
+ self.server = None
+
+ def test_connect(self):
+ """
+ This tests if Probe client can connect to a socket.io server properly.
+ """
+ succeed = False
+ try:
+ probe = Probe(logger=log)
+ probe.connect('http://localhost:17080')
+
+ time.sleep(1)
+
+ probe.disconnect()
+ succeed = True
+ finally:
+ self.assertTrue(succeed, 'Finished with error')
+
+ def test_emit_string(self):
+ """
+ This tests if Probe client can emit an event.
+ """
+ succeed = False
+ try:
+ probe = Probe(logger=log)
+ probe.connect('http://localhost:17080')
+
+ probe.emit_event('xos.test.event', 'string message - hello')
+ time.sleep(1)
+
+ probe.disconnect()
+ succeed = True
+ finally:
+ self.assertTrue(succeed, 'Finished with error')
+
+ def test_emit_json(self):
+ """
+ This tests if Probe client can emit an event with a dict (json) object.
+ """
+ succeed = False
+ try:
+ probe = Probe(logger=log)
+ probe.connect('http://localhost:17080')
+
+ probe.emit_event(
+ 'xos.test.event',
+ {
+ 'str_key': 'value',
+ 'int_key': 32335
+ }
+ )
+ time.sleep(1)
+
+ probe.disconnect()
+ succeed = True
+ finally:
+ self.assertTrue(succeed, 'Finished with error')
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/test_workflow_run.py b/test/test_workflow_run.py
new file mode 100644
index 0000000..80d6fcf
--- /dev/null
+++ b/test/test_workflow_run.py
@@ -0,0 +1,179 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import time
+import os
+import json
+from cord_workflow_controller_client.manager import Manager
+from cord_workflow_controller_client.workflow_run import WorkflowRun
+from multistructlog import create_logger
+from .dummy_server import start as server_start, stop as server_stop
+
+log = create_logger()
+code_dir = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+
+
+def read_json_file(filename):
+ if filename:
+ with open(filename, 'r') as f:
+ return json.load(f)
+ return None
+
+
+class TestWorkflowRun(unittest.TestCase):
+ """
+    Try to connect to a local fake Controller Service as a Manager and a WorkflowRun.
+ """
+
+ def setUp(self):
+ self.kickstarted_workflows = {}
+ self.notifications = []
+
+ self.server = server_start(17080)
+ self.manager = Manager(logger=log)
+ self.manager.connect('http://localhost:17080')
+
+ essence_path = os.path.join(code_dir, "hello_workflow.json")
+ essence = read_json_file(essence_path)
+ self.manager.register_workflow_essence(essence)
+ self.manager.notify_new_workflow_run('hello_workflow', 'hello_workflow_123')
+
+ # wait for 2 seconds for registering a new workflow run
+ time.sleep(2)
+
+ def tearDown(self):
+ self.manager.disconnect()
+ self.manager = None
+
+ server_stop(self.server)
+ self.server = None
+
+ self.kickstarted_workflows = {}
+ self.notifications = []
+
+ def test_connect(self):
+ """
+ This tests if workflow run client can connect to a socket.io server properly.
+ """
+ succeed = False
+ try:
+ run = WorkflowRun('hello_workflow', 'hello_workflow_123')
+ run.connect('http://localhost:17080')
+
+ time.sleep(1)
+
+ run.disconnect()
+ succeed = True
+ finally:
+ self.assertTrue(succeed, 'Finished with error')
+
+ def test_count_events(self):
+ """
+ This tests if workflow run client can retrieve the number of events.
+ """
+        succeed = False
+        count = 0
+ try:
+ run = WorkflowRun('hello_workflow', 'hello_workflow_123')
+ run.connect('http://localhost:17080')
+
+            # the dummy server generates a message every 2 seconds
+ # we wait 6 seconds to queue at least 2 messages
+ time.sleep(6)
+
+ count = run.count_events()
+
+ run.disconnect()
+ succeed = True
+ finally:
+ self.assertTrue(succeed, 'Finished with error')
+            self.assertTrue(count >= 2, 'at least 2 events must be queued')
+
+ def test_notify_event(self):
+ """
+        This tests if workflow run client can get a notification for events.
+ """
+        succeed = False
+        count = 0
+ try:
+ run = WorkflowRun('hello_workflow', 'hello_workflow_123')
+ run.connect('http://localhost:17080')
+
+ def on_notification(workflow_id, workflow_run_id, topic):
+ self.notifications.append({
+ 'workflow_id': workflow_id,
+ 'workflow_run_id': workflow_run_id,
+ 'topic': topic
+ })
+
+ run.set_handlers({'notify': on_notification})
+
+            # the dummy server generates a message every 2 seconds
+ # we wait 6 seconds to get at least 2 notifications
+ time.sleep(6)
+
+ count = len(self.notifications)
+
+ run.disconnect()
+ succeed = True
+ finally:
+ self.assertTrue(succeed, 'Finished with error')
+            self.assertTrue(count >= 2, 'at least 2 notifications must be received')
+
+ def test_get_events(self):
+ """
+ This tests if workflow run client can retrieve events.
+ """
+ succeed = False
+ try:
+ run = WorkflowRun('hello_workflow', 'hello_workflow_123')
+ run.connect('http://localhost:17080')
+
+ def on_notification(workflow_id, workflow_run_id, topic):
+ self.notifications.append({
+ 'workflow_id': workflow_id,
+ 'workflow_run_id': workflow_run_id,
+ 'topic': topic
+ })
+
+ run.set_handlers({'notify': on_notification})
+
+            # the dummy server generates a message every 2 seconds
+ # we wait 6 seconds to queue at least 2 messages
+ time.sleep(6)
+
+ count_notified = len(self.notifications)
+ count_queued = run.count_events()
+
+            self.assertTrue(count_notified >= 2, 'at least 2 events must be notified')
+            self.assertTrue(count_queued >= 2, 'at least 2 events must be queued')
+
+            # count_notified and count_queued may temporarily differ
+ for i in range(count_notified):
+ notification = self.notifications.pop(0)
+ topic = notification['topic']
+ event = run.fetch_event('task123', topic)
+
+ self.assertTrue('topic' in event, 'event should not be empty')
+ self.assertTrue(event['topic'] == topic, 'event should be retrieved by topic')
+ self.assertTrue(len(event['message']) > 0, 'there must be some messages')
+
+ run.disconnect()
+ succeed = True
+ finally:
+ self.assertTrue(succeed, 'Finished with error')
+
+
+if __name__ == "__main__":
+ unittest.main()