Apply recent change on event emit API for probes
Change-Id: If0890c2d093ee25bb68b2f02df4bf453c9e370c3
diff --git a/VERSION b/VERSION
index 0ea3a94..0d91a54 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.2.0
+0.3.0
diff --git a/src/cord_workflow_controller_client/probe.py b/src/cord_workflow_controller_client/probe.py
index 0ef5288..bcbf372 100644
--- a/src/cord_workflow_controller_client/probe.py
+++ b/src/cord_workflow_controller_client/probe.py
@@ -23,10 +23,14 @@
import json
import socketio
-from .utils import get_noop_logger, gen_id
-from .errors import ClientInputError
+from .countdown_latch import CountDownLatch
+from .utils import get_noop_logger, gen_id, gen_seq_id
+from .errors import ClientRPCError, ClientInputError, ClientResponseError
+
+WAIT_TIMEOUT = 10 # 10 seconds
GREETING = 'cord.workflow.ctlsvc.greeting'
+EVENT_EMIT = 'cord.workflow.ctlsvc.event.emit'
class Probe(object):
@@ -43,17 +47,22 @@
else:
self.name = 'probe_%s' % gen_id()
+ self.req_id = gen_seq_id()
+
# set sio handlers
self.logger.debug('Setting event handlers to Socket.IO')
self.sio.on('connect', self.__on_sio_connect)
self.sio.on('disconnect', self.__on_sio_disconnect)
self.sio.on(GREETING, self.__on_greeting_message)
-
+ self.sio.on(EVENT_EMIT, self.__on_event_emit_message)
self.handlers = {
'connect': self.__noop_connect_handler,
'disconnect': self.__noop_disconnect_handler
}
+ # key is req_id
+ self.pending_requests = {}
+
def set_logger(self, logger):
self.logger = logger
@@ -78,9 +87,84 @@
def __noop_disconnect_handler(self):
self.logger.debug('no-op disconnect handler')
+ def __get_next_req_id(self):
+ req_id = self.req_id
+ self.req_id += 1
+ return req_id
+
def __on_greeting_message(self, data):
self.logger.debug('received a greeting message from the server')
+ def __on_event_emit_message(self, data):
+ self.__on_response(EVENT_EMIT, data)
+
+ def __check_pending_request(self, req_id):
+ """
+ Check a pending request
+ """
+ if req_id in self.pending_requests:
+ return True
+ return False
+
+ def __put_pending_request(self, api, params):
+ """
+ Put a pending request to a queue
+ """
+ req_id = self.__get_next_req_id()
+ latch = CountDownLatch()
+ params['req_id'] = req_id # inject req_id
+ self.sio.emit(api, params)
+ self.pending_requests[req_id] = {
+ 'req_id': req_id,
+ 'latch': latch,
+ 'api': api,
+ 'params': params,
+ 'result': None
+ }
+ return req_id
+
+ def __wait_response(self, req_id):
+ """
+ Wait for completion of a request
+ """
+ if req_id in self.pending_requests:
+ req = self.pending_requests[req_id]
+ # python v 3.2 or below does not return a result
+ # that tells whether it is timedout or not
+ return req['latch'].wait(WAIT_TIMEOUT)
+ else:
+ self.logger.error(
+ 'cannot find a pending request (%s) from a queue' % req_id
+ )
+ raise ClientRPCError(
+ req_id,
+ 'cannot find a pending request (%s) from a queue' % req_id
+ )
+
+ def __complete_request(self, req_id, result):
+ """
+ Compelete a pending request
+ """
+ if req_id in self.pending_requests:
+ req = self.pending_requests[req_id]
+ req['latch'].count_down()
+ req['result'] = result
+ return
+
+ self.logger.error(
+ 'cannot find a pending request (%s) from a queue' % req_id
+ )
+ raise ClientRPCError(
+ req_id,
+ 'cannot find a pending request (%s) from a queue' % req_id
+ )
+
+ def __pop_pending_request(self, req_name):
+ """
+ Pop a pending request from a queue
+ """
+ return self.pending_requests.pop(req_name, None)
+
def connect(self, url):
"""
Connect to the given url
@@ -100,6 +184,12 @@
"""
self.sio.disconnect()
+ def wait(self):
+ self.sio.wait()
+
+ def sleep(self, sec):
+ self.sio.sleep(sec)
+
def get_handlers(self):
return self.handlers
@@ -108,18 +198,78 @@
if k in new_handlers:
self.handlers[k] = new_handlers[k]
- def emit_event(self, event, body):
+ def __request(self, api, params={}):
+ if api and params:
+ req_id = self.__put_pending_request(api, params)
+ self.logger.debug('waiting for a response for req_id (%s)' % req_id)
+ self.__wait_response(req_id) # wait for completion
+ req = self.__pop_pending_request(req_id)
+ if req:
+ if req['latch'].get_count() > 0:
+ # timed out
+ self.logger.error('request (%s) timed out' % req_id)
+ raise ClientRPCError(
+ req_id,
+ 'request (%s) timed out' % req_id
+ )
+ else:
+ return req['result']
+ else:
+ self.logger.error('cannot find a pending request (%s) from a queue' % req_id)
+ raise ClientRPCError(
+ req_id,
+ 'cannot find a pending request (%s) from a queue' % req_id
+ )
+ else:
+ self.logger.error(
+ 'invalid arguments api (%s), params (%s)' %
+ (api, json.dumps(params))
+ )
+ raise ClientInputError(
+ 'invalid arguments api (%s), params (%s)' %
+ (api, json.dumps(params))
+ )
+
+ def __on_response(self, api, result):
+ if result and 'req_id' in result:
+ self.logger.debug('completing a request (%s)' % result['req_id'])
+ self.__complete_request(result['req_id'], result)
+ else:
+ self.logger.error(
+ 'invalid arguments api (%s), result (%s)' %
+ (api, json.dumps(result))
+ )
+ raise ClientInputError(
+ 'invalid arguments api (%s), result (%s)' %
+ (api, json.dumps(result))
+ )
+
+ def emit_event(self, topic, message):
"""
Emit event to Workflow Controller
"""
- if event and body:
- self.sio.emit(event, body)
+ if topic and message:
+ result = self.__request(EVENT_EMIT, {
+ 'topic': topic,
+ 'message': message
+ })
+ if result['error']:
+ self.logger.error(
+ 'request (%s) failed with an error - %s' %
+ (result['req_id'], result['message'])
+ )
+ raise ClientResponseError(
+ 'request (%s) failed with an error - %s' %
+ (result['req_id'], result['message'])
+ )
+ else:
+ return result['result']
else:
self.logger.error(
- 'invalid arguments event(%s), body(%s)' %
- (event, json.dumps(body))
+ 'invalid arguments topic(%s), message(%s)' %
+ (topic, json.dumps(message))
)
raise ClientInputError(
- 'invalid arguments event(%s), body(%s)' %
- (event, json.dumps(body))
+ 'invalid arguments topic(%s), message(%s)' %
+ (topic, json.dumps(message))
)
diff --git a/src/cord_workflow_controller_client/workflow_run.py b/src/cord_workflow_controller_client/workflow_run.py
index 9d3cf99..ac6f3a8 100644
--- a/src/cord_workflow_controller_client/workflow_run.py
+++ b/src/cord_workflow_controller_client/workflow_run.py
@@ -106,7 +106,7 @@
return req_id
def __on_greeting_message(self, data):
- self.logger.debug('received a gretting message from the server')
+ self.logger.debug('received a greeting message from the server')
def __on_notify_event_message(self, data):
"""
diff --git a/test/dummy_server.py b/test/dummy_server.py
index f7a3411..d127cbd 100644
--- a/test/dummy_server.py
+++ b/test/dummy_server.py
@@ -23,7 +23,7 @@
from geventwebsocket.handler import WebSocketHandler
from multiprocessing import Process
from multistructlog import create_logger
-from cord_workflow_controller_client.probe import GREETING
+from cord_workflow_controller_client.probe import GREETING, EVENT_EMIT
from cord_workflow_controller_client.manager \
import (WORKFLOW_KICKSTART,
WORKFLOW_REGISTER, WORKFLOW_REGISTER_ESSENCE, WORKFLOW_LIST, WORKFLOW_LIST_RUN,
@@ -545,6 +545,33 @@
)
+def _handle_event_emit(sid, body):
+ data = {
+ 'req_id': _get_req_id(body)
+ }
+
+ if 'topic' in body and 'message' in body:
+ # workflow_id = body['workflow_id']
+ topic = body['topic']
+ message = body['message']
+
+ log.info('probe topic %s - message %s' % (topic, message))
+
+ data['error'] = False
+ data['result'] = True
+ else:
+ data['error'] = True
+ data['result'] = False
+ data['message'] = 'topic or message is not in the message body'
+
+ log.info('returning a result for event emit to sid %s' % sid)
+ sio.emit(
+ event=EVENT_EMIT,
+ data=data,
+ room=sid
+ )
+
+
def _handle_event(event, sid, body):
log.info('event %s - body %s (%s)' % (event, body, type(body)))
@@ -584,6 +611,8 @@
_handle_event_workflow_run_count_events(sid, args[1])
elif event == WORKFLOW_RUN_FETCH_EVENT:
_handle_event_workflow_run_fetch_event(sid, args[1])
+ elif event == EVENT_EMIT:
+ _handle_event_emit(sid, args[1])
else:
_handle_event(event, args[0], args[1])