blob: 80d6fcff8aaedec2e92c7cd60bac07daebbe2a7b [file] [log] [blame]
# Copyright 2019-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import unittest
import time
import os
import json
from cord_workflow_controller_client.manager import Manager
from cord_workflow_controller_client.workflow_run import WorkflowRun
from multistructlog import create_logger
from .dummy_server import start as server_start, stop as server_stop
log = create_logger()
code_dir = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
def read_json_file(filename):
if filename:
with open(filename, 'r') as f:
return json.load(f)
return None
class TestWorkflowRun(unittest.TestCase):
"""
Try to connect to a local fake Controller Service as a Manager.
"""
def setUp(self):
self.kickstarted_workflows = {}
self.notifications = []
self.server = server_start(17080)
self.manager = Manager(logger=log)
self.manager.connect('http://localhost:17080')
essence_path = os.path.join(code_dir, "hello_workflow.json")
essence = read_json_file(essence_path)
self.manager.register_workflow_essence(essence)
self.manager.notify_new_workflow_run('hello_workflow', 'hello_workflow_123')
# wait for 2 seconds for registering a new workflow run
time.sleep(2)
def tearDown(self):
self.manager.disconnect()
self.manager = None
server_stop(self.server)
self.server = None
self.kickstarted_workflows = {}
self.notifications = []
def test_connect(self):
"""
This tests if workflow run client can connect to a socket.io server properly.
"""
succeed = False
try:
run = WorkflowRun('hello_workflow', 'hello_workflow_123')
run.connect('http://localhost:17080')
time.sleep(1)
run.disconnect()
succeed = True
finally:
self.assertTrue(succeed, 'Finished with error')
def test_count_events(self):
"""
This tests if workflow run client can retrieve the number of events.
"""
succeed = False
try:
run = WorkflowRun('hello_workflow', 'hello_workflow_123')
run.connect('http://localhost:17080')
# dummy server generates a message for every 2 seconds
# we wait 6 seconds to queue at least 2 messages
time.sleep(6)
count = run.count_events()
run.disconnect()
succeed = True
finally:
self.assertTrue(succeed, 'Finished with error')
self.assertTrue(count >= 2, 'There must be more than 2 events queued')
def test_notify_event(self):
"""
This tests if workflow run client can get a noficitation for events.
"""
succeed = False
try:
run = WorkflowRun('hello_workflow', 'hello_workflow_123')
run.connect('http://localhost:17080')
def on_notification(workflow_id, workflow_run_id, topic):
self.notifications.append({
'workflow_id': workflow_id,
'workflow_run_id': workflow_run_id,
'topic': topic
})
run.set_handlers({'notify': on_notification})
# dummy server generates a message for every 2 seconds
# we wait 6 seconds to get at least 2 notifications
time.sleep(6)
count = len(self.notifications)
run.disconnect()
succeed = True
finally:
self.assertTrue(succeed, 'Finished with error')
self.assertTrue(count >= 2, 'There must be more than 2 notifications received')
def test_get_events(self):
"""
This tests if workflow run client can retrieve events.
"""
succeed = False
try:
run = WorkflowRun('hello_workflow', 'hello_workflow_123')
run.connect('http://localhost:17080')
def on_notification(workflow_id, workflow_run_id, topic):
self.notifications.append({
'workflow_id': workflow_id,
'workflow_run_id': workflow_run_id,
'topic': topic
})
run.set_handlers({'notify': on_notification})
# dummy server generates a message for every 2 seconds
# we wait 6 seconds to queue at least 2 messages
time.sleep(6)
count_notified = len(self.notifications)
count_queued = run.count_events()
self.assertTrue(count_notified >= 2, 'There must be more than 2 events notified')
self.assertTrue(count_queued >= 2, 'There must be more than 2 events queued')
# count_notified and count_queued may not have the same number temporarily
for i in range(count_notified):
notification = self.notifications.pop(0)
topic = notification['topic']
event = run.fetch_event('task123', topic)
self.assertTrue('topic' in event, 'event should not be empty')
self.assertTrue(event['topic'] == topic, 'event should be retrieved by topic')
self.assertTrue(len(event['message']) > 0, 'there must be some messages')
run.disconnect()
succeed = True
finally:
self.assertTrue(succeed, 'Finished with error')
if __name__ == "__main__":
unittest.main()