Add message routing testcases and related bugfixes
- Handle XOS sensor tasks specially to route events correctly
- Add message counting API for test
- Add req_id optional field to manager request API for client-side req-res mapping
- Fix several bugs related to message routing

Change-Id: Ie18cbc63926b352bd7655797655194ece9506c6b
diff --git a/package.json b/package.json
index 614d00b..2604522 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
     "name": "cord_workflow_controller",
-    "version": "0.1.0",
+    "version": "0.1.1",
     "description": "CORD Workflow Controller",
     "main": "src/server.js",
     "scripts": {
diff --git a/spec/clients.spec.js b/spec/clients.spec.js
index 534797d..9996944 100644
--- a/spec/clients.spec.js
+++ b/spec/clients.spec.js
@@ -41,6 +41,8 @@
         var workflowId;
         var workflowRunId;
 
+        this.slow(5000);
+
         before(function() {
             // Start our server
             server.start(port);
@@ -99,7 +101,9 @@
 
                         workflowCnt++;
 
-                        workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_CHECK, essenceWorkflowId);
+                        workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_CHECK, {
+                            workflow_id: essenceWorkflowId
+                        });
 
                         workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_CHECK, (workflowCheckResult) => {
                             workflowCnt--;
@@ -124,7 +128,9 @@
                     if(register) {
                         let essence = essenceLoader.loadEssence(essenceFileName, true);
 
-                        workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, essence);
+                        workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, {
+                            essence: essence
+                        });
 
                         workflowManagerClient.on(
                             eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE,
@@ -144,7 +150,7 @@
                     setTimeout(() => {
                         expect(workflowRunId).to.not.be.undefined;
                         callback(null, true);
-                    }, 500);
+                    }, 1000);
                     return;
                 },
                 (callback) => {
@@ -182,7 +188,9 @@
             });
 
             // remove workflow
-            workflowManagerClient.emit(server.serviceEvents.WORKFLOW_REMOVE, workflowId);
+            workflowManagerClient.emit(server.serviceEvents.WORKFLOW_REMOVE, {
+                workflow_id: workflowId
+            });
 
             workflowId = null;
             workflowRunId = null;
diff --git a/spec/eventrouter.spec.js b/spec/eventrouter.spec.js
index 6184f58..df594fb 100644
--- a/spec/eventrouter.spec.js
+++ b/spec/eventrouter.spec.js
@@ -41,6 +41,7 @@
     var receivedKickstartMessages = [[],[]];
 
     describe('Workflow kickstart test', function() {
+        this.slow(5000);
 
         before(function() {
             // Start our server
@@ -90,7 +91,7 @@
                                 workflow_id: message.workflow_id,
                                 workflow_run_id: message.workflow_run_id
                             })
-                        }, 2000);
+                        }, 1000);
                     });
 
                     workflowManagerClient.on('connect', () => {
@@ -123,7 +124,9 @@
                     let essence = essenceLoader.loadEssence(essenceFileName, true);
 
                     // register the workflow
-                    workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, essence);
+                    workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, {
+                        essence: essence
+                    });
 
                     _.forOwn(essence, (_value, workflowId) => {
                         // save
@@ -164,7 +167,9 @@
 
             // remove workflows
             _.forOwn(workflowIds, (workflowId) => {
-                workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE, workflowId);
+                workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE, {
+                    workflow_id: workflowId
+                });
             });
             workflowIds.length = 0;
 
@@ -216,7 +221,7 @@
                     expect(receivedKickstartMessageStore.length, 'num of messages in a store').to.equal(1);
                 });
                 done();
-            }, 500);
+            }, 1000);
         });
 
         it('should have only one workflow run', function(done) {
@@ -230,10 +235,10 @@
                 // the workflow must be 'should_be_called'
                 expect(workflowRunInfos[0].workflowId, 'workflow id').to.equal('should_be_called');
                 done();
-            }, 3000);
+            }, 2000);
         });
 
-        it('should be able to read an event that is used for workflow kickstart', function(done) {
+        it('should read an event that is used for workflow kickstart', function(done) {
             this.timeout(5000);
 
             // kickstart the workflow
@@ -268,86 +273,84 @@
                     expect(event.message.serialNumber).to.equal('testSerialXXX');
                     done();
                 });
-            }, 3000);
+            }, 2000);
         });
 
-        /*
-        it('should store user details for a new connection', () => {
-            const eventrouter = require('../src/controllers/eventrouter.js');
+        it('should map a workflow run using key-field', function(done) {
+            this.timeout(5000);
 
-            const probe = eventrouter.getClients()['probe_id'];
-            expect(probe.getParams().name).to.equal('probe@xos.org');
-
-            const manager = eventrouter.getClients()['workflow_manager_id'];
-            expect(manager.getParams().name).to.equal('manager@xos.org');
-
-            const run = eventrouter.getClients()['workflow_run_id'];
-            expect(run.getParams().name).to.equal('run@xos.org');
-        });
-
-        it('should not store the same user twice', (done) => {
-            // This test case makes cleaning up process taking long time because it leaves
-            // a client socket. It seems there's no way to release it from server-side.
-
-            // connect a client to the server
-            const client2 = io.connect(`http://localhost:${port}`, {
-                query: 'id=probe_id&type=probe' +
-                        '&name=probe@xos.org&value=different_value'
-            });
-
-            // when is connected start testing
-            client2.on('connect', () => {
-                setTimeout(() => {
-                    const eventrouter = require('../src/controllers/eventrouter.js');
-                    expect(
-                        Object.keys(eventrouter.getWorkflowRunClients()).length,
-                        'num of workflow run clients'
-                    ).to.equal(1);
-                    expect(
-                        Object.keys(eventrouter.getWorkflowManagerClients()).length,
-                        'num of workflow manager clients'
-                    ).to.equal(1);
-                    expect(
-                        Object.keys(eventrouter.getProbeClients()).length,
-                        'num of probe clients'
-                    ).to.equal(1);
-                    expect(
-                        Object.keys(eventrouter.getClients()).length,
-                        'total num of clients'
-                    ).to.equal(3);
-
-                    done();
-                }, 100);
-            });
-        });
-
-        it('should remove a user on disconnect', (done) => {
-            workflowManagerClient.disconnect();
-            workflowRunClient.disconnect();
-            probeClient.disconnect();
-
-            // we need to wait for the event to be dispatched
+            // kickstart the workflow
+            probeClient.emit(
+                'onu.events',
+                {serialNumber: 'testSerialXXX', other: 'test_other_field'}
+            );
+            probeClient.emit(
+                'datamodel.AttWorkflowDriverServiceInstance',
+                {operation: 'update', serialNumber: 'testSerialXXX', other: 'updated_test_other_field'}
+            );
             setTimeout(() => {
-                const eventrouter = require('../src/controllers/eventrouter.js');
-                expect(
-                    Object.keys(eventrouter.getWorkflowRunClients()).length,
-                    'num of workflow run clients'
-                ).to.equal(0);
-                expect(
-                    Object.keys(eventrouter.getWorkflowManagerClients()).length,
-                    'num of workflow manager clients'
-                ).to.equal(0);
-                expect(
-                    Object.keys(eventrouter.getProbeClients()).length,
-                    'num of probe clients'
-                ).to.equal(0);
-                expect(
-                    Object.keys(eventrouter.getClients()).length,
-                    'total num of clients'
-                ).to.equal(0);
-                done();
-            }, 100);
+                // kickstart will take 2 seconds roughly
+                expect(workflowRunInfos.length, 'num of workflow runs').to.equal(1);
+                // the workflow must be 'should_be_called'
+                expect(workflowRunInfos[0].workflowId, 'workflow id').to.equal('should_be_called');
+
+                // connect a workflow run client to the server
+                let workflowRunClient = io.connect(`http://localhost:${port}`, {
+                    query: 'id=workflow_run_id1&type=workflow_run' +
+                            `&workflow_id=${workflowRunInfos[0].workflowId}` +
+                            `&workflow_run_id=${workflowRunInfos[0].workflowRunId}` +
+                            '&name=run1@xos.org'
+                });
+                workflowRunClients.push(workflowRunClient);
+
+                workflowRunClient.on('connect', () => {
+                    // check message counts
+                    workflowRunClient.emit(eventrouter.serviceEvents.WORKFLOW_RUN_COUNT_EVENTS, {
+                        workflow_id: workflowRunInfos[0].workflowId,
+                        workflow_run_id: workflowRunInfos[0].workflowRunId
+                    });
+                });
+
+                let eventRaised = 0;
+                workflowRunClient.on(eventrouter.serviceEvents.WORKFLOW_RUN_COUNT_EVENTS, (result) => {
+                    let count = result.result;
+                    expect(count, 'number of events queued').to.equal(2);
+
+                    // fetch two events
+                    workflowRunClient.emit(eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT, {
+                        workflow_id: workflowRunInfos[0].workflowId,
+                        workflow_run_id: workflowRunInfos[0].workflowRunId,
+                        task_id: 'onu_event_handler',
+                        topic: 'onu.events'
+                    });
+                    eventRaised++;
+
+                    workflowRunClient.emit(eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT, {
+                        workflow_id: workflowRunInfos[0].workflowId,
+                        workflow_run_id: workflowRunInfos[0].workflowRunId,
+                        task_id: 'onu_model_event_handler',
+                        topic: 'datamodel.AttWorkflowDriverServiceInstance'
+                    });
+                    eventRaised++;
+                });
+
+                workflowRunClient.on(eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT, (result) => {
+                    let event = result.result;
+                    if(eventRaised === 2) {
+                        expect(event.topic).to.equal('onu.events');
+                        expect(event.message.serialNumber).to.equal('testSerialXXX');
+                    }
+                    else if(eventRaised === 1) {
+                        expect(event.topic).to.equal('datamodel.AttWorkflowDriverServiceInstance');
+                        expect(event.message.serialNumber).to.equal('testSerialXXX');
+                    }
+                    eventRaised--;
+
+                    if(eventRaised === 0) {
+                        done();
+                    }
+                });
+            }, 2000);
         });
-        */
     });
 })();
\ No newline at end of file
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 82d27db..9a54181 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -58,6 +58,10 @@
     //    io = ioInstance;
     //};
 
+    const checkObject = (obj) => {
+        return Object.prototype.toString.call(obj) === '[object Object]';
+    };
+
     const destroy = () => {
         removeClients();
         clearWorkflowRuns();
@@ -178,12 +182,25 @@
         }
 
         // check if there are workflow runs
-        _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+        for(let key in workflowRuns) {
+            if (!workflowRuns.hasOwnProperty(key)) {
+                continue;
+            }
+
+            let workflowRun = workflowRuns[key];
             if(workflowRun.getWorkflowId() === workflowId) {
                 logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
                 return false;
             }
-        });
+        }
+
+        // we don't use below code becuase it cannot properly stop and return value with 'return'
+        // _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+        //     if(workflowRun.getWorkflowId() === workflowId) {
+        //         logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
+        //         return false;
+        //     }
+        // });
 
         delete workflows[workflowId];
         return true;
@@ -207,6 +224,8 @@
         // to check if there are workflow runs for the events
         let workflowIdsRunning = [];
 
+        logger.log('debug', `event is raised : topic ${topic}, message ${JSON.stringify(message)}`);
+
         // route event to running instances
         _.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
             let workflowId = workflowRun.getWorkflowId();
@@ -217,6 +236,7 @@
             //      (already finished tasks are not counted)
             // 2) the task's key field and value
             if(workflowRun.isEventAcceptable(workflow, topic, message)) {
+                //console.log(`event ${topic} is routed to workflow run ${workflowRunId}`);
                 logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
                 workflowRun.enqueueEvent(topic, message);
 
@@ -234,6 +254,8 @@
                 if(!workflowIdsRunning.includes(workflowId)) {
                     // we need to buffer the event until workflow run is brought up
                     let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
+                    workflowRun.updateEventKeyFieldValueFromMessage(topic, message);
+
                     let workflowRunId = workflowRun.getId();
 
                     // register for management
@@ -250,6 +272,17 @@
         });
     };
 
+    const countQueuedEvents = (workflowRunId) => {
+        // this counts queued events
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `workflow run ${workflowRunId} does not exist`);
+            return null;
+        }
+
+        let workflowRun = workflowRuns[workflowRunId];
+        return workflowRun.lengthEventQueue();
+    };
+
     const fetchEvent = (workflowRunId, taskId, topic) => {
         // this returns an event or an empty obj when there is no message
         if(!(workflowRunId in workflowRuns)) {
@@ -338,10 +371,17 @@
             // manager protocol:
             // REQ:
             //      topic: operation
-            //      message: <data>
+            //      message: {
+            //          req_id: <req_id>,
+            //          <data>...
+            //      }
             // RES:
             //      topic: topic sent
-            //      message: {result: <true/false>, message: <error message> }F
+            //      message: {
+            //          req_id: <req_id>,
+            //          result: <true/false>,
+            //          message: <error message>
+            //      }
             allClients[clientId] = c;
             workflowManagerClients[clientId] = c;
 
@@ -349,11 +389,23 @@
             let router = ws_manager.getRouter();
             _.forOwn(router, (routerElem, _key) => {
                 socket.on(routerElem.topic, (msg) => {
-                    routerElem.handler(routerElem.topic, msg, (err, result) => {
+                    // handle a common parameter - req_id
+                    // when we get req_id, return the same req_id in response.
+                    // this is to help identify a request from a response at client-side
+                    let req_id = 101010; // default number, signiture
+                    if(msg && checkObject(msg)) {
+                        if('req_id' in msg) {
+                            req_id = msg.req_id;
+                        }
+                    }
+
+                    routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
                         if(err) {
                             logger.log('warn', `unable to handle a message - ${err}`);
                             socket.emit(routerElem.topic, {
-                                result: false,
+                                req_id: req_id,
+                                error: true,
+                                result: result,
                                 message: err
                             });
                             return;
@@ -361,6 +413,8 @@
 
                         if(routerElem.return === undefined || routerElem.return) {
                             socket.emit(routerElem.topic, {
+                                req_id: req_id,
+                                error: false,
                                 result: result
                             });
                         }
@@ -492,6 +546,7 @@
         clientType: Client.Type,
         //setIO: setIO,
         sendEvent: sendEvent,
+        countQueuedEvents: countQueuedEvents,
         fetchEvent: fetchEvent,
         addClient: addClient,
         removeClient: removeClient,
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index b3e8877..5c77ebd 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -37,9 +37,12 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.reg',
-    //     message: <workflow>
+    //     message: {
+    //         req_id: <req_id> // optional
+    //         workflow: <workflow>
+    //     }
     // }
-    const registWorkflow = (topic, message, cb) => {
+    const registerWorkflow = (topic, message, cb) => {
         const distributor = require('./eventrouter.js/index.js');
 
         let errorMessage;
@@ -51,7 +54,15 @@
             return;
         }
 
-        let workflow = message;
+        if(!('workflow' in message)) {
+            // error
+            errorMessage = `field 'workflow' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflow = message.workflow;
 
         logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
 
@@ -69,8 +80,11 @@
     // WebSocket interface for workflow registration (via essence)
     // Message format:
     // {
-    //     topic: 'cord.workflow.ctlsvc.workflow.reg',
-    //     message: <workflow essence>
+    //     topic: 'cord.workflow.ctlsvc.workflow.reg_essence',
+    //     message: {
+    //         req_id: <req_id> // optional
+    //         essence: <workflow essence>
+    //     }
     // }
     const registerWorkflowEssence = (topic, message, cb) => {
         const eventrouter = require('./eventrouter.js');
@@ -83,7 +97,15 @@
             return;
         }
 
-        let essence = message;
+        if(!('essence' in message)) {
+            // error
+            errorMessage = `field 'essence' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let essence = message.essence;
         let result = true;
         let errorResults = [];
 
@@ -112,9 +134,11 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.list',
-    //     message: null
+    //     message: {
+    //         req_id: <req_id> // optional
+    //     }
     // }
-    const listWorkflows = (topic, message, cb) => {
+    const listWorkflows = (_topic, _message, cb) => {
         const eventrouter = require('./eventrouter.js');
 
         let result = eventrouter.listWorkflows();
@@ -126,9 +150,11 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.list',
-    //     message: null
+    //     message: {
+    //         req_id: <req_id> // optional
+    //     }
     // }
-    const listWorkflowRuns = (topic, message, cb) => {
+    const listWorkflowRuns = (_topic, _message, cb) => {
         const eventrouter = require('./eventrouter.js');
 
         let result = eventrouter.listWorkflowRuns();
@@ -140,7 +166,10 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.check',
-    //     message: <workflow_id>
+    //     message: {
+    //         req_id: <req_id> // optional
+    //         workflow_id: <workflow_id>
+    //     }
     // }
     const checkWorkflow = (topic, message, cb) => {
         const eventrouter = require('./eventrouter.js');
@@ -154,7 +183,15 @@
             return;
         }
 
-        let workflowId = message;
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowId = message.workflow_id;
         let result = eventrouter.checkWorkflow(workflowId);
         cb(null, result);
         return;
@@ -165,8 +202,9 @@
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.kickstart',
     //     message: {
-    //          workflow_id: <workflow_id>,
-    //          workflow_run_id: <workflow_run_id>
+    //         req_id: <req_id> // optional
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>
     //     }
     // }
     const notifyWorkflowStart = (topic, message, cb) => {
@@ -210,12 +248,32 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.remove',
-    //     message: <workflow_id>
+    //     message: {
+    //         req_id: <req_id> // optional
+    //         workflow_id: <workflow_id>
+    //     }
     // }
     const removeWorkflow = (topic, message, cb) => {
         const eventrouter = require('./eventrouter.js');
 
-        let workflowId = message;
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowId = message.workflow_id;
         let result = eventrouter.removeWorkflow(workflowId);
         cb(null, result);
         return;
@@ -226,8 +284,9 @@
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.run.remove',
     //     message: {
-    //          workflow_id: <workflow_id>,
-    //          workflow_run_id: <workflow_run_id>
+    //         req_id: <req_id> // optional
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>
     //     }
     // }
     const removeWorkflowRun = (topic, message, cb) => {
@@ -267,9 +326,9 @@
 
     const getRouter = () => {
         return {
-            registWorkflow: {
+            registerWorkflow: {
                 topic: serviceEvents.WORKFLOW_REG,
-                handler: registWorkflow
+                handler: registerWorkflow
             },
             registerWorkflowEssence: {
                 topic: serviceEvents.WORKFLOW_REG_ESSENCE,
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
index f7ee474..036b1ef 100644
--- a/src/controllers/ws_workflowrun.js
+++ b/src/controllers/ws_workflowrun.js
@@ -22,13 +22,14 @@
 
     let serviceEvents = {
         WORKFLOW_RUN_UPDATE_STATUS: 'cord.workflow.ctlsvc.workflow.run.status',
+        WORKFLOW_RUN_COUNT_EVENTS: 'cord.workflow.ctlsvc.workflow.run.count',
         WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch'
     };
 
     // WebSocket interface for workflow status update
     // Message format:
     // {
-    //     topic: 'cord.workflow.ctlsvc.workflowrun.status',
+    //     topic: 'cord.workflow.ctlsvc.workflow.run.status',
     //     message: {
     //          workflow_id: <workflow_id>,
     //          workflow_run_id: <workflow_run_id>,
@@ -90,10 +91,52 @@
         return;
     };
 
-    // WebSocket interface for workflow status update
+    // WebSocket interface for counting queued events
     // Message format:
     // {
-    //     topic: 'cord.workflow.ctlsvc.workflowrun.fetch',
+    //     topic: 'cord.workflow.ctlsvc.workflow.run.count',
+    //     message: {
+    //          workflow_id: <workflow_id>,
+    //          workflow_run_id: <workflow_run_id>
+    //     }
+    // }
+    const countQueuedEvents = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `workflow_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_run_id' in message)) {
+            // error
+            errorMessage = `workflow_run_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let count = eventrouter.countQueuedEvents(message.workflow_run_id);
+        cb(null, count);
+        return;
+    };
+
+    // WebSocket interface for fetching an event
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.run.fetch',
     //     message: {
     //          workflow_id: <workflow_id>,
     //          workflow_run_id: <workflow_run_id>,
@@ -169,6 +212,10 @@
                 topic: serviceEvents.WORKFLOW_RUN_UPDATE_STATUS,
                 handler: updateWorkflowRunStatus
             },
+            countQueuedEvents: {
+                topic: serviceEvents.WORKFLOW_RUN_COUNT_EVENTS,
+                handler: countQueuedEvents
+            },
             fetchEvent: {
                 topic: serviceEvents.WORKFLOW_RUN_FETCH_EVENT,
                 handler: fetchEvent
diff --git a/src/types/workflowrun.js b/src/types/workflowrun.js
index 65027ff..4b0b6b0 100644
--- a/src/types/workflowrun.js
+++ b/src/types/workflowrun.js
@@ -79,7 +79,9 @@
                 workflowRun.addRunTask(runTask);
 
                 // set key_field / value
-                workflowRun.setEventKeyFieldValue(task.getTopic(), task.getKeyField(), null); // init
+                if(task.isCORDTask()) {
+                    workflowRun.setEventKeyFieldValue(task.getTopic(), task.getKeyField(), null); // init
+                }
             });
             return workflowRun;
         }
@@ -152,22 +154,21 @@
             return true;
         }
 
-        isEventKeyFieldValueAcceptable(topic, field, value) {
+        updateEventKeyFieldValueFromMessage(topic, message) {
             if(!(topic in this.eventKeyFieldValues)) {
-                // topic does not exist
+                logger.log('warn', `cannot find a topic ${topic} in event key field values`);
                 return false;
             }
 
             let keyFieldValues = this.eventKeyFieldValues[topic];
-            let index = _.findIndex(keyFieldValues, (keyFieldValue) => {
-                return (keyFieldValue.field === field) &&
-                    ((!keyFieldValue.value) || (keyFieldValue.value === value));
+            keyFieldValues.forEach((keyFieldValue) => {
+                if(keyFieldValue.field in message) {
+                    // has same field in the message
+                    // set value
+                    keyFieldValue['value'] = message[keyFieldValue.field];
+                }
             });
-
-            if(index >= 0) {
-                return true;
-            }
-            return false;
+            return true;
         }
 
         isEventAcceptableByKeyFieldValue(topic, message) {
@@ -176,17 +177,43 @@
                 return false;
             }
 
-            let keyFieldValues = this.eventKeyFieldValues[topic];
-            keyFieldValues.forEach((keyFieldValue) => {
-                if(keyFieldValue.field in message) {
-                    // has same field in the message
-                    // check value
-                    if(keyFieldValue.value === message[keyFieldValue.field]) {
-                        // has the same value
-                        return true;
+            // check all key-field values
+            for(let key in this.eventKeyFieldValues) {
+                if (!this.eventKeyFieldValues.hasOwnProperty(key)) {
+                    continue;
+                }
+
+                let keyFieldValues = this.eventKeyFieldValues[key];
+                let arrayLength = keyFieldValues.length;
+                for (var i = 0; i < arrayLength; i++) {
+                    let keyFieldValue = keyFieldValues[i];
+
+                    if(keyFieldValue.field in message) {
+                        // has same field in the message
+                        // check value
+                        if(keyFieldValue.value === message[keyFieldValue.field]) {
+                            // has the same value
+                            return true;
+                        }
                     }
                 }
-            });
+            }
+
+            // We cannot break the loop when we get the result.
+            // because return/break does not work with handler functions
+            // _.forOwn(this.eventKeyFieldValues, (keyFieldValues, _topic) => {
+            //     keyFieldValues.forEach((keyFieldValue) => {
+            //         if(keyFieldValue.field in message) {
+            //             // has same field in the message
+            //             // check value
+            //             if(keyFieldValue.value === message[keyFieldValue.field]) {
+            //                 // has the same value
+            //                 result = true;
+            //             }
+            //         }
+            //     });
+            // });
+
             return false;
         }
 
@@ -295,8 +322,11 @@
             // 2) the task's key field and value
             if(this.isTopicAcceptable(workflow, topic) &&
                 this.isEventAcceptableByKeyFieldValue(topic, message)) {
+                // update key-field values for my topic
+                this.updateEventKeyFieldValueFromMessage(topic, message);
                 return true;
             }
+
             return false;
         }
 
diff --git a/src/types/workflowtask.js b/src/types/workflowtask.js
index efde42b..8e18700 100644
--- a/src/types/workflowtask.js
+++ b/src/types/workflowtask.js
@@ -20,9 +20,15 @@
 
     const logger = require('../config/logger.js');
 
+    const CORD_SENSORS = [
+        'XOSEventSensor', 'XOSModelSensor',
+        'CORDEventSensor', 'CORDModelSensor'
+    ];
+
     class WorkflowTask {
         constructor(id, kickstart=false) {
             this.id = id;
+            this.sensorClass = null;
             this.topic = null;
             this.kickstart = kickstart;
             this.keyField = null;
@@ -40,20 +46,22 @@
                     return null;
                 }
 
+                if('class' in essence) {
+                    workflowTask.setSensorClass(essence.class);
+                }
+
                 if('topic' in essence) {
                     workflowTask.setTopic(essence.topic);
                 }
 
                 if('model_name' in essence) {
-                    workflowTask.setTopic('datamodel.' + essence.model_name + '.create');
-                    workflowTask.setTopic('datamodel.' + essence.model_name + '.update');
-                    workflowTask.setTopic('datamodel.' + essence.model_name + '.delete');
+                    workflowTask.setTopic('datamodel.' + essence.model_name);
                 }
 
                 if('key_field' in essence) {
                     workflowTask.setKeyField(essence.key_field);
                 }
-    
+
                 workflowTask.setEssence(essence);
                 return workflowTask;
             }
@@ -70,6 +78,23 @@
             return this.id;
         }
 
+        setSensorClass(sensorClass) {
+            this.sensorClass = sensorClass;
+        }
+
+        getSensorClass() {
+            return this.sensorClass;
+        }
+
+        isCORDTask() {
+            if(this.sensorClass) {
+                if(CORD_SENSORS.includes(this.sensorClass)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
         setTopic(topic) {
             this.topic = topic;
         }
@@ -89,7 +114,7 @@
         setKeyField(keyField) {
             this.keyField = keyField;
         }
-        
+
         getKeyField() {
             return this.keyField;
         }
@@ -109,21 +134,21 @@
             }
 
             // general Airflow operators other than XOS operators don't have these fields.
-            // 
+            //
             // if(!this.topic) {
             //     logger.log('error', 'topic is not given');
             //     return false;
             // }
-    
+
             // if(!this.keyField) {
             //     logger.log('error', 'keyField is not given');
             //     return false;
             // }
-    
+
             return true;
         }
     }
-    
+
     module.exports = {
         WorkflowTask: WorkflowTask
     };