# Copyright 2019-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15from __future__ import absolute_import
16import unittest
17import time
18import os
19import json
20from cord_workflow_controller_client.manager import Manager
21from cord_workflow_controller_client.workflow_run import WorkflowRun
22from multistructlog import create_logger
23from .dummy_server import start as server_start, stop as server_stop
24
# Logger created once at import time; setUp() hands it to the Manager.
log = create_logger()
# Absolute directory containing this file (symlinks resolved); the
# hello_workflow.json fixture is loaded relative to it.
code_dir = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
27
28
def read_json_file(filename):
    """
    Load and return the JSON document stored in a file.

    Args:
        filename: path of the JSON file; a falsy value (None or '') means
            "no file".

    Returns:
        The decoded JSON value, or None when no filename was given.

    Raises:
        IOError/OSError: if the file cannot be opened.
        ValueError: if the file content is not valid JSON.
    """
    if not filename:
        return None

    # io.open behaves the same on Python 2 and 3 and lets us pin the
    # encoding: JSON is UTF-8 by specification, so do not depend on the
    # locale's default encoding.
    import io

    with io.open(filename, 'r', encoding='utf-8') as f:
        return json.load(f)
34
35
class TestWorkflowRun(unittest.TestCase):
    """
    Exercise the WorkflowRun client against a local fake Controller Service.

    For every test, setUp() starts the dummy server, connects a Manager to
    it, registers the hello_workflow essence and announces one workflow run.
    The tests then connect a WorkflowRun client for that run.
    """

    SERVER_PORT = 17080
    SERVER_URL = 'http://localhost:%d' % SERVER_PORT
    WORKFLOW_ID = 'hello_workflow'
    WORKFLOW_RUN_ID = 'hello_workflow_123'

    def setUp(self):
        """Start the dummy server and announce the workflow run under test."""
        self.kickstarted_workflows = {}
        self.notifications = []
        self.manager = None

        self.server = server_start(self.SERVER_PORT)
        # unittest does not call tearDown() when setUp() raises, but it does
        # run registered cleanups. Registering the shutdown right away keeps
        # a failed setUp from leaving the server bound to the port.
        self.addCleanup(self._stop_server)

        self.manager = Manager(logger=log)
        self.manager.connect(self.SERVER_URL)
        # Cleanups run last-in-first-out: the manager is disconnected before
        # the server is stopped.
        self.addCleanup(self._disconnect_manager)

        essence_path = os.path.join(code_dir, "hello_workflow.json")
        essence = read_json_file(essence_path)
        self.manager.register_workflow_essence(essence)
        self.manager.notify_new_workflow_run(self.WORKFLOW_ID, self.WORKFLOW_RUN_ID)

        # wait for 2 seconds for registering a new workflow run
        time.sleep(2)

    def tearDown(self):
        """Drop the state recorded by the test; connections are closed by cleanups."""
        self.kickstarted_workflows = {}
        self.notifications = []

    def _disconnect_manager(self):
        """Disconnect the Manager created in setUp()."""
        self.manager.disconnect()
        self.manager = None

    def _stop_server(self):
        """Stop the dummy server started in setUp()."""
        server_stop(self.server)
        self.server = None

    def _open_run(self):
        """Return a WorkflowRun client connected to the dummy server."""
        run = WorkflowRun(self.WORKFLOW_ID, self.WORKFLOW_RUN_ID)
        run.connect(self.SERVER_URL)
        return run

    def _record_notifications(self, run):
        """Install a 'notify' handler on run that appends to self.notifications."""
        def on_notification(workflow_id, workflow_run_id, topic):
            self.notifications.append({
                'workflow_id': workflow_id,
                'workflow_run_id': workflow_run_id,
                'topic': topic
            })

        run.set_handlers({'notify': on_notification})

    def test_connect(self):
        """
        This tests if workflow run client can connect to a socket.io server properly.
        """
        run = self._open_run()
        try:
            time.sleep(1)
        finally:
            # Always release the connection; any exception raised above
            # propagates unchanged and fails the test with its real cause.
            run.disconnect()

    def test_count_events(self):
        """
        This tests if workflow run client can retrieve the number of events.
        """
        run = self._open_run()
        try:
            # dummy server generates a message for every 2 seconds
            # we wait 6 seconds to queue at least 2 messages
            time.sleep(6)

            count = run.count_events()
        finally:
            run.disconnect()

        self.assertGreaterEqual(count, 2, 'There must be at least 2 events queued')

    def test_notify_event(self):
        """
        This tests if workflow run client can get a notification for events.
        """
        run = self._open_run()
        try:
            self._record_notifications(run)

            # dummy server generates a message for every 2 seconds
            # we wait 6 seconds to get at least 2 notifications
            time.sleep(6)

            count = len(self.notifications)
        finally:
            run.disconnect()

        self.assertGreaterEqual(count, 2, 'There must be at least 2 notifications received')

    def test_get_events(self):
        """
        This tests if workflow run client can retrieve events.
        """
        run = self._open_run()
        try:
            self._record_notifications(run)

            # dummy server generates a message for every 2 seconds
            # we wait 6 seconds to queue at least 2 messages
            time.sleep(6)

            count_notified = len(self.notifications)
            count_queued = run.count_events()

            self.assertGreaterEqual(count_notified, 2, 'There must be at least 2 events notified')
            self.assertGreaterEqual(count_queued, 2, 'There must be at least 2 events queued')

            # count_notified and count_queued may not have the same number temporarily,
            # so only the notifications seen so far are fetched. The slice is a
            # snapshot: the handler may keep appending while we iterate.
            for notification in self.notifications[:count_notified]:
                topic = notification['topic']
                event = run.fetch_event('task123', topic)

                self.assertIn('topic', event, 'event should not be empty')
                self.assertEqual(event['topic'], topic, 'event should be retrieved by topic')
                self.assertGreater(len(event['message']), 0, 'there must be some messages')
        finally:
            run.disconnect()
177
# Run the tests of this module through unittest's command-line entry point
# when the file is executed as a script rather than imported.
if __name__ == "__main__":
    unittest.main()