Reflect changes to Airflow sensors/operators and workflow essence
Rename events '*.notify_*' to '*.report_*'
Add a new function to report the status of workflow runs
Change-Id: Ib7eb9df787ecba286bbdfbf4e9aed62a193fed7c
diff --git a/test/dummy_server.py b/test/dummy_server.py
index 9c22968..f7a3411 100644
--- a/test/dummy_server.py
+++ b/test/dummy_server.py
@@ -27,7 +27,7 @@
from cord_workflow_controller_client.manager \
import (WORKFLOW_KICKSTART,
WORKFLOW_REGISTER, WORKFLOW_REGISTER_ESSENCE, WORKFLOW_LIST, WORKFLOW_LIST_RUN,
- WORKFLOW_CHECK, WORKFLOW_REMOVE, WORKFLOW_REMOVE_RUN, WORKFLOW_NOTIFY_NEW_RUN)
+ WORKFLOW_CHECK, WORKFLOW_REMOVE, WORKFLOW_REMOVE_RUN, WORKFLOW_REPORT_NEW_RUN)
from cord_workflow_controller_client.workflow_run \
import (WORKFLOW_RUN_NOTIFY_EVENT,
WORKFLOW_RUN_UPDATE_STATUS, WORKFLOW_RUN_COUNT_EVENTS, WORKFLOW_RUN_FETCH_EVENT)
@@ -427,7 +427,7 @@
log.info('returning a result for a new workflow run event to sid %s' % sid)
sio.emit(
- event=WORKFLOW_NOTIFY_NEW_RUN,
+ event=WORKFLOW_REPORT_NEW_RUN,
data=data,
room=sid
)
@@ -560,7 +560,7 @@
_handle_event_disconnect(sid)
# manager
- elif event == WORKFLOW_NOTIFY_NEW_RUN:
+ elif event == WORKFLOW_REPORT_NEW_RUN:
_handle_event_new_workflow_run(sid, args[1])
elif event == WORKFLOW_REGISTER_ESSENCE:
_handle_event_workflow_reg_essence(sid, args[1])
@@ -621,6 +621,8 @@
proc.kill()
process.kill()
p.join()
+ except BaseException:
+ pass
except psutil.NoSuchProcess:
pass
diff --git a/test/hello_workflow.json b/test/hello_workflow.json
index 9de71bc..fd407e7 100644
--- a/test/hello_workflow.json
+++ b/test/hello_workflow.json
@@ -5,19 +5,18 @@
"local_variable": "dag_hello"
},
"dependencies": {
- "onu_event_handler": {}
+ "onu_event_sensor": {}
},
"tasks": {
- "onu_event_handler": {
- "class": "XOSEventSensor",
- "dag_id": "hello_workflow",
+ "onu_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag": "dag_hello",
+ "dag_id": "hello_workflow",
"key_field": "serialNumber",
- "local_variable": "onu_event_handler",
+ "local_variable": "onu_event_sensor",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "ONU_event",
- "task_id": "onu_event_handler",
+ "task_id": "onu_event_sensor",
"topic": "onu.events"
}
}
diff --git a/test/test_manager.py b/test/test_manager.py
index d35a031..f3996a8 100644
--- a/test/test_manager.py
+++ b/test/test_manager.py
@@ -80,7 +80,7 @@
'workflow_id': workflow_id,
'workflow_run_id': workflow_run_id
}
- manager.notify_new_workflow_run(workflow_id, workflow_run_id)
+ manager.report_new_workflow_run(workflow_id, workflow_run_id)
manager.set_handlers({'kickstart': on_kickstart})
diff --git a/test/test_workflow_run.py b/test/test_workflow_run.py
index 80d6fcf..1736049 100644
--- a/test/test_workflow_run.py
+++ b/test/test_workflow_run.py
@@ -49,7 +49,7 @@
essence_path = os.path.join(code_dir, "hello_workflow.json")
essence = read_json_file(essence_path)
self.manager.register_workflow_essence(essence)
- self.manager.notify_new_workflow_run('hello_workflow', 'hello_workflow_123')
+ self.manager.report_new_workflow_run('hello_workflow', 'hello_workflow_123')
# wait for 2 seconds for registering a new workflow run
time.sleep(2)
@@ -160,7 +160,7 @@
self.assertTrue(count_queued >= 2, 'There must be more than 2 events queued')
# count_notified and count_queued may not have the same number temporarily
- for i in range(count_notified):
+ for _ in range(count_notified):
notification = self.notifications.pop(0)
topic = notification['topic']
event = run.fetch_event('task123', topic)