Reflect changes on Airflow sensors/operators and essence
Rename event '*.notify_*' to '*.report_*'
Add a new function to report status of workflow runs
Change-Id: Ib7eb9df787ecba286bbdfbf4e9aed62a193fed7c
diff --git a/VERSION b/VERSION
index 6c6aa7c..17e51c3 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.1.0
\ No newline at end of file
+0.1.1
diff --git a/src/cord_workflow_controller_client/manager.py b/src/cord_workflow_controller_client/manager.py
index d788fe9..2e5645c 100644
--- a/src/cord_workflow_controller_client/manager.py
+++ b/src/cord_workflow_controller_client/manager.py
@@ -32,6 +32,7 @@
# controller -> manager
GREETING = 'cord.workflow.ctlsvc.greeting'
WORKFLOW_KICKSTART = 'cord.workflow.ctlsvc.workflow.kickstart'
+WORKFLOW_CHECK_STATE = 'cord.workflow.ctlsvc.workflow.check.state'
# manager -> controller -> manager
WORKFLOW_REGISTER = 'cord.workflow.ctlsvc.workflow.register'
@@ -41,7 +42,8 @@
WORKFLOW_CHECK = 'cord.workflow.ctlsvc.workflow.check'
WORKFLOW_REMOVE = 'cord.workflow.ctlsvc.workflow.remove'
WORKFLOW_REMOVE_RUN = 'cord.workflow.ctlsvc.workflow.run.remove'
-WORKFLOW_NOTIFY_NEW_RUN = 'cord.workflow.ctlsvc.workflow.notify_new_run'
+WORKFLOW_REPORT_NEW_RUN = 'cord.workflow.ctlsvc.workflow.report_new_run'
+WORKFLOW_REPORT_RUN_STATE = 'cord.workflow.ctlsvc.workflow.report_run_state'
class Manager(object):
@@ -65,6 +67,7 @@
self.sio.on('connect', self.__on_sio_connect)
self.sio.on('disconnect', self.__on_sio_disconnect)
self.sio.on(WORKFLOW_KICKSTART, self.__on_kickstart_message)
+ self.sio.on(WORKFLOW_CHECK_STATE, self.__on_check_state_message)
self.sio.on(GREETING, self.__on_greeting_message)
self.sio.on(WORKFLOW_REGISTER, self.__on_workflow_reg_message)
self.sio.on(WORKFLOW_REGISTER_ESSENCE, self.__on_workflow_reg_essence_message)
@@ -73,12 +76,14 @@
self.sio.on(WORKFLOW_CHECK, self.__on_workflow_check_message)
self.sio.on(WORKFLOW_REMOVE, self.__on_workflow_remove_message)
self.sio.on(WORKFLOW_REMOVE_RUN, self.__on_workflow_remove_run_message)
- self.sio.on(WORKFLOW_NOTIFY_NEW_RUN, self.__on_workflow_notify_new_run_message)
+ self.sio.on(WORKFLOW_REPORT_NEW_RUN, self.__on_workflow_report_new_run_message)
+ self.sio.on(WORKFLOW_REPORT_RUN_STATE, self.__on_workflow_report_run_state_message)
self.handlers = {
'connect': self.__noop_connect_handler,
'disconnect': self.__noop_disconnect_handler,
- 'kickstart': self.__noop_kickstart_handler
+ 'kickstart': self.__noop_kickstart_handler,
+ 'check_state': self.__noop_check_state_handler
}
# key is req_id
@@ -111,6 +116,9 @@
def __noop_kickstart_handler(self, workflow_id, workflow_run_id):
self.logger.debug('no-op kickstart handler')
+ def __noop_check_state_handler(self, workflow_id, workflow_run_id):
+ self.logger.debug('no-op check-state handler')
+
def __get_next_req_id(self):
req_id = self.req_id
self.req_id += 1
@@ -141,6 +149,28 @@
self.logger.info('calling a kickstart handler - %s' % handler)
handler(workflow_id, workflow_run_id)
+ def __on_check_state_message(self, data):
+ """
+ Handler for a check-state event
+ REQ = {
+ 'workflow_id': <workflow_id>,
+ 'workflow_run_id': <workflow_run_id>
+ }
+ """
+ self.logger.info('received a check-state message from the server')
+ workflow_id = data['workflow_id']
+ workflow_run_id = data['workflow_run_id']
+
+ self.logger.info(
+ 'a check-state message - workflow_id (%s), workflow_run_id (%s)' %
+ (workflow_id, workflow_run_id)
+ )
+ if workflow_id and workflow_run_id:
+ handler = self.handlers['check_state']
+ if callable(handler):
+ self.logger.info('calling a check-state handler - %s' % handler)
+ handler(workflow_id, workflow_run_id)
+
def __on_workflow_reg_message(self, data):
self.__on_response(WORKFLOW_REGISTER, data)
@@ -162,8 +192,11 @@
def __on_workflow_remove_run_message(self, data):
self.__on_response(WORKFLOW_REMOVE_RUN, data)
- def __on_workflow_notify_new_run_message(self, data):
- self.__on_response(WORKFLOW_NOTIFY_NEW_RUN, data)
+ def __on_workflow_report_new_run_message(self, data):
+ self.__on_response(WORKFLOW_REPORT_NEW_RUN, data)
+
+ def __on_workflow_report_run_state_message(self, data):
+ self.__on_response(WORKFLOW_REPORT_RUN_STATE, data)
def __check_pending_request(self, req_id):
"""
@@ -492,12 +525,12 @@
(workflow_id, workflow_run_id)
)
- def notify_new_workflow_run(self, workflow_id, workflow_run_id):
+ def report_new_workflow_run(self, workflow_id, workflow_run_id):
"""
- Notify a new workflow run
+ Report a new workflow run
"""
if workflow_id and workflow_run_id:
- result = self.__request(WORKFLOW_NOTIFY_NEW_RUN, {
+ result = self.__request(WORKFLOW_REPORT_NEW_RUN, {
'workflow_id': workflow_id,
'workflow_run_id': workflow_run_id
})
@@ -521,3 +554,34 @@
'invalid arguments workflow_id (%s), workflow_run_id (%s)' %
(workflow_id, workflow_run_id)
)
+
+ def report_workflow_run_state(self, workflow_id, workflow_run_id, state):
+ """
+ Report a new workflow run
+ """
+ if workflow_id and workflow_run_id and state:
+ result = self.__request(WORKFLOW_REPORT_RUN_STATE, {
+ 'workflow_id': workflow_id,
+ 'workflow_run_id': workflow_run_id,
+ 'state': state
+ })
+ if result['error']:
+ self.logger.error(
+ 'request (%s) failed with an error - %s' %
+ (result['req_id'], result['message'])
+ )
+ raise ClientResponseError(
+ 'request (%s) failed with an error - %s' %
+ (result['req_id'], result['message'])
+ )
+ else:
+ return result['result']
+ else:
+ self.logger.error(
+ 'invalid arguments workflow_id (%s), workflow_run_id (%s), state (%s)' %
+ (workflow_id, workflow_run_id, state)
+ )
+ raise ClientInputError(
+ 'invalid arguments workflow_id (%s), workflow_run_id (%s), state (%s)' %
+ (workflow_id, workflow_run_id, state)
+ )
diff --git a/test/dummy_server.py b/test/dummy_server.py
index 9c22968..f7a3411 100644
--- a/test/dummy_server.py
+++ b/test/dummy_server.py
@@ -27,7 +27,7 @@
from cord_workflow_controller_client.manager \
import (WORKFLOW_KICKSTART,
WORKFLOW_REGISTER, WORKFLOW_REGISTER_ESSENCE, WORKFLOW_LIST, WORKFLOW_LIST_RUN,
- WORKFLOW_CHECK, WORKFLOW_REMOVE, WORKFLOW_REMOVE_RUN, WORKFLOW_NOTIFY_NEW_RUN)
+ WORKFLOW_CHECK, WORKFLOW_REMOVE, WORKFLOW_REMOVE_RUN, WORKFLOW_REPORT_NEW_RUN)
from cord_workflow_controller_client.workflow_run \
import (WORKFLOW_RUN_NOTIFY_EVENT,
WORKFLOW_RUN_UPDATE_STATUS, WORKFLOW_RUN_COUNT_EVENTS, WORKFLOW_RUN_FETCH_EVENT)
@@ -427,7 +427,7 @@
log.info('returning a result for a new workflow run event to sid %s' % sid)
sio.emit(
- event=WORKFLOW_NOTIFY_NEW_RUN,
+ event=WORKFLOW_REPORT_NEW_RUN,
data=data,
room=sid
)
@@ -560,7 +560,7 @@
_handle_event_disconnect(sid)
# manager
- elif event == WORKFLOW_NOTIFY_NEW_RUN:
+ elif event == WORKFLOW_REPORT_NEW_RUN:
_handle_event_new_workflow_run(sid, args[1])
elif event == WORKFLOW_REGISTER_ESSENCE:
_handle_event_workflow_reg_essence(sid, args[1])
@@ -621,6 +621,8 @@
proc.kill()
process.kill()
p.join()
+ except BaseException:
+ pass
except psutil.NoSuchProcess:
pass
diff --git a/test/hello_workflow.json b/test/hello_workflow.json
index 9de71bc..fd407e7 100644
--- a/test/hello_workflow.json
+++ b/test/hello_workflow.json
@@ -5,19 +5,18 @@
"local_variable": "dag_hello"
},
"dependencies": {
- "onu_event_handler": {}
+ "onu_event_sensor": {}
},
"tasks": {
- "onu_event_handler": {
- "class": "XOSEventSensor",
- "dag_id": "hello_workflow",
+ "onu_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag": "dag_hello",
+ "dag_id": "hello_workflow",
"key_field": "serialNumber",
- "local_variable": "onu_event_handler",
+ "local_variable": "onu_event_sensor",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "ONU_event",
- "task_id": "onu_event_handler",
+ "task_id": "onu_event_sensor",
"topic": "onu.events"
}
}
diff --git a/test/test_manager.py b/test/test_manager.py
index d35a031..f3996a8 100644
--- a/test/test_manager.py
+++ b/test/test_manager.py
@@ -80,7 +80,7 @@
'workflow_id': workflow_id,
'workflow_run_id': workflow_run_id
}
- manager.notify_new_workflow_run(workflow_id, workflow_run_id)
+ manager.report_new_workflow_run(workflow_id, workflow_run_id)
manager.set_handlers({'kickstart': on_kickstart})
diff --git a/test/test_workflow_run.py b/test/test_workflow_run.py
index 80d6fcf..1736049 100644
--- a/test/test_workflow_run.py
+++ b/test/test_workflow_run.py
@@ -49,7 +49,7 @@
essence_path = os.path.join(code_dir, "hello_workflow.json")
essence = read_json_file(essence_path)
self.manager.register_workflow_essence(essence)
- self.manager.notify_new_workflow_run('hello_workflow', 'hello_workflow_123')
+ self.manager.report_new_workflow_run('hello_workflow', 'hello_workflow_123')
# wait for 2 seconds for registering a new workflow run
time.sleep(2)
@@ -160,7 +160,7 @@
self.assertTrue(count_queued >= 2, 'There must be more than 2 events queued')
# count_notified and count_queued may not have the same number temporarily
- for i in range(count_notified):
+ for _ in range(count_notified):
notification = self.notifications.pop(0)
topic = notification['topic']
event = run.fetch_event('task123', topic)