Implement bulk status update of workflow runs
Change-Id: I0763fea2bb4c524c2e84d58ba09d14e65b43f3bb
diff --git a/VERSION b/VERSION
index 0d91a54..1d0ba9e 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.3.0
+0.4.0
diff --git a/src/cord_workflow_controller_client/manager.py b/src/cord_workflow_controller_client/manager.py
index 2e5645c..7dab156 100644
--- a/src/cord_workflow_controller_client/manager.py
+++ b/src/cord_workflow_controller_client/manager.py
@@ -32,7 +32,8 @@
# controller -> manager
GREETING = 'cord.workflow.ctlsvc.greeting'
WORKFLOW_KICKSTART = 'cord.workflow.ctlsvc.workflow.kickstart'
-WORKFLOW_CHECK_STATE = 'cord.workflow.ctlsvc.workflow.check.state'
+WORKFLOW_CHECK_STATUS = 'cord.workflow.ctlsvc.workflow.check.status'
+WORKFLOW_CHECK_STATUS_BULK = 'cord.workflow.ctlsvc.workflow.check.status_bulk'
# manager -> controller -> manager
WORKFLOW_REGISTER = 'cord.workflow.ctlsvc.workflow.register'
@@ -43,7 +44,8 @@
WORKFLOW_REMOVE = 'cord.workflow.ctlsvc.workflow.remove'
WORKFLOW_REMOVE_RUN = 'cord.workflow.ctlsvc.workflow.run.remove'
WORKFLOW_REPORT_NEW_RUN = 'cord.workflow.ctlsvc.workflow.report_new_run'
-WORKFLOW_REPORT_RUN_STATE = 'cord.workflow.ctlsvc.workflow.report_run_state'
+WORKFLOW_REPORT_RUN_STATUS = 'cord.workflow.ctlsvc.workflow.report_run_status'
+WORKFLOW_REPORT_RUN_STATUS_BULK = 'cord.workflow.ctlsvc.workflow.report_run_status_bulk'
class Manager(object):
@@ -67,7 +69,8 @@
self.sio.on('connect', self.__on_sio_connect)
self.sio.on('disconnect', self.__on_sio_disconnect)
self.sio.on(WORKFLOW_KICKSTART, self.__on_kickstart_message)
- self.sio.on(WORKFLOW_CHECK_STATE, self.__on_check_state_message)
+ self.sio.on(WORKFLOW_CHECK_STATUS, self.__on_check_status_message)
+ self.sio.on(WORKFLOW_CHECK_STATUS_BULK, self.__on_check_status_bulk_message)
self.sio.on(GREETING, self.__on_greeting_message)
self.sio.on(WORKFLOW_REGISTER, self.__on_workflow_reg_message)
self.sio.on(WORKFLOW_REGISTER_ESSENCE, self.__on_workflow_reg_essence_message)
@@ -77,13 +80,15 @@
self.sio.on(WORKFLOW_REMOVE, self.__on_workflow_remove_message)
self.sio.on(WORKFLOW_REMOVE_RUN, self.__on_workflow_remove_run_message)
self.sio.on(WORKFLOW_REPORT_NEW_RUN, self.__on_workflow_report_new_run_message)
- self.sio.on(WORKFLOW_REPORT_RUN_STATE, self.__on_workflow_report_run_state_message)
+ self.sio.on(WORKFLOW_REPORT_RUN_STATUS, self.__on_workflow_report_run_status_message)
+ self.sio.on(WORKFLOW_REPORT_RUN_STATUS_BULK, self.__on_workflow_report_run_status_bulk_message)
self.handlers = {
'connect': self.__noop_connect_handler,
'disconnect': self.__noop_disconnect_handler,
'kickstart': self.__noop_kickstart_handler,
- 'check_state': self.__noop_check_state_handler
+ 'check_status': self.__noop_check_status_handler,
+ 'check_status_bulk': self.__noop_check_status_bulk_handler,
}
# key is req_id
@@ -116,8 +121,11 @@
def __noop_kickstart_handler(self, workflow_id, workflow_run_id):
self.logger.debug('no-op kickstart handler')
- def __noop_check_state_handler(self, workflow_id, workflow_run_id):
- self.logger.debug('no-op check-state handler')
+ def __noop_check_status_handler(self, workflow_id, workflow_run_id):
+ self.logger.debug('no-op check-status handler')
+
+ def __noop_check_status_bulk_handler(self, requests):
+ self.logger.debug('no-op check-status-bulk handler')
def __get_next_req_id(self):
req_id = self.req_id
@@ -149,28 +157,43 @@
self.logger.info('calling a kickstart handler - %s' % handler)
handler(workflow_id, workflow_run_id)
- def __on_check_state_message(self, data):
+ def __on_check_status_message(self, data):
"""
- Handler for a check-state event
+ Handler for a check-status event
REQ = {
'workflow_id': <workflow_id>,
'workflow_run_id': <workflow_run_id>
}
"""
- self.logger.info('received a check-state message from the server')
+ self.logger.info('received a check-status message from the server')
workflow_id = data['workflow_id']
workflow_run_id = data['workflow_run_id']
self.logger.info(
- 'a check-state message - workflow_id (%s), workflow_run_id (%s)' %
+ 'a check-status message - workflow_id (%s), workflow_run_id (%s)' %
(workflow_id, workflow_run_id)
)
if workflow_id and workflow_run_id:
- handler = self.handlers['check_state']
+ handler = self.handlers['check_status']
if callable(handler):
- self.logger.info('calling a check-state handler - %s' % handler)
+ self.logger.info('calling a check-status handler - %s' % handler)
handler(workflow_id, workflow_run_id)
+ def __on_check_status_bulk_message(self, data):
+ """
+ Handler for a check-status-bulk event
+ REQ = [{
+ 'workflow_id': <workflow_id>,
+ 'workflow_run_id': <workflow_run_id>
+ }, ...]
+ """
+ self.logger.info('received a check-status-bulk message from the server')
+ if data:
+ handler = self.handlers['check_status_bulk']
+ if callable(handler):
+ self.logger.info('calling a check-status handler - %s' % handler)
+ handler(data)
+
def __on_workflow_reg_message(self, data):
self.__on_response(WORKFLOW_REGISTER, data)
@@ -195,8 +218,11 @@
def __on_workflow_report_new_run_message(self, data):
self.__on_response(WORKFLOW_REPORT_NEW_RUN, data)
- def __on_workflow_report_run_state_message(self, data):
- self.__on_response(WORKFLOW_REPORT_RUN_STATE, data)
+ def __on_workflow_report_run_status_message(self, data):
+ self.__on_response(WORKFLOW_REPORT_RUN_STATUS, data)
+
+ def __on_workflow_report_run_status_bulk_message(self, data):
+ self.__on_response(WORKFLOW_REPORT_RUN_STATUS_BULK, data)
def __check_pending_request(self, req_id):
"""
@@ -555,15 +581,15 @@
(workflow_id, workflow_run_id)
)
- def report_workflow_run_state(self, workflow_id, workflow_run_id, state):
+ def report_workflow_run_status(self, workflow_id, workflow_run_id, status):
"""
- Report a new workflow run
+ Report status of a workflow run
"""
- if workflow_id and workflow_run_id and state:
- result = self.__request(WORKFLOW_REPORT_RUN_STATE, {
+ if workflow_id and workflow_run_id and status:
+ result = self.__request(WORKFLOW_REPORT_RUN_STATUS, {
'workflow_id': workflow_id,
'workflow_run_id': workflow_run_id,
- 'state': state
+ 'status': status
})
if result['error']:
self.logger.error(
@@ -578,10 +604,40 @@
return result['result']
else:
self.logger.error(
- 'invalid arguments workflow_id (%s), workflow_run_id (%s), state (%s)' %
- (workflow_id, workflow_run_id, state)
+ 'invalid arguments workflow_id (%s), workflow_run_id (%s), status (%s)' %
+ (workflow_id, workflow_run_id, status)
)
raise ClientInputError(
- 'invalid arguments workflow_id (%s), workflow_run_id (%s), state (%s)' %
- (workflow_id, workflow_run_id, state)
+ 'invalid arguments workflow_id (%s), workflow_run_id (%s), status (%s)' %
+ (workflow_id, workflow_run_id, status)
+ )
+
+ def report_workflow_run_status_bulk(self, data):
+ """
+ Report statuses of a workflow run
+ """
+
+ if data:
+ result = self.__request(WORKFLOW_REPORT_RUN_STATUS_BULK, {
+ 'data': data
+ })
+ if result['error']:
+ self.logger.error(
+ 'request (%s) failed with an error - %s' %
+ (result['req_id'], result['message'])
+ )
+ raise ClientResponseError(
+ 'request (%s) failed with an error - %s' %
+ (result['req_id'], result['message'])
+ )
+ else:
+ return result['result']
+ else:
+ self.logger.error(
+ 'invalid arguments data (%s)' %
+ json.dumps(data)
+ )
+ raise ClientInputError(
+ 'invalid arguments data (%s)' %
+ json.dumps(data)
)