Reflect changes on Airflow sensor/operator and essence
Fix spontaneous failures of test cases due to delayed client disconnection
Rename event '*.notify_*' to '*.report_*'
Add a new function to report status of workflow runs
Bump up version

Change-Id: I4fe25ec504751c6ea7a196c56ee4d157bab35abd
diff --git a/package.json b/package.json
index 676fcb1..fe064f5 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
     "name": "cord_workflow_controller",
-    "version": "0.1.2",
+    "version": "0.2.0",
     "description": "CORD Workflow Controller",
     "main": "src/server.js",
     "scripts": {
@@ -45,4 +45,4 @@
       "mocha-multi-reporters": "^1.1.7",
       "mocha-junit-reporter": "^1.23.0"
     }
-  }
\ No newline at end of file
+  }
diff --git a/spec/eventrouter.spec.js b/spec/eventrouter.spec.js
index acf1e39..0b2a217 100644
--- a/spec/eventrouter.spec.js
+++ b/spec/eventrouter.spec.js
@@ -40,7 +40,7 @@
 
     var receivedKickstartMessages = [[],[]];
 
-    describe('Workflow kickstart test', function() {
+    describe('Event Router test', function() {
         this.slow(5000);
 
         before(function() {
@@ -70,12 +70,12 @@
                 (callback) => {
                     // connect first workflow manager to the server
                     // this manager will kickstart a workflow
-                    let workflowManagerClient = io.connect(`http://localhost:${port}`, {
+                    let workflowManagerClient1 = io.connect(`http://localhost:${port}`, {
                         query: 'id=workflow_manager_id1&type=workflow_manager' +
                                 '&name=manager1@xos.org'
                     });
 
-                    workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
+                    workflowManagerClient1.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
                         // save it for check
                         receivedKickstartMessages[0].push(message);
 
@@ -87,45 +87,45 @@
 
                         setTimeout(() => {
                             // call-back
-                            workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
+                            workflowManagerClient1.emit(eventrouter.serviceEvents.WORKFLOW_REPORT_NEW_RUN, {
                                 workflow_id: message.workflow_id,
                                 workflow_run_id: message.workflow_run_id
                             })
                         }, 1000);
                     });
 
-                    workflowManagerClient.on('connect', () => {
+                    workflowManagerClient1.on('connect', () => {
                         callback(null, true);
                     });
 
-                    workflowManagerClients.push(workflowManagerClient);
+                    workflowManagerClients.push(workflowManagerClient1);
                     return;
                 },
                 (callback) => {
                     // connect second workflow manager to the server
                     // this manager will not kickstart a workflow
-                    let workflowManagerClient = io.connect(`http://localhost:${port}`, {
+                    let workflowManagerClient2 = io.connect(`http://localhost:${port}`, {
                         query: 'id=workflow_manager_id2&type=workflow_manager' +
                                 '&name=manager2@xos.org'
                     });
 
-                    workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
+                    workflowManagerClient2.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
                         receivedKickstartMessages[1].push(message);
 
                         setTimeout(() => {
                             // call-back
-                            workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
+                            workflowManagerClient2.emit(eventrouter.serviceEvents.WORKFLOW_REPORT_NEW_RUN, {
                                 workflow_id: message.workflow_id,
                                 workflow_run_id: message.workflow_run_id
                             })
                         }, 1000);
                     });
 
-                    workflowManagerClient.on('connect', () => {
+                    workflowManagerClient2.on('connect', () => {
                         callback(null, true);
                     });
 
-                    workflowManagerClients.push(workflowManagerClient);
+                    workflowManagerClients.push(workflowManagerClient2);
                     return;
                 },
                 (callback) => {
@@ -163,7 +163,9 @@
             return;
         });
 
-        afterEach(function() {
+        afterEach(function(done) {
+            this.timeout(5000);
+
             // remove workflow runs
             _.forOwn(workflowRunInfos, (workflowRunInfo) => {
                 workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE_RUN, {
@@ -205,6 +207,11 @@
                 probeClient.disconnect();
             }
             probeClient = null;
+
+            setTimeout(() => {
+                // this gives enough time to complete disconnection for clients
+                done();
+            }, 2000);
         });
 
         it('should have two workflows', function(done) {
diff --git a/spec/test_clients_workflow_essence.json b/spec/test_clients_workflow_essence.json
index 90d8bfc..2d49877 100644
--- a/spec/test_clients_workflow_essence.json
+++ b/spec/test_clients_workflow_essence.json
@@ -18,27 +18,25 @@
         },
         "tasks": {
             "task1": {
-                "class": "XOSEventSensor",
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag_id": "test_clients_workflow",
                 "dag": "dag_test_clients_workflow",
                 "key_field": "serialNumber",
                 "local_variable": "task1",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "task1_func",
                 "task_id": "task1",
                 "topic": "onu.events"
             },
             "task2": {
-                "class": "XOSModelSensor",
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag_id": "test_clients_workflow",
                 "dag": "dag_test_clients_workflow",
                 "key_field": "serialNumber",
                 "local_variable": "task2",
                 "model_name": "AttWorkflowDriverServiceInstance",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "task2_func",
                 "task_id": "task2"
             }
         }
diff --git a/spec/test_multi_workflow_essence.json b/spec/test_multi_workflow_essence.json
index 7d033f5..46b11c1 100644
--- a/spec/test_multi_workflow_essence.json
+++ b/spec/test_multi_workflow_essence.json
@@ -31,31 +31,28 @@
                 "dag": "dag_must_not_be_called",
                 "local_variable": "must_not_be_called_handler",
                 "poke_interval": 5,
-                "provide_context": true,
                 "task_id": "must_not_be_called_handler"
             },
             "onu_event_handler": {
-                "class": "XOSEventSensor",
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag_id": "must_not_be_called",
                 "dag": "dag_must_not_be_called",
                 "key_field": "serialNumber",
                 "local_variable": "onu_event_handler",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "ONU_event",
                 "task_id": "onu_event_handler",
                 "topic": "onu.events"
             },
             "onu_model_event_handler": {
-                "class": "XOSModelSensor",
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag_id": "must_not_be_called",
                 "dag": "dag_must_not_be_called",
                 "key_field": "serialNumber",
                 "local_variable": "onu_model_event_handler",
                 "model_name": "AttWorkflowDriverServiceInstance",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
                 "task_id": "onu_model_event_handler"
             }
         }
@@ -87,27 +84,25 @@
         },
         "tasks": {
             "onu_event_handler": {
-                "class": "XOSEventSensor",
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag_id": "must_not_be_called",
                 "dag": "dag_should_be_called",
                 "key_field": "serialNumber",
                 "local_variable": "onu_event_handler",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "ONU_event",
                 "task_id": "onu_event_handler",
                 "topic": "onu.events"
             },
             "onu_model_event_handler": {
-                "class": "XOSModelSensor",
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag_id": "must_not_be_called",
                 "dag": "dag_should_be_called",
                 "key_field": "serialNumber",
                 "local_variable": "onu_model_event_handler",
                 "model_name": "AttWorkflowDriverServiceInstance",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
                 "task_id": "onu_model_event_handler"
             },
             "can_be_stuck_handler": {
@@ -116,7 +111,6 @@
                 "dag": "dag_should_be_called",
                 "local_variable": "can_be_stuck_handler",
                 "poke_interval": 5,
-                "provide_context": true,
                 "task_id": "can_be_stuck_handler"
             }
         }
diff --git a/spec/clients.spec.js b/spec/websocket_clients.spec.js
similarity index 99%
rename from spec/clients.spec.js
rename to spec/websocket_clients.spec.js
index 1a40975..b1f6333 100644
--- a/spec/clients.spec.js
+++ b/spec/websocket_clients.spec.js
@@ -80,7 +80,7 @@
                         workflowRunId = message.workflow_run_id;
 
                         // call-back
-                        workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
+                        workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_REPORT_NEW_RUN, {
                             workflow_id: workflowId,
                             workflow_run_id: workflowRunId
                         })
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 975db2e..4919669 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -179,6 +179,18 @@
         return true;
     };
 
+    const setWorkflowRunState = (workflowRunId, state) => {
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+            return false;
+        }
+
+        if(state in ['success', 'failed', 'end']) {
+            removeWorkflowRun(workflowRunId);
+        }
+        return true;
+    };
+
     const kickstart = (workflowId, workflowRunId) => {
         if(!(workflowId in workflows)) {
             logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
@@ -191,6 +203,7 @@
         }
 
         ws_manager.kickstartWorkflow(workflowId, workflowRunId);
+        return true;
     };
 
     const removeWorkflow = (workflowId) => {
@@ -608,5 +621,6 @@
         clearWorkflowRuns: clearWorkflowRuns,
         updateWorkflowRunStatus: updateWorkflowRunStatus,
         setWorkflowRunKickstarted: setWorkflowRunKickstarted,
+        setWorkflowRunState: setWorkflowRunState
     };
 })();
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index 3d81bac..752ee88 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -31,9 +31,11 @@
         WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
         WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
         WORKFLOW_REMOVE_RUN: 'cord.workflow.ctlsvc.workflow.run.remove',
-        WORKFLOW_NOTIFY_NEW_RUN: 'cord.workflow.ctlsvc.workflow.notify_new_run',
+        WORKFLOW_REPORT_NEW_RUN: 'cord.workflow.ctlsvc.workflow.report_new_run',
+        WORKFLOW_REPORT_RUN_STATE: 'cord.workflow.ctlsvc.workflow.report_run_state',
         // controller -> manager
-        WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart'
+        WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
+        WORKFLOW_CHECK_STATE: 'cord.workflow.ctlsvc.workflow.check.state'
     };
 
     // WebSocket interface for workflow registration
@@ -233,7 +235,7 @@
         let result = eventrouter.removeWorkflow(workflowId);
         cb(null, result);
         return;
-    }
+    };
 
     // WebSocket interface for workflow run removal
     // Message format:
@@ -278,19 +280,19 @@
         let result = eventrouter.removeWorkflowRun(workflowRunId);
         cb(null, result);
         return;
-    }
+    };
 
-    // WebSocket interface for notifying a new workflow run
+    // WebSocket interface for reporting a new workflow run
     // Message format:
     // {
-    //     topic: 'cord.workflow.ctlsvc.workflow.notify_new_run',
+    //     topic: 'cord.workflow.ctlsvc.workflow.report_new_run',
     //     message: {
     //         req_id: <req_id> // optional
     //         workflow_id: <workflow_id>,
     //         workflow_run_id: <workflow_run_id>
     //     }
     // }
-    const notifyNewWorkflowRun = (topic, message, cb) => {
+    const reportNewWorkflowRun = (topic, message, cb) => {
         const eventrouter = require('./eventrouter.js');
 
         let errorMessage;
@@ -325,6 +327,63 @@
         let result = eventrouter.setWorkflowRunKickstarted(workflowRunId);
         cb(null, result);
         return;
+    };
+
+    // WebSocket interface for reporting workflow run state
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.report_run_state',
+    //     message: {
+    //         req_id: <req_id> // optional
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>,
+    //         state: one of ['success', 'running', 'failed', 'unknown']
+    //     }
+    // }
+    const reportWorkflowRunState = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_run_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('state' in message)) {
+            // error
+            errorMessage = `field 'state' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowRunId = message.workflow_run_id;
+        let state = message.state;
+
+        // there must be a workflow matching
+        // set workflow state
+        let result = eventrouter.setWorkflowRunState(workflowRunId, state);
+        cb(null, result);
+        return;
     }
 
     const getRouter = () => {
@@ -357,9 +416,13 @@
                 topic: serviceEvents.WORKFLOW_REMOVE_RUN,
                 handler: removeWorkflowRun
             },
-            notifyNewWorkflowRun: {
-                topic: serviceEvents.WORKFLOW_NOTIFY_NEW_RUN,
-                handler: notifyNewWorkflowRun
+            reportNewWorkflowRun: {
+                topic: serviceEvents.WORKFLOW_REPORT_NEW_RUN,
+                handler: reportNewWorkflowRun
+            },
+            reportWorkflowRunState: {
+                topic: serviceEvents.WORKFLOW_REPORT_RUN_STATE,
+                handler: reportWorkflowRunState
             }
         };
     };
@@ -381,9 +444,26 @@
         return;
     };
 
+    const checkWorkflowState = (workflowId, workflowRunId) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let clients = eventrouter.getWorkflowManagerClients();
+        _.forOwn(clients, (client, _clientId) => {
+            let socket = client.getSocket();
+            if(socket) {
+                socket.emit(serviceEvents.WORKFLOW_CHECK_STATE, {
+                    workflow_id: workflowId,
+                    workflow_run_id: workflowRunId
+                });
+            }
+        });
+        return;
+    };
+
     module.exports = {
         serviceEvents: serviceEvents,
         getRouter: getRouter,
-        kickstartWorkflow: kickstartWorkflow
+        kickstartWorkflow: kickstartWorkflow,
+        checkWorkflowState: checkWorkflowState
     };
 })();
diff --git a/src/types/workflowtask.js b/src/types/workflowtask.js
index 8e18700..9216a92 100644
--- a/src/types/workflowtask.js
+++ b/src/types/workflowtask.js
@@ -21,7 +21,6 @@
     const logger = require('../config/logger.js');
 
     const CORD_SENSORS = [
-        'XOSEventSensor', 'XOSModelSensor',
         'CORDEventSensor', 'CORDModelSensor'
     ];
 
@@ -133,7 +132,7 @@
                 return false;
             }
 
-            // general Airflow operators other than XOS operators don't have these fields.
+            // general Airflow operators other than XOS/CORD operators don't have these fields.
             //
             // if(!this.topic) {
             //     logger.log('error', 'topic is not given');
diff --git a/src/workflows/hello_workflow.json b/src/workflows/hello_workflow.json
index 9de71bc..4d86250 100644
--- a/src/workflows/hello_workflow.json
+++ b/src/workflows/hello_workflow.json
@@ -9,14 +9,13 @@
         },
         "tasks": {
             "onu_event_handler": {
-                "class": "XOSEventSensor",
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag_id": "hello_workflow",
                 "dag": "dag_hello",
                 "key_field": "serialNumber",
                 "local_variable": "onu_event_handler",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "ONU_event",
                 "task_id": "onu_event_handler",
                 "topic": "onu.events"
             }