blob: f3996a8d46248e276d577962f78ea1683301d893 [file] [log] [blame]
Illyoung Choia9d2c2c2019-07-12 13:29:42 -07001# Copyright 2019-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import absolute_import
16import unittest
17import time
18import os
19import json
20from cord_workflow_controller_client.manager import Manager
21from multistructlog import create_logger
22from .dummy_server import start as server_start, stop as server_stop
23from .dummy_server_util import register_dummy_server_cleanup, unregister_dummy_server_cleanup
24
25log = create_logger()
26code_dir = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
27
28
29def read_json_file(filename):
30 if filename:
31 with open(filename, 'r') as f:
32 return json.load(f)
33 return None
34
35
36class TestManager(unittest.TestCase):
37 """
38 Try to connect to a local fake Controller Service as a Manager.
39 """
40
41 def setUp(self):
42 self.server = server_start(17080)
43 self.kickstarted_workflows = {}
44 register_dummy_server_cleanup(self.server)
45
46 def tearDown(self):
47 server_stop(self.server)
48 unregister_dummy_server_cleanup(self.server)
49 self.server = None
50 self.kickstarted_workflows = {}
51
52 def test_connect(self):
53 """
54 This tests if Manager client can connect to a socket.io server properly.
55 """
56 succeed = False
57 try:
58 manager = Manager(logger=log)
59 manager.connect('http://localhost:17080')
60
61 time.sleep(1)
62
63 manager.disconnect()
64 succeed = True
65 finally:
66 self.assertTrue(succeed, 'Finished with error')
67
68 def test_kickstart(self):
69 """
70 This tests if Manager client can receive a kickstart event.
71 """
72 succeed = False
73
74 try:
75 manager = Manager(logger=log)
76 manager.connect('http://localhost:17080')
77
78 def on_kickstart(workflow_id, workflow_run_id):
79 self.kickstarted_workflows[workflow_id] = {
80 'workflow_id': workflow_id,
81 'workflow_run_id': workflow_run_id
82 }
Illyoung Choi4df34b72019-07-18 13:55:18 -070083 manager.report_new_workflow_run(workflow_id, workflow_run_id)
Illyoung Choia9d2c2c2019-07-12 13:29:42 -070084
85 manager.set_handlers({'kickstart': on_kickstart})
86
87 # dummy server sends a kickstart message for every 2 seconds
88 # we wait 6 seconds to receive at least 2 messages
89 time.sleep(6)
90
91 manager.disconnect()
92 succeed = True
93 finally:
94 self.assertTrue(succeed, 'Finished with error')
95 self.assertTrue(len(self.kickstarted_workflows) >= 2, 'Kickstart event is not handled')
96
97 def test_workflow_essence_register(self):
98 """
99 This tests if Manager client can register workflow essence.
100 """
101 succeed = False
102 essence_path = os.path.join(code_dir, "hello_workflow.json")
103 essence = read_json_file(essence_path)
104
105 try:
106 manager = Manager(logger=log)
107 manager.connect('http://localhost:17080')
108
109 # the command is synchronous
110 result = manager.register_workflow_essence(essence)
111
112 manager.disconnect()
113 succeed = True
114 finally:
115 self.assertTrue(succeed, 'Finished with error')
116 self.assertTrue(result, 'workflow essence register failed')
117
118
119if __name__ == "__main__":
120 unittest.main()