Add message routing test cases and related bug fixes
- Handle XOS sensor tasks specially to route events correctly
- Add a message counting API for tests
- Add an optional req_id field to the manager request API for client-side request-response mapping
- Fix several bugs related to message routing

Change-Id: Ie18cbc63926b352bd7655797655194ece9506c6b
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 82d27db..9a54181 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -58,6 +58,10 @@
     //    io = ioInstance;
     //};
 
+    const checkObject = (obj) => {
+        return Object.prototype.toString.call(obj) === '[object Object]';
+    };
+
     const destroy = () => {
         removeClients();
         clearWorkflowRuns();
@@ -178,12 +182,25 @@
         }
 
         // check if there are workflow runs
-        _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+        for(let key in workflowRuns) {
+            if (!workflowRuns.hasOwnProperty(key)) {
+                continue;
+            }
+
+            let workflowRun = workflowRuns[key];
             if(workflowRun.getWorkflowId() === workflowId) {
                 logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
                 return false;
             }
-        });
+        }
+
+        // we don't use the code below because 'return' inside the _.forOwn callback only exits the callback; it cannot return a value from the enclosing function
+        // _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+        //     if(workflowRun.getWorkflowId() === workflowId) {
+        //         logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
+        //         return false;
+        //     }
+        // });
 
         delete workflows[workflowId];
         return true;
@@ -207,6 +224,8 @@
         // to check if there are workflow runs for the events
         let workflowIdsRunning = [];
 
+        logger.log('debug', `event is raised : topic ${topic}, message ${JSON.stringify(message)}`);
+
         // route event to running instances
         _.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
             let workflowId = workflowRun.getWorkflowId();
@@ -217,6 +236,7 @@
             //      (already finished tasks are not counted)
             // 2) the task's key field and value
             if(workflowRun.isEventAcceptable(workflow, topic, message)) {
+                //console.log(`event ${topic} is routed to workflow run ${workflowRunId}`);
                 logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
                 workflowRun.enqueueEvent(topic, message);
 
@@ -234,6 +254,8 @@
                 if(!workflowIdsRunning.includes(workflowId)) {
                     // we need to buffer the event until workflow run is brought up
                     let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
+                    workflowRun.updateEventKeyFieldValueFromMessage(topic, message);
+
                     let workflowRunId = workflowRun.getId();
 
                     // register for management
@@ -250,6 +272,17 @@
         });
     };
 
+    const countQueuedEvents = (workflowRunId) => {
+        // this counts queued events
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `workflow run ${workflowRunId} does not exist`);
+            return null;
+        }
+
+        let workflowRun = workflowRuns[workflowRunId];
+        return workflowRun.lengthEventQueue();
+    };
+
     const fetchEvent = (workflowRunId, taskId, topic) => {
         // this returns an event or an empty obj when there is no message
         if(!(workflowRunId in workflowRuns)) {
@@ -338,10 +371,17 @@
             // manager protocol:
             // REQ:
             //      topic: operation
-            //      message: <data>
+            //      message: {
+            //          req_id: <req_id>,
+            //          <data>...
+            //      }
             // RES:
             //      topic: topic sent
-            //      message: {result: <true/false>, message: <error message> }F
+            //      message: {
+            //          req_id: <req_id>,
+            //          result: <true/false>,
+            //          message: <error message>
+            //      }
             allClients[clientId] = c;
             workflowManagerClients[clientId] = c;
 
@@ -349,11 +389,23 @@
             let router = ws_manager.getRouter();
             _.forOwn(router, (routerElem, _key) => {
                 socket.on(routerElem.topic, (msg) => {
-                    routerElem.handler(routerElem.topic, msg, (err, result) => {
+                    // handle a common parameter - req_id
+                    // when we get req_id, return the same req_id in response.
+                    // this is to help identify a request from a response at client-side
+                    let req_id = 101010; // default number, signature
+                    if(msg && checkObject(msg)) {
+                        if('req_id' in msg) {
+                            req_id = msg.req_id;
+                        }
+                    }
+
+                    routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
                         if(err) {
                             logger.log('warn', `unable to handle a message - ${err}`);
                             socket.emit(routerElem.topic, {
-                                result: false,
+                                req_id: req_id,
+                                error: true,
+                                result: result,
                                 message: err
                             });
                             return;
@@ -361,6 +413,8 @@
 
                         if(routerElem.return === undefined || routerElem.return) {
                             socket.emit(routerElem.topic, {
+                                req_id: req_id,
+                                error: false,
                                 result: result
                             });
                         }
@@ -492,6 +546,7 @@
         clientType: Client.Type,
         //setIO: setIO,
         sendEvent: sendEvent,
+        countQueuedEvents: countQueuedEvents,
         fetchEvent: fetchEvent,
         addClient: addClient,
         removeClient: removeClient,
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index b3e8877..5c77ebd 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -37,9 +37,12 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.reg',
-    //     message: <workflow>
+    //     message: {
+    //         req_id: <req_id> // optional
+    //         workflow: <workflow>
+    //     }
     // }
-    const registWorkflow = (topic, message, cb) => {
+    const registerWorkflow = (topic, message, cb) => {
         const distributor = require('./eventrouter.js/index.js');
 
         let errorMessage;
@@ -51,7 +54,15 @@
             return;
         }
 
-        let workflow = message;
+        if(!('workflow' in message)) {
+            // error
+            errorMessage = `field 'workflow' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflow = message.workflow;
 
         logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
 
@@ -69,8 +80,11 @@
     // WebSocket interface for workflow registration (via essence)
     // Message format:
     // {
-    //     topic: 'cord.workflow.ctlsvc.workflow.reg',
-    //     message: <workflow essence>
+    //     topic: 'cord.workflow.ctlsvc.workflow.reg_essence',
+    //     message: {
+    //         req_id: <req_id> // optional
+    //         essence: <workflow essence>
+    //     }
     // }
     const registerWorkflowEssence = (topic, message, cb) => {
         const eventrouter = require('./eventrouter.js');
@@ -83,7 +97,15 @@
             return;
         }
 
-        let essence = message;
+        if(!('essence' in message)) {
+            // error
+            errorMessage = `field 'essence' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let essence = message.essence;
         let result = true;
         let errorResults = [];
 
@@ -112,9 +134,11 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.list',
-    //     message: null
+    //     message: {
+    //         req_id: <req_id> // optional
+    //     }
     // }
-    const listWorkflows = (topic, message, cb) => {
+    const listWorkflows = (_topic, _message, cb) => {
         const eventrouter = require('./eventrouter.js');
 
         let result = eventrouter.listWorkflows();
@@ -126,9 +150,11 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.list',
-    //     message: null
+    //     message: {
+    //         req_id: <req_id> // optional
+    //     }
     // }
-    const listWorkflowRuns = (topic, message, cb) => {
+    const listWorkflowRuns = (_topic, _message, cb) => {
         const eventrouter = require('./eventrouter.js');
 
         let result = eventrouter.listWorkflowRuns();
@@ -140,7 +166,10 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.check',
-    //     message: <workflow_id>
+    //     message: {
+    //         req_id: <req_id> // optional
+    //         workflow_id: <workflow_id>
+    //     }
     // }
     const checkWorkflow = (topic, message, cb) => {
         const eventrouter = require('./eventrouter.js');
@@ -154,7 +183,15 @@
             return;
         }
 
-        let workflowId = message;
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowId = message.workflow_id;
         let result = eventrouter.checkWorkflow(workflowId);
         cb(null, result);
         return;
@@ -165,8 +202,9 @@
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.kickstart',
     //     message: {
-    //          workflow_id: <workflow_id>,
-    //          workflow_run_id: <workflow_run_id>
+    //         req_id: <req_id> // optional
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>
     //     }
     // }
     const notifyWorkflowStart = (topic, message, cb) => {
@@ -210,12 +248,32 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.remove',
-    //     message: <workflow_id>
+    //     message: {
+    //         req_id: <req_id> // optional
+    //         workflow_id: <workflow_id>
+    //     }
     // }
     const removeWorkflow = (topic, message, cb) => {
         const eventrouter = require('./eventrouter.js');
 
-        let workflowId = message;
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowId = message.workflow_id;
         let result = eventrouter.removeWorkflow(workflowId);
         cb(null, result);
         return;
@@ -226,8 +284,9 @@
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.run.remove',
     //     message: {
-    //          workflow_id: <workflow_id>,
-    //          workflow_run_id: <workflow_run_id>
+    //         req_id: <req_id> // optional
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>
     //     }
     // }
     const removeWorkflowRun = (topic, message, cb) => {
@@ -267,9 +326,9 @@
 
     const getRouter = () => {
         return {
-            registWorkflow: {
+            registerWorkflow: {
                 topic: serviceEvents.WORKFLOW_REG,
-                handler: registWorkflow
+                handler: registerWorkflow
             },
             registerWorkflowEssence: {
                 topic: serviceEvents.WORKFLOW_REG_ESSENCE,
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
index f7ee474..036b1ef 100644
--- a/src/controllers/ws_workflowrun.js
+++ b/src/controllers/ws_workflowrun.js
@@ -22,13 +22,14 @@
 
     let serviceEvents = {
         WORKFLOW_RUN_UPDATE_STATUS: 'cord.workflow.ctlsvc.workflow.run.status',
+        WORKFLOW_RUN_COUNT_EVENTS: 'cord.workflow.ctlsvc.workflow.run.count',
         WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch'
     };
 
     // WebSocket interface for workflow status update
     // Message format:
     // {
-    //     topic: 'cord.workflow.ctlsvc.workflowrun.status',
+    //     topic: 'cord.workflow.ctlsvc.workflow.run.status',
     //     message: {
     //          workflow_id: <workflow_id>,
     //          workflow_run_id: <workflow_run_id>,
@@ -90,10 +91,52 @@
         return;
     };
 
-    // WebSocket interface for workflow status update
+    // WebSocket interface for counting queued events
     // Message format:
     // {
-    //     topic: 'cord.workflow.ctlsvc.workflowrun.fetch',
+    //     topic: 'cord.workflow.ctlsvc.workflow.run.count',
+    //     message: {
+    //          workflow_id: <workflow_id>,
+    //          workflow_run_id: <workflow_run_id>
+    //     }
+    // }
+    const countQueuedEvents = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `workflow_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_run_id' in message)) {
+            // error
+            errorMessage = `workflow_run_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let count = eventrouter.countQueuedEvents(message.workflow_run_id);
+        cb(null, count);
+        return;
+    };
+
+    // WebSocket interface for fetching an event
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.run.fetch',
     //     message: {
     //          workflow_id: <workflow_id>,
     //          workflow_run_id: <workflow_run_id>,
@@ -169,6 +212,10 @@
                 topic: serviceEvents.WORKFLOW_RUN_UPDATE_STATUS,
                 handler: updateWorkflowRunStatus
             },
+            countQueuedEvents: {
+                topic: serviceEvents.WORKFLOW_RUN_COUNT_EVENTS,
+                handler: countQueuedEvents
+            },
             fetchEvent: {
                 topic: serviceEvents.WORKFLOW_RUN_FETCH_EVENT,
                 handler: fetchEvent
diff --git a/src/types/workflowrun.js b/src/types/workflowrun.js
index 65027ff..4b0b6b0 100644
--- a/src/types/workflowrun.js
+++ b/src/types/workflowrun.js
@@ -79,7 +79,9 @@
                 workflowRun.addRunTask(runTask);
 
                 // set key_field / value
-                workflowRun.setEventKeyFieldValue(task.getTopic(), task.getKeyField(), null); // init
+                if(task.isCORDTask()) {
+                    workflowRun.setEventKeyFieldValue(task.getTopic(), task.getKeyField(), null); // init
+                }
             });
             return workflowRun;
         }
@@ -152,22 +154,21 @@
             return true;
         }
 
-        isEventKeyFieldValueAcceptable(topic, field, value) {
+        updateEventKeyFieldValueFromMessage(topic, message) {
             if(!(topic in this.eventKeyFieldValues)) {
-                // topic does not exist
+                logger.log('warn', `cannot find a topic ${topic} in event key field values`);
                 return false;
             }
 
             let keyFieldValues = this.eventKeyFieldValues[topic];
-            let index = _.findIndex(keyFieldValues, (keyFieldValue) => {
-                return (keyFieldValue.field === field) &&
-                    ((!keyFieldValue.value) || (keyFieldValue.value === value));
+            keyFieldValues.forEach((keyFieldValue) => {
+                if(keyFieldValue.field in message) {
+                    // has same field in the message
+                    // set value
+                    keyFieldValue['value'] = message[keyFieldValue.field];
+                }
             });
-
-            if(index >= 0) {
-                return true;
-            }
-            return false;
+            return true;
         }
 
         isEventAcceptableByKeyFieldValue(topic, message) {
@@ -176,17 +177,43 @@
                 return false;
             }
 
-            let keyFieldValues = this.eventKeyFieldValues[topic];
-            keyFieldValues.forEach((keyFieldValue) => {
-                if(keyFieldValue.field in message) {
-                    // has same field in the message
-                    // check value
-                    if(keyFieldValue.value === message[keyFieldValue.field]) {
-                        // has the same value
-                        return true;
+            // check all key-field values
+            for(let key in this.eventKeyFieldValues) {
+                if (!this.eventKeyFieldValues.hasOwnProperty(key)) {
+                    continue;
+                }
+
+                let keyFieldValues = this.eventKeyFieldValues[key];
+                let arrayLength = keyFieldValues.length;
+                for (var i = 0; i < arrayLength; i++) {
+                    let keyFieldValue = keyFieldValues[i];
+
+                    if(keyFieldValue.field in message) {
+                        // has same field in the message
+                        // check value
+                        if(keyFieldValue.value === message[keyFieldValue.field]) {
+                            // has the same value
+                            return true;
+                        }
                     }
                 }
-            });
+            }
+
+            // The callback-based version below is not used: once a match is found,
+            // return/break inside a handler function cannot exit the enclosing loops early.
+            // _.forOwn(this.eventKeyFieldValues, (keyFieldValues, _topic) => {
+            //     keyFieldValues.forEach((keyFieldValue) => {
+            //         if(keyFieldValue.field in message) {
+            //             // has same field in the message
+            //             // check value
+            //             if(keyFieldValue.value === message[keyFieldValue.field]) {
+            //                 // has the same value
+            //                 result = true;
+            //             }
+            //         }
+            //     });
+            // });
+
             return false;
         }
 
@@ -295,8 +322,11 @@
             // 2) the task's key field and value
             if(this.isTopicAcceptable(workflow, topic) &&
                 this.isEventAcceptableByKeyFieldValue(topic, message)) {
+                // update key-field values for my topic
+                this.updateEventKeyFieldValueFromMessage(topic, message);
                 return true;
             }
+
             return false;
         }
 
diff --git a/src/types/workflowtask.js b/src/types/workflowtask.js
index efde42b..8e18700 100644
--- a/src/types/workflowtask.js
+++ b/src/types/workflowtask.js
@@ -20,9 +20,15 @@
 
     const logger = require('../config/logger.js');
 
+    const CORD_SENSORS = [
+        'XOSEventSensor', 'XOSModelSensor',
+        'CORDEventSensor', 'CORDModelSensor'
+    ];
+
     class WorkflowTask {
         constructor(id, kickstart=false) {
             this.id = id;
+            this.sensorClass = null;
             this.topic = null;
             this.kickstart = kickstart;
             this.keyField = null;
@@ -40,20 +46,22 @@
                     return null;
                 }
 
+                if('class' in essence) {
+                    workflowTask.setSensorClass(essence.class);
+                }
+
                 if('topic' in essence) {
                     workflowTask.setTopic(essence.topic);
                 }
 
                 if('model_name' in essence) {
-                    workflowTask.setTopic('datamodel.' + essence.model_name + '.create');
-                    workflowTask.setTopic('datamodel.' + essence.model_name + '.update');
-                    workflowTask.setTopic('datamodel.' + essence.model_name + '.delete');
+                    workflowTask.setTopic('datamodel.' + essence.model_name);
                 }
 
                 if('key_field' in essence) {
                     workflowTask.setKeyField(essence.key_field);
                 }
-    
+
                 workflowTask.setEssence(essence);
                 return workflowTask;
             }
@@ -70,6 +78,23 @@
             return this.id;
         }
 
+        setSensorClass(sensorClass) {
+            this.sensorClass = sensorClass;
+        }
+
+        getSensorClass() {
+            return this.sensorClass;
+        }
+
+        isCORDTask() {
+            if(this.sensorClass) {
+                if(CORD_SENSORS.includes(this.sensorClass)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
         setTopic(topic) {
             this.topic = topic;
         }
@@ -89,7 +114,7 @@
         setKeyField(keyField) {
             this.keyField = keyField;
         }
-        
+
         getKeyField() {
             return this.keyField;
         }
@@ -109,21 +134,21 @@
             }
 
             // general Airflow operators other than XOS operators don't have these fields.
-            // 
+            //
             // if(!this.topic) {
             //     logger.log('error', 'topic is not given');
             //     return false;
             // }
-    
+
             // if(!this.keyField) {
             //     logger.log('error', 'keyField is not given');
             //     return false;
             // }
-    
+
             return true;
         }
     }
-    
+
     module.exports = {
         WorkflowTask: WorkflowTask
     };