Reflect changes on Airflow sensors/operators and essence
Rename event '*.notify_*' to '*.report_*'
Add a new function to report status of workflow runs

Change-Id: Ib7eb9df787ecba286bbdfbf4e9aed62a193fed7c
diff --git a/VERSION b/VERSION
index 6c6aa7c..17e51c3 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.1.0
\ No newline at end of file
+0.1.1
diff --git a/src/cord_workflow_controller_client/manager.py b/src/cord_workflow_controller_client/manager.py
index d788fe9..2e5645c 100644
--- a/src/cord_workflow_controller_client/manager.py
+++ b/src/cord_workflow_controller_client/manager.py
@@ -32,6 +32,7 @@
 # controller -> manager
 GREETING = 'cord.workflow.ctlsvc.greeting'
 WORKFLOW_KICKSTART = 'cord.workflow.ctlsvc.workflow.kickstart'
+WORKFLOW_CHECK_STATE = 'cord.workflow.ctlsvc.workflow.check.state'
 
 # manager -> controller -> manager
 WORKFLOW_REGISTER = 'cord.workflow.ctlsvc.workflow.register'
@@ -41,7 +42,8 @@
 WORKFLOW_CHECK = 'cord.workflow.ctlsvc.workflow.check'
 WORKFLOW_REMOVE = 'cord.workflow.ctlsvc.workflow.remove'
 WORKFLOW_REMOVE_RUN = 'cord.workflow.ctlsvc.workflow.run.remove'
-WORKFLOW_NOTIFY_NEW_RUN = 'cord.workflow.ctlsvc.workflow.notify_new_run'
+WORKFLOW_REPORT_NEW_RUN = 'cord.workflow.ctlsvc.workflow.report_new_run'
+WORKFLOW_REPORT_RUN_STATE = 'cord.workflow.ctlsvc.workflow.report_run_state'
 
 
 class Manager(object):
@@ -65,6 +67,7 @@
         self.sio.on('connect', self.__on_sio_connect)
         self.sio.on('disconnect', self.__on_sio_disconnect)
         self.sio.on(WORKFLOW_KICKSTART, self.__on_kickstart_message)
+        self.sio.on(WORKFLOW_CHECK_STATE, self.__on_check_state_message)
         self.sio.on(GREETING, self.__on_greeting_message)
         self.sio.on(WORKFLOW_REGISTER, self.__on_workflow_reg_message)
         self.sio.on(WORKFLOW_REGISTER_ESSENCE, self.__on_workflow_reg_essence_message)
@@ -73,12 +76,14 @@
         self.sio.on(WORKFLOW_CHECK, self.__on_workflow_check_message)
         self.sio.on(WORKFLOW_REMOVE, self.__on_workflow_remove_message)
         self.sio.on(WORKFLOW_REMOVE_RUN, self.__on_workflow_remove_run_message)
-        self.sio.on(WORKFLOW_NOTIFY_NEW_RUN, self.__on_workflow_notify_new_run_message)
+        self.sio.on(WORKFLOW_REPORT_NEW_RUN, self.__on_workflow_report_new_run_message)
+        self.sio.on(WORKFLOW_REPORT_RUN_STATE, self.__on_workflow_report_run_state_message)
 
         self.handlers = {
             'connect': self.__noop_connect_handler,
             'disconnect': self.__noop_disconnect_handler,
-            'kickstart': self.__noop_kickstart_handler
+            'kickstart': self.__noop_kickstart_handler,
+            'check_state': self.__noop_check_state_handler
         }
 
         # key is req_id
@@ -111,6 +116,9 @@
     def __noop_kickstart_handler(self, workflow_id, workflow_run_id):
         self.logger.debug('no-op kickstart handler')
 
+    def __noop_check_state_handler(self, workflow_id, workflow_run_id):
+        self.logger.debug('no-op check-state handler')
+
     def __get_next_req_id(self):
         req_id = self.req_id
         self.req_id += 1
@@ -141,6 +149,28 @@
                 self.logger.info('calling a kickstart handler - %s' % handler)
                 handler(workflow_id, workflow_run_id)
 
+    def __on_check_state_message(self, data):
+        """
+        Handler for a check-state event
+        REQ = {
+            'workflow_id': <workflow_id>,
+            'workflow_run_id': <workflow_run_id>
+        }
+        """
+        self.logger.info('received a check-state message from the server')
+        workflow_id = data['workflow_id']
+        workflow_run_id = data['workflow_run_id']
+
+        self.logger.info(
+            'a check-state message - workflow_id (%s), workflow_run_id (%s)' %
+            (workflow_id, workflow_run_id)
+        )
+        if workflow_id and workflow_run_id:
+            handler = self.handlers['check_state']
+            if callable(handler):
+                self.logger.info('calling a check-state handler - %s' % handler)
+                handler(workflow_id, workflow_run_id)
+
     def __on_workflow_reg_message(self, data):
         self.__on_response(WORKFLOW_REGISTER, data)
 
@@ -162,8 +192,11 @@
     def __on_workflow_remove_run_message(self, data):
         self.__on_response(WORKFLOW_REMOVE_RUN, data)
 
-    def __on_workflow_notify_new_run_message(self, data):
-        self.__on_response(WORKFLOW_NOTIFY_NEW_RUN, data)
+    def __on_workflow_report_new_run_message(self, data):
+        self.__on_response(WORKFLOW_REPORT_NEW_RUN, data)
+
+    def __on_workflow_report_run_state_message(self, data):
+        self.__on_response(WORKFLOW_REPORT_RUN_STATE, data)
 
     def __check_pending_request(self, req_id):
         """
@@ -492,12 +525,12 @@
                 (workflow_id, workflow_run_id)
             )
 
-    def notify_new_workflow_run(self, workflow_id, workflow_run_id):
+    def report_new_workflow_run(self, workflow_id, workflow_run_id):
         """
-        Notify a new workflow run
+        Report a new workflow run
         """
         if workflow_id and workflow_run_id:
-            result = self.__request(WORKFLOW_NOTIFY_NEW_RUN, {
+            result = self.__request(WORKFLOW_REPORT_NEW_RUN, {
                 'workflow_id': workflow_id,
                 'workflow_run_id': workflow_run_id
             })
@@ -521,3 +554,34 @@
                 'invalid arguments workflow_id (%s), workflow_run_id (%s)' %
                 (workflow_id, workflow_run_id)
             )
+
+    def report_workflow_run_state(self, workflow_id, workflow_run_id, state):
+        """
+        Report a new workflow run
+        """
+        if workflow_id and workflow_run_id and state:
+            result = self.__request(WORKFLOW_REPORT_RUN_STATE, {
+                'workflow_id': workflow_id,
+                'workflow_run_id': workflow_run_id,
+                'state': state
+            })
+            if result['error']:
+                self.logger.error(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+                raise ClientResponseError(
+                    'request (%s) failed with an error - %s' %
+                    (result['req_id'], result['message'])
+                )
+            else:
+                return result['result']
+        else:
+            self.logger.error(
+                'invalid arguments workflow_id (%s), workflow_run_id (%s), state (%s)' %
+                (workflow_id, workflow_run_id, state)
+            )
+            raise ClientInputError(
+                'invalid arguments workflow_id (%s), workflow_run_id (%s), state (%s)' %
+                (workflow_id, workflow_run_id, state)
+            )
diff --git a/test/dummy_server.py b/test/dummy_server.py
index 9c22968..f7a3411 100644
--- a/test/dummy_server.py
+++ b/test/dummy_server.py
@@ -27,7 +27,7 @@
 from cord_workflow_controller_client.manager \
     import (WORKFLOW_KICKSTART,
             WORKFLOW_REGISTER, WORKFLOW_REGISTER_ESSENCE, WORKFLOW_LIST, WORKFLOW_LIST_RUN,
-            WORKFLOW_CHECK, WORKFLOW_REMOVE, WORKFLOW_REMOVE_RUN, WORKFLOW_NOTIFY_NEW_RUN)
+            WORKFLOW_CHECK, WORKFLOW_REMOVE, WORKFLOW_REMOVE_RUN, WORKFLOW_REPORT_NEW_RUN)
 from cord_workflow_controller_client.workflow_run \
     import (WORKFLOW_RUN_NOTIFY_EVENT,
             WORKFLOW_RUN_UPDATE_STATUS, WORKFLOW_RUN_COUNT_EVENTS, WORKFLOW_RUN_FETCH_EVENT)
@@ -427,7 +427,7 @@
 
     log.info('returning a result for a new workflow run event to sid %s' % sid)
     sio.emit(
-        event=WORKFLOW_NOTIFY_NEW_RUN,
+        event=WORKFLOW_REPORT_NEW_RUN,
         data=data,
         room=sid
     )
@@ -560,7 +560,7 @@
             _handle_event_disconnect(sid)
 
         # manager
-        elif event == WORKFLOW_NOTIFY_NEW_RUN:
+        elif event == WORKFLOW_REPORT_NEW_RUN:
             _handle_event_new_workflow_run(sid, args[1])
         elif event == WORKFLOW_REGISTER_ESSENCE:
             _handle_event_workflow_reg_essence(sid, args[1])
@@ -621,6 +621,8 @@
             proc.kill()
         process.kill()
         p.join()
+    except BaseException:
+        pass
     except psutil.NoSuchProcess:
         pass
 
diff --git a/test/hello_workflow.json b/test/hello_workflow.json
index 9de71bc..fd407e7 100644
--- a/test/hello_workflow.json
+++ b/test/hello_workflow.json
@@ -5,19 +5,18 @@
             "local_variable": "dag_hello"
         },
         "dependencies": {
-            "onu_event_handler": {}
+            "onu_event_sensor": {}
         },
         "tasks": {
-            "onu_event_handler": {
-                "class": "XOSEventSensor",
-                "dag_id": "hello_workflow",
+            "onu_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag": "dag_hello",
+                "dag_id": "hello_workflow",
                 "key_field": "serialNumber",
-                "local_variable": "onu_event_handler",
+                "local_variable": "onu_event_sensor",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "ONU_event",
-                "task_id": "onu_event_handler",
+                "task_id": "onu_event_sensor",
                 "topic": "onu.events"
             }
         }
diff --git a/test/test_manager.py b/test/test_manager.py
index d35a031..f3996a8 100644
--- a/test/test_manager.py
+++ b/test/test_manager.py
@@ -80,7 +80,7 @@
                     'workflow_id': workflow_id,
                     'workflow_run_id': workflow_run_id
                 }
-                manager.notify_new_workflow_run(workflow_id, workflow_run_id)
+                manager.report_new_workflow_run(workflow_id, workflow_run_id)
 
             manager.set_handlers({'kickstart': on_kickstart})
 
diff --git a/test/test_workflow_run.py b/test/test_workflow_run.py
index 80d6fcf..1736049 100644
--- a/test/test_workflow_run.py
+++ b/test/test_workflow_run.py
@@ -49,7 +49,7 @@
         essence_path = os.path.join(code_dir, "hello_workflow.json")
         essence = read_json_file(essence_path)
         self.manager.register_workflow_essence(essence)
-        self.manager.notify_new_workflow_run('hello_workflow', 'hello_workflow_123')
+        self.manager.report_new_workflow_run('hello_workflow', 'hello_workflow_123')
 
         # wait for 2 seconds for registering a new workflow run
         time.sleep(2)
@@ -160,7 +160,7 @@
             self.assertTrue(count_queued >= 2, 'There must be more than 2 events queued')
 
             # count_notified and count_queued may not have the same number temporarily
-            for i in range(count_notified):
+            for _ in range(count_notified):
                 notification = self.notifications.pop(0)
                 topic = notification['topic']
                 event = run.fetch_event('task123', topic)