Apply recent change on event emit API for probes

Change-Id: If0890c2d093ee25bb68b2f02df4bf453c9e370c3
diff --git a/VERSION b/VERSION
index 0ea3a94..0d91a54 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.2.0
+0.3.0
diff --git a/src/cord_workflow_controller_client/probe.py b/src/cord_workflow_controller_client/probe.py
index 0ef5288..bcbf372 100644
--- a/src/cord_workflow_controller_client/probe.py
+++ b/src/cord_workflow_controller_client/probe.py
@@ -23,10 +23,14 @@
 import json
 import socketio
 
-from .utils import get_noop_logger, gen_id
-from .errors import ClientInputError
+from .countdown_latch import CountDownLatch
+from .utils import get_noop_logger, gen_id, gen_seq_id
+from .errors import ClientRPCError, ClientInputError, ClientResponseError
+
+WAIT_TIMEOUT = 10  # 10 seconds
 
 GREETING = 'cord.workflow.ctlsvc.greeting'
+EVENT_EMIT = 'cord.workflow.ctlsvc.event.emit'
 
 
 class Probe(object):
@@ -43,17 +47,22 @@
         else:
             self.name = 'probe_%s' % gen_id()
 
+        self.req_id = gen_seq_id()
+
         # set sio handlers
         self.logger.debug('Setting event handlers to Socket.IO')
         self.sio.on('connect', self.__on_sio_connect)
         self.sio.on('disconnect', self.__on_sio_disconnect)
         self.sio.on(GREETING, self.__on_greeting_message)
-
+        self.sio.on(EVENT_EMIT, self.__on_event_emit_message)
         self.handlers = {
             'connect': self.__noop_connect_handler,
             'disconnect': self.__noop_disconnect_handler
         }
 
+        # key is req_id
+        self.pending_requests = {}
+
     def set_logger(self, logger):
         self.logger = logger
 
@@ -78,9 +87,84 @@
     def __noop_disconnect_handler(self):
         self.logger.debug('no-op disconnect handler')
 
+    def __get_next_req_id(self):
+        req_id = self.req_id
+        self.req_id += 1
+        return req_id
+
     def __on_greeting_message(self, data):
         self.logger.debug('received a greeting message from the server')
 
+    def __on_event_emit_message(self, data):
+        self.__on_response(EVENT_EMIT, data)
+
+    def __check_pending_request(self, req_id):
+        """
+        Check a pending request
+        """
+        if req_id in self.pending_requests:
+            return True
+        return False
+
+    def __put_pending_request(self, api, params):
+        """
+        Put a pending request to a queue
+        """
+        req_id = self.__get_next_req_id()
+        latch = CountDownLatch()
+        params['req_id'] = req_id  # inject req_id
+        self.sio.emit(api, params)
+        self.pending_requests[req_id] = {
+            'req_id': req_id,
+            'latch': latch,
+            'api': api,
+            'params': params,
+            'result': None
+        }
+        return req_id
+
+    def __wait_response(self, req_id):
+        """
+        Wait for completion of a request
+        """
+        if req_id in self.pending_requests:
+            req = self.pending_requests[req_id]
+            # python v 3.2 or below does not return a result
+            # that tells whether it is timedout or not
+            return req['latch'].wait(WAIT_TIMEOUT)
+        else:
+            self.logger.error(
+                'cannot find a pending request (%s) from a queue' % req_id
+            )
+            raise ClientRPCError(
+                req_id,
+                'cannot find a pending request (%s) from a queue' % req_id
+            )
+
+    def __complete_request(self, req_id, result):
+        """
+        Compelete a pending request
+        """
+        if req_id in self.pending_requests:
+            req = self.pending_requests[req_id]
+            req['latch'].count_down()
+            req['result'] = result
+            return
+
+        self.logger.error(
+            'cannot find a pending request (%s) from a queue' % req_id
+        )
+        raise ClientRPCError(
+            req_id,
+            'cannot find a pending request (%s) from a queue' % req_id
+        )
+
+    def __pop_pending_request(self, req_name):
+        """
+        Pop a pending request from a queue
+        """
+        return self.pending_requests.pop(req_name, None)
+
     def connect(self, url):
         """
         Connect to the given url
@@ -100,6 +184,12 @@
         """
         self.sio.disconnect()
 
+    def wait(self):
+        self.sio.wait()
+
+    def sleep(self, sec):
+        self.sio.sleep(sec)
+
     def get_handlers(self):
         return self.handlers
 
@@ -108,18 +198,78 @@
             if k in new_handlers:
                 self.handlers[k] = new_handlers[k]
 
-    def emit_event(self, event, body):
+    def __request(self, api, params={}):
+        if api and params:
+            req_id = self.__put_pending_request(api, params)
+            self.logger.debug('waiting for a response for req_id (%s)' % req_id)
+            self.__wait_response(req_id)  # wait for completion
+            req = self.__pop_pending_request(req_id)
+            if req:
+                if req['latch'].get_count() > 0:
+                    # timed out
+                    self.logger.error('request (%s) timed out' % req_id)
+                    raise ClientRPCError(
+                        req_id,
+                        'request (%s) timed out' % req_id
+                    )
+                else:
+                    return req['result']
+            else:
+                self.logger.error('cannot find a pending request (%s) from a queue' % req_id)
+                raise ClientRPCError(
+                    req_id,
+                    'cannot find a pending request (%s) from a queue' % req_id
+                )
+        else:
+            self.logger.error(
+                'invalid arguments api (%s), params (%s)' %
+                (api, json.dumps(params))
+            )
+            raise ClientInputError(
+                'invalid arguments api (%s), params (%s)' %
+                (api, json.dumps(params))
+            )
+
+    def __on_response(self, api, result):
+        if result and 'req_id' in result:
+            self.logger.debug('completing a request (%s)' % result['req_id'])
+            self.__complete_request(result['req_id'], result)
+        else:
+            self.logger.error(
+                'invalid arguments api (%s), result (%s)' %
+                (api, json.dumps(result))
+            )
+            raise ClientInputError(
+                'invalid arguments api (%s), result (%s)' %
+                (api, json.dumps(result))
+            )
+
+    def emit_event(self, topic, message):
         """
         Emit event to Workflow Controller
         """
-        if event and body:
-            self.sio.emit(event, body)
+        if topic and message:
+            result = self.__request(EVENT_EMIT, {
+                'topic': topic,
+                'message': message
+            })
+            if result['error']:
+                self.logger.error(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+                raise ClientResponseError(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+            else:
+                return result['result']
         else:
             self.logger.error(
-                'invalid arguments event(%s), body(%s)' %
-                (event, json.dumps(body))
+                'invalid arguments topic(%s), message(%s)' %
+                (topic, json.dumps(message))
             )
             raise ClientInputError(
-                'invalid arguments event(%s), body(%s)' %
-                (event, json.dumps(body))
+                'invalid arguments topic(%s), message(%s)' %
+                (topic, json.dumps(message))
             )
diff --git a/src/cord_workflow_controller_client/workflow_run.py b/src/cord_workflow_controller_client/workflow_run.py
index 9d3cf99..ac6f3a8 100644
--- a/src/cord_workflow_controller_client/workflow_run.py
+++ b/src/cord_workflow_controller_client/workflow_run.py
@@ -106,7 +106,7 @@
         return req_id
 
     def __on_greeting_message(self, data):
-        self.logger.debug('received a gretting message from the server')
+        self.logger.debug('received a greeting message from the server')
 
     def __on_notify_event_message(self, data):
         """
diff --git a/test/dummy_server.py b/test/dummy_server.py
index f7a3411..d127cbd 100644
--- a/test/dummy_server.py
+++ b/test/dummy_server.py
@@ -23,7 +23,7 @@
 from geventwebsocket.handler import WebSocketHandler
 from multiprocessing import Process
 from multistructlog import create_logger
-from cord_workflow_controller_client.probe import GREETING
+from cord_workflow_controller_client.probe import GREETING, EVENT_EMIT
 from cord_workflow_controller_client.manager \
     import (WORKFLOW_KICKSTART,
             WORKFLOW_REGISTER, WORKFLOW_REGISTER_ESSENCE, WORKFLOW_LIST, WORKFLOW_LIST_RUN,
@@ -545,6 +545,33 @@
     )
 
 
+def _handle_event_emit(sid, body):
+    data = {
+        'req_id': _get_req_id(body)
+    }
+
+    if 'topic' in body and 'message' in body:
+        # workflow_id = body['workflow_id']
+        topic = body['topic']
+        message = body['message']
+
+        log.info('probe topic %s - message %s' % (topic, message))
+
+        data['error'] = False
+        data['result'] = True
+    else:
+        data['error'] = True
+        data['result'] = False
+        data['message'] = 'topic or message is not in the message body'
+
+    log.info('returning a result for event emit to sid %s' % sid)
+    sio.emit(
+        event=EVENT_EMIT,
+        data=data,
+        room=sid
+    )
+
+
 def _handle_event(event, sid, body):
     log.info('event %s - body %s (%s)' % (event, body, type(body)))
 
@@ -584,6 +611,8 @@
             _handle_event_workflow_run_count_events(sid, args[1])
         elif event == WORKFLOW_RUN_FETCH_EVENT:
             _handle_event_workflow_run_fetch_event(sid, args[1])
+        elif event == EVENT_EMIT:
+            _handle_event_emit(sid, args[1])
         else:
             _handle_event(event, args[0], args[1])