Implement controller client
- Provide Probe, Manager, and WorkflowRun classes for interacting with the CORD Workflow Controller
Change-Id: I0ad8d3661864635d9701eab1cb089cb17f81cd50
diff --git a/test/test_manager.py b/test/test_manager.py
new file mode 100644
index 0000000..d35a031
--- /dev/null
+++ b/test/test_manager.py
@@ -0,0 +1,120 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import time
+import os
+import json
+from cord_workflow_controller_client.manager import Manager
+from multistructlog import create_logger
+from .dummy_server import start as server_start, stop as server_stop
+from .dummy_server_util import register_dummy_server_cleanup, unregister_dummy_server_cleanup
+
+# Module-wide logger handed to every Manager instance created by the tests.
+log = create_logger()
+# Absolute directory containing this test module (symlinks resolved); the
+# JSON fixture used by the tests is looked up relative to it.
+code_dir = os.path.realpath(__file__)
+code_dir = os.path.abspath(os.path.dirname(code_dir))
+
+
+def read_json_file(filename):
+    """
+    Load a JSON document from disk.
+
+    :param filename: path of the JSON file; a falsy value (None, '') means "no file"
+    :return: the decoded JSON content, or None when no filename was given
+    """
+    # Guard clause: nothing to read without a file name.
+    if not filename:
+        return None
+
+    with open(filename, 'r') as json_file:
+        return json.load(json_file)
+
+
+class TestManager(unittest.TestCase):
+    """
+    Try to connect to a local fake Controller Service as a Manager.
+
+    A dummy server is started before each test and stopped afterwards, so
+    every test talks to a fresh server instance on SERVER_PORT.
+    """
+
+    # Port the dummy server listens on and the matching URL used by the client.
+    SERVER_PORT = 17080
+    SERVER_URL = 'http://localhost:17080'
+
+    def setUp(self):
+        """Start the dummy server and reset the record of received kickstart events."""
+        self.server = server_start(self.SERVER_PORT)
+        self.kickstarted_workflows = {}
+        # NOTE(review): presumably guarantees the server is stopped even if the
+        # test run aborts - confirm against dummy_server_util.
+        register_dummy_server_cleanup(self.server)
+
+    def tearDown(self):
+        """Stop the dummy server and drop all per-test state."""
+        server_stop(self.server)
+        unregister_dummy_server_cleanup(self.server)
+        self.server = None
+        self.kickstarted_workflows = {}
+
+    def _connect_manager(self):
+        """Create a Manager client connected to the dummy server and return it."""
+        manager = Manager(logger=log)
+        manager.connect(self.SERVER_URL)
+        return manager
+
+    def test_connect(self):
+        """
+        This tests if Manager client can connect to a socket.io server properly.
+        """
+        # Any exception raised by connect()/disconnect() propagates unchanged,
+        # so the test report shows the real cause of a failure.
+        manager = self._connect_manager()
+        try:
+            time.sleep(1)
+        finally:
+            # Always release the connection, even if the body above fails.
+            manager.disconnect()
+
+    def test_kickstart(self):
+        """
+        This tests if Manager client can receive a kickstart event.
+        """
+        manager = self._connect_manager()
+        try:
+            def on_kickstart(workflow_id, workflow_run_id):
+                # Remember the event (keyed by workflow id) and acknowledge it
+                # to the controller.
+                self.kickstarted_workflows[workflow_id] = {
+                    'workflow_id': workflow_id,
+                    'workflow_run_id': workflow_run_id
+                }
+                manager.notify_new_workflow_run(workflow_id, workflow_run_id)
+
+            manager.set_handlers({'kickstart': on_kickstart})
+
+            # dummy server sends a kickstart message every 2 seconds
+            # we wait 6 seconds to receive at least 2 messages
+            time.sleep(6)
+        finally:
+            manager.disconnect()
+
+        # Assert outside the try/finally so a failure here never hides an
+        # exception raised while talking to the server.
+        self.assertGreaterEqual(
+            len(self.kickstarted_workflows), 2, 'Kickstart event is not handled')
+
+    def test_workflow_essence_register(self):
+        """
+        This tests if Manager client can register workflow essence.
+        """
+        essence_path = os.path.join(code_dir, "hello_workflow.json")
+        essence = read_json_file(essence_path)
+
+        manager = self._connect_manager()
+        try:
+            # the command is synchronous
+            result = manager.register_workflow_essence(essence)
+        finally:
+            manager.disconnect()
+
+        # 'result' is only inspected once the call has returned normally, so it
+        # can never be referenced while unbound.
+        self.assertTrue(result, 'workflow essence register failed')
+
+
+# Run the tests in this module when the file is executed directly as a script.
+if __name__ == "__main__":
+ unittest.main()