Implement bulk status update of workflow runs

Change-Id: I6b67048f502d42a2572a936944c9e54300076478
diff --git a/package.json b/package.json
index 3d9f83b..7f30ec4 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
     "name": "cord_workflow_controller",
-    "version": "0.3.2",
+    "version": "0.4.0",
     "description": "CORD Workflow Controller",
     "main": "src/server.js",
     "scripts": {
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 4d9ee47..334c08b 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -45,6 +45,19 @@
         GREETING: 'cord.workflow.ctlsvc.greeting'
     };
 
+    setInterval(function () {
+        let requests = [];
+        _.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
+            let obj = {
+                workflow_id: workflowRun.getWorkflowId(),
+                workflow_run_id: workflowRunId
+            };
+            requests.push(obj);
+        });
+
+        checkWorkflowRunStatusBulk(requests);
+    }, 5000);
+
     // add ws_probe events
     _.forOwn(ws_probe.serviceEvents, (wsServiceEvent, key) => {
         serviceEvents[key] = wsServiceEvent;
@@ -185,13 +198,13 @@
         return true;
     };
 
-    const setWorkflowRunState = (workflowRunId, state) => {
+    const setWorkflowRunStatus = (workflowRunId, status) => {
         if(!(workflowRunId in workflowRuns)) {
             logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
             return false;
         }
 
-        if(state in ['success', 'failed', 'end']) {
+        if(status in ['success', 'failed', 'end']) {
             removeWorkflowRun(workflowRunId);
         }
         return true;
@@ -212,6 +225,31 @@
         return true;
     };
 
+    /*
+    const checkWorkflowRunStatus = (workflowId, workflowRunId) => {
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+            return false;
+        }
+
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+            return false;
+        }
+
+        ws_manager.checkWorkflowRunStatus(workflowId, workflowRunId);
+        return true;
+    };
+    */
+
+    const checkWorkflowRunStatusBulk = (requests) => {
+        if(requests) {
+            ws_manager.checkWorkflowRunStatusBulk(requests);
+            return true;
+        }
+        return false;
+    };
+
     const removeWorkflow = (workflowId) => {
         if(!(workflowId in workflows)) {
             logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
@@ -391,6 +429,8 @@
             let router = ws_probe.getRouter();
             _.forOwn(router, (routerElem, _key) => {
                 socket.on(routerElem.topic, (msg) => {
+                    logger.log('debug', `received a probe event ${routerElem.topic} - ${JSON.stringify(msg)}`);
+
                     // handle a common parameter - req_id
                     // when we get req_id, return the same req_id in response.
                     // this is to help identify a request from a response at client-side
@@ -450,6 +490,8 @@
             let router = ws_manager.getRouter();
             _.forOwn(router, (routerElem, _key) => {
                 socket.on(routerElem.topic, (msg) => {
+                    logger.log('debug', `received a manager event ${routerElem.topic} - ${JSON.stringify(msg)}`);
+
                     // handle a common parameter - req_id
                     // when we get req_id, return the same req_id in response.
                     // this is to help identify a request from a response at client-side
@@ -533,6 +575,8 @@
             let router = ws_workflowrun.getRouter();
             _.forOwn(router, (routerElem, _key) => {
                 socket.on(routerElem.topic, (msg) => {
+                    logger.log('debug', `received a workflow run event ${routerElem.topic} - ${JSON.stringify(msg)}`);
+
                     // handle a common parameter - req_id
                     // when we get req_id, return the same req_id in response.
                     // this is to help identify a request from a response at client-side
@@ -650,6 +694,6 @@
         clearWorkflowRuns: clearWorkflowRuns,
         updateWorkflowRunStatus: updateWorkflowRunStatus,
         setWorkflowRunKickstarted: setWorkflowRunKickstarted,
-        setWorkflowRunState: setWorkflowRunState
+        setWorkflowRunStatus: setWorkflowRunStatus
     };
 })();
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index 752ee88..4ce95c5 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -32,10 +32,12 @@
         WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
         WORKFLOW_REMOVE_RUN: 'cord.workflow.ctlsvc.workflow.run.remove',
         WORKFLOW_REPORT_NEW_RUN: 'cord.workflow.ctlsvc.workflow.report_new_run',
-        WORKFLOW_REPORT_RUN_STATE: 'cord.workflow.ctlsvc.workflow.report_run_state',
+        WORKFLOW_REPORT_RUN_STATUS: 'cord.workflow.ctlsvc.workflow.report_run_status',
+        WORKFLOW_REPORT_RUN_STATUS_BULK: 'cord.workflow.ctlsvc.workflow.report_run_status_bulk',
         // controller -> manager
         WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
-        WORKFLOW_CHECK_STATE: 'cord.workflow.ctlsvc.workflow.check.state'
+        WORKFLOW_CHECK_STATUS: 'cord.workflow.ctlsvc.workflow.check.status',
+        WORKFLOW_CHECK_STATUS_BULK: 'cord.workflow.ctlsvc.workflow.check.status_bulk',
     };
 
     // WebSocket interface for workflow registration
@@ -329,18 +331,18 @@
         return;
     };
 
-    // WebSocket interface for reporting workflow run state
+    // WebSocket interface for reporting workflow run status
     // Message format:
     // {
-    //     topic: 'cord.workflow.ctlsvc.workflow.report_run_state',
+    //     topic: 'cord.workflow.ctlsvc.workflow.report_run_status',
     //     message: {
     //         req_id: <req_id> // optional
     //         workflow_id: <workflow_id>,
     //         workflow_run_id: <workflow_run_id>,
-    //         state: one of ['success', 'running', 'failed', 'unknown']
+    //         status: one of ['success', 'running', 'failed', 'unknown']
     //     }
     // }
-    const reportWorkflowRunState = (topic, message, cb) => {
+    const reportWorkflowRunStatus = (topic, message, cb) => {
         const eventrouter = require('./eventrouter.js');
 
         let errorMessage;
@@ -368,24 +370,96 @@
             return;
         }
 
-        if(!('state' in message)) {
+        if(!('status' in message)) {
             // error
-            errorMessage = `field 'state' does not exist in message body - ${JSON.stringify(message)}`;
+            errorMessage = `field 'status' does not exist in message body - ${JSON.stringify(message)}`;
             logger.log('warn', `Return error - ${errorMessage}`);
             cb(errorMessage, false);
             return;
         }
 
         let workflowRunId = message.workflow_run_id;
-        let state = message.state;
+        let status = message.status;
 
         // there must be a workflow matching
-        // set workflow state
-        let result = eventrouter.setWorkflowRunState(workflowRunId, state);
+        // set workflow status
+        let result = eventrouter.setWorkflowRunStatus(workflowRunId, status);
         cb(null, result);
         return;
     }
 
+    // WebSocket interface for reporting workflow run status bulk
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.report_run_status_bulk',
+    //     message: {
+    //         req_id: <req_id> // optional
+    //         data: [{
+    //             workflow_id: <workflow_id>,
+    //             workflow_run_id: <workflow_run_id>,
+    //             status: one of ['success', 'running', 'failed', 'unknown']
+    //         }, ...]
+    //     }
+    // }
+    const reportWorkflowRunStatusBulk = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('data' in message)) {
+            // error
+            errorMessage = `field 'data' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let results = [];
+        for(let d in message.data) {
+            if(!('workflow_id' in d)) {
+                // error
+                errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(d)}`;
+                logger.log('warn', `Return error - ${errorMessage}`);
+                cb(errorMessage, false);
+                return;
+            }
+
+            if(!('workflow_run_id' in d)) {
+                // error
+                errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(d)}`;
+                logger.log('warn', `Return error - ${errorMessage}`);
+                cb(errorMessage, false);
+                return;
+            }
+
+            if(!('status' in d)) {
+                // error
+                errorMessage = `field 'status' does not exist in message body - ${JSON.stringify(d)}`;
+                logger.log('warn', `Return error - ${errorMessage}`);
+                cb(errorMessage, false);
+                return;
+            }
+
+            let workflowRunId = d.workflow_run_id;
+            let status = d.status;
+
+            // there must be a workflow matching
+            // set workflow status
+            let result = eventrouter.setWorkflowRunStatus(workflowRunId, status);
+            results.append(result);
+        }
+
+        cb(null, results);
+        return;
+    }
+
     const getRouter = () => {
         return {
             registerWorkflow: {
@@ -420,9 +494,13 @@
                 topic: serviceEvents.WORKFLOW_REPORT_NEW_RUN,
                 handler: reportNewWorkflowRun
             },
-            reportWorkflowRunState: {
-                topic: serviceEvents.WORKFLOW_REPORT_RUN_STATE,
-                handler: reportWorkflowRunState
+            reportWorkflowRunStatus: {
+                topic: serviceEvents.WORKFLOW_REPORT_RUN_STATUS,
+                handler: reportWorkflowRunStatus
+            },
+            reportWorkflowRunStatusBulk: {
+                topic: serviceEvents.WORKFLOW_REPORT_RUN_STATUS_BULK,
+                handler: reportWorkflowRunStatusBulk
             }
         };
     };
@@ -444,14 +522,14 @@
         return;
     };
 
-    const checkWorkflowState = (workflowId, workflowRunId) => {
+    const checkWorkflowRunStatus = (workflowId, workflowRunId) => {
         const eventrouter = require('./eventrouter.js');
 
         let clients = eventrouter.getWorkflowManagerClients();
         _.forOwn(clients, (client, _clientId) => {
             let socket = client.getSocket();
             if(socket) {
-                socket.emit(serviceEvents.WORKFLOW_CHECK_STATE, {
+                socket.emit(serviceEvents.WORKFLOW_CHECK_STATUS, {
                     workflow_id: workflowId,
                     workflow_run_id: workflowRunId
                 });
@@ -460,10 +538,29 @@
         return;
     };
 
+    const checkWorkflowRunStatusBulk = (requests) => {
+        // input is an array of
+        // {
+        //      workflow_id: <workflowId>,
+        //      workflow_run_id: <workflowRunId>
+        // }
+        const eventrouter = require('./eventrouter.js');
+
+        let clients = eventrouter.getWorkflowManagerClients();
+        _.forOwn(clients, (client, _clientId) => {
+            let socket = client.getSocket();
+            if(socket) {
+                socket.emit(serviceEvents.WORKFLOW_CHECK_STATUS_BULK, requests);
+            }
+        });
+        return;
+    };
+
     module.exports = {
         serviceEvents: serviceEvents,
         getRouter: getRouter,
         kickstartWorkflow: kickstartWorkflow,
-        checkWorkflowState: checkWorkflowState
+        checkWorkflowRunStatus: checkWorkflowRunStatus,
+        checkWorkflowRunStatusBulk: checkWorkflowRunStatusBulk
     };
 })();
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
index 4000f10..0f058cb 100644
--- a/src/controllers/ws_workflowrun.js
+++ b/src/controllers/ws_workflowrun.js
@@ -87,7 +87,6 @@
         }
 
         let result = eventrouter.updateWorkflowRunStatus(
-            message.workflow_id,
             message.workflow_run_id,
             message.task_id,
             message.status.toLowerCase()