Implement bulk status update of workflow runs

Change-Id: I6b67048f502d42a2572a936944c9e54300076478
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index 752ee88..4ce95c5 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -32,10 +32,12 @@
         WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
         WORKFLOW_REMOVE_RUN: 'cord.workflow.ctlsvc.workflow.run.remove',
         WORKFLOW_REPORT_NEW_RUN: 'cord.workflow.ctlsvc.workflow.report_new_run',
-        WORKFLOW_REPORT_RUN_STATE: 'cord.workflow.ctlsvc.workflow.report_run_state',
+        WORKFLOW_REPORT_RUN_STATUS: 'cord.workflow.ctlsvc.workflow.report_run_status',
+        WORKFLOW_REPORT_RUN_STATUS_BULK: 'cord.workflow.ctlsvc.workflow.report_run_status_bulk',
         // controller -> manager
         WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
-        WORKFLOW_CHECK_STATE: 'cord.workflow.ctlsvc.workflow.check.state'
+        WORKFLOW_CHECK_STATUS: 'cord.workflow.ctlsvc.workflow.check.status',
+        WORKFLOW_CHECK_STATUS_BULK: 'cord.workflow.ctlsvc.workflow.check.status_bulk',
     };
 
     // WebSocket interface for workflow registration
@@ -329,18 +331,18 @@
         return;
     };
 
-    // WebSocket interface for reporting workflow run state
+    // WebSocket interface for reporting workflow run status
     // Message format:
     // {
-    //     topic: 'cord.workflow.ctlsvc.workflow.report_run_state',
+    //     topic: 'cord.workflow.ctlsvc.workflow.report_run_status',
     //     message: {
     //         req_id: <req_id> // optional
     //         workflow_id: <workflow_id>,
     //         workflow_run_id: <workflow_run_id>,
-    //         state: one of ['success', 'running', 'failed', 'unknown']
+    //         status: one of ['success', 'running', 'failed', 'unknown']
     //     }
     // }
-    const reportWorkflowRunState = (topic, message, cb) => {
+    const reportWorkflowRunStatus = (topic, message, cb) => {
         const eventrouter = require('./eventrouter.js');
 
         let errorMessage;
@@ -368,24 +370,96 @@
             return;
         }
 
-        if(!('state' in message)) {
+        if(!('status' in message)) {
             // error
-            errorMessage = `field 'state' does not exist in message body - ${JSON.stringify(message)}`;
+            errorMessage = `field 'status' does not exist in message body - ${JSON.stringify(message)}`;
             logger.log('warn', `Return error - ${errorMessage}`);
             cb(errorMessage, false);
             return;
         }
 
         let workflowRunId = message.workflow_run_id;
-        let state = message.state;
+        let status = message.status;
 
         // there must be a workflow matching
-        // set workflow state
-        let result = eventrouter.setWorkflowRunState(workflowRunId, state);
+        // set workflow status
+        let result = eventrouter.setWorkflowRunStatus(workflowRunId, status);
         cb(null, result);
         return;
     }
 
+    // WebSocket interface for reporting the status of multiple workflow runs in bulk
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.report_run_status_bulk',
+    //     message: {
+    //         req_id: <req_id>, // optional
+    //         data: [{
+    //             workflow_id: <workflow_id>,
+    //             workflow_run_id: <workflow_run_id>,
+    //             status: one of ['success', 'running', 'failed', 'unknown']
+    //         }, ...]
+    //     }
+    // }
+    const reportWorkflowRunStatusBulk = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('data' in message)) {
+            // error
+            errorMessage = `field 'data' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let results = [];
+        for(let d in message.data) {
+            if(!('workflow_id' in d)) {
+                // error
+                errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(d)}`;
+                logger.log('warn', `Return error - ${errorMessage}`);
+                cb(errorMessage, false);
+                return;
+            }
+
+            if(!('workflow_run_id' in d)) {
+                // error
+                errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(d)}`;
+                logger.log('warn', `Return error - ${errorMessage}`);
+                cb(errorMessage, false);
+                return;
+            }
+
+            if(!('status' in d)) {
+                // error
+                errorMessage = `field 'status' does not exist in message body - ${JSON.stringify(d)}`;
+                logger.log('warn', `Return error - ${errorMessage}`);
+                cb(errorMessage, false);
+                return;
+            }
+
+            let workflowRunId = d.workflow_run_id;
+            let status = d.status;
+
+            // there must be a workflow matching
+            // set workflow status
+            let result = eventrouter.setWorkflowRunStatus(workflowRunId, status);
+            results.append(result);
+        }
+
+        cb(null, results);
+        return;
+    }
+
     const getRouter = () => {
         return {
             registerWorkflow: {
@@ -420,9 +494,13 @@
                 topic: serviceEvents.WORKFLOW_REPORT_NEW_RUN,
                 handler: reportNewWorkflowRun
             },
-            reportWorkflowRunState: {
-                topic: serviceEvents.WORKFLOW_REPORT_RUN_STATE,
-                handler: reportWorkflowRunState
+            reportWorkflowRunStatus: {
+                topic: serviceEvents.WORKFLOW_REPORT_RUN_STATUS,
+                handler: reportWorkflowRunStatus
+            },
+            reportWorkflowRunStatusBulk: {
+                topic: serviceEvents.WORKFLOW_REPORT_RUN_STATUS_BULK,
+                handler: reportWorkflowRunStatusBulk
             }
         };
     };
@@ -444,14 +522,14 @@
         return;
     };
 
-    const checkWorkflowState = (workflowId, workflowRunId) => {
+    const checkWorkflowRunStatus = (workflowId, workflowRunId) => {
         const eventrouter = require('./eventrouter.js');
 
         let clients = eventrouter.getWorkflowManagerClients();
         _.forOwn(clients, (client, _clientId) => {
             let socket = client.getSocket();
             if(socket) {
-                socket.emit(serviceEvents.WORKFLOW_CHECK_STATE, {
+                socket.emit(serviceEvents.WORKFLOW_CHECK_STATUS, {
                     workflow_id: workflowId,
                     workflow_run_id: workflowRunId
                 });
@@ -460,10 +538,29 @@
         return;
     };
 
+    const checkWorkflowRunStatusBulk = (requests) => {
+        // input is an array of
+        // {
+        //      workflow_id: <workflowId>,
+        //      workflow_run_id: <workflowRunId>
+        // }
+        const eventrouter = require('./eventrouter.js');
+
+        let clients = eventrouter.getWorkflowManagerClients();
+        _.forOwn(clients, (client, _clientId) => {
+            let socket = client.getSocket();
+            if(socket) {
+                socket.emit(serviceEvents.WORKFLOW_CHECK_STATUS_BULK, requests);
+            }
+        });
+        return;
+    };
+
     module.exports = {
         serviceEvents: serviceEvents,
         getRouter: getRouter,
         kickstartWorkflow: kickstartWorkflow,
-        checkWorkflowState: checkWorkflowState
+        checkWorkflowRunStatus: checkWorkflowRunStatus,
+        checkWorkflowRunStatusBulk: checkWorkflowRunStatusBulk
     };
 })();