Implement basic functionalities for workflow control.
- Manage join/leave of clients
- All clients communicate via socket.io
- Probes emit events
- Managers register workflows (by using a workflow essence)
- Send kickstart request to Managers to launch workflows
- Route events to workflow runs
- Queue events to not lose events between workflow tasks
- Fixed some issues found while working on testcases
- Set to perform coverage and unittest and generate outputs to files

Change-Id: I678723edc20df9247d63a4bf6380785ab8b2b221
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
new file mode 100644
index 0000000..82d27db
--- /dev/null
+++ b/src/controllers/eventrouter.js
@@ -0,0 +1,512 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+    'use strict';
+
+    const _ = require('lodash');
+    const logger = require('../config/logger.js');
+    const Client = require('../types/client.js');
+    const WorkflowRun = require('../types/workflowrun.js');
+    const ws_manager = require('./ws_manager.js');
+    const ws_workflowrun = require('./ws_workflowrun.js');
+
+    let allClients = {}; // has publishers and subscribers
+    let probeClients = {}; // a subset of clients
+    let workflowManagerClients = {}; // a subset of clients
+    let workflowRunClients = {}; // a subset of clients
+
+    //let io;
+
+    // key: workflow id
+    // value: Workflow instance
+    let workflows = {};
+
+    // key: workflow run id
+    // value: WorkflowRun instance
+    let workflowRuns = {};
+
+    let serviceEvents = {
+        GREETING: 'cord.workflow.ctlsvc.greeting'
+    };
+
+    // add ws_mgroperation events
+    _.forOwn(ws_manager.serviceEvents, (wsServiceEvent, key) => {
+        serviceEvents[key] = wsServiceEvent;
+    });
+
+    // add ws_runoperation events
+    _.forOwn(ws_workflowrun.serviceEvents, (wsServiceEvent, key) => {
+        serviceEvents[key] = wsServiceEvent;
+    });
+
+    //const setIO = (ioInstance) => {
+    //    io = ioInstance;
+    //};
+
+    const destroy = () => {
+        removeClients();
+        clearWorkflowRuns();
+        clearWorkflows();
+    };
+
+    const listWorkflows = () => {
+        let workflowList = [];
+        _.forOwn(workflows, (_workflow, workflowId) => {
+            workflowList.push(workflowId);
+        });
+        return workflowList;
+    };
+
+    const checkWorkflow = (workflowId) => {
+        if(workflowId in workflows) {
+            return true;
+        }
+        return false;
+    };
+
+    const addWorkflow = (workflow) => {
+        if(workflow.getId() in workflows) {
+            logger.log('error', `there exists a workflow with the same id - ${workflow.getId()}`);
+            return false;
+        }
+
+        let workflowId = workflow.getId();
+        workflows[workflowId] = workflow;
+        return true;
+    };
+
+    const clearWorkflows = () => {
+        _.forOwn(workflows, (_workflow, workflowId) => {
+            delete workflows[workflowId];
+        });
+    };
+
+    const listWorkflowRuns = () => {
+        let workflowRunList = [];
+        _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
+            workflowRunList.push(workflowRunId);
+        });
+        return workflowRunList;
+    };
+
+    const checkWorkflowRun = (workflowRunId) => {
+        if(workflowRunId in workflowRuns) {
+            return true;
+        }
+        return false;
+    };
+
+    const addWorkflowRun = (workflowRun) => {
+        let workflowId = workflowRun.getWorkflowId();
+        let workflowRunId = workflowRun.getId();
+
+        if(workflowRunId in workflowRuns) {
+            logger.log('warn', `there exists a workflow run with the same id - ${workflowRunId}`);
+            return false;
+        }
+
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
+            return false;
+        }
+
+        workflowRuns[workflowRunId] = workflowRun;
+        return true;
+    };
+
+    const clearWorkflowRuns = () => {
+        _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
+            delete workflowRuns[workflowRunId];
+        });
+    };
+
+    const updateWorkflowRunStatus = (workflowRunId, taskId, status) => {
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+            return false;
+        }
+
+        let workflowRun = workflowRuns[workflowRunId];
+        workflowRun.updateTaskStatus(taskId, status);
+        return true;
+    };
+
+    const setWorkflowRunKickstarted = (workflowRunId) => {
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+            return false;
+        }
+
+        let workflowRun = workflowRuns[workflowRunId];
+        workflowRun.setKickstarted();
+        return true;
+    };
+
+    const kickstart = (workflowId, workflowRunId) => {
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+            return false;
+        }
+
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+            return false;
+        }
+
+        ws_manager.kickstartWorkflow(workflowId, workflowRunId);
+    };
+
+    const removeWorkflow = (workflowId) => {
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+            return false;
+        }
+
+        // check if there are workflow runs
+        _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+            if(workflowRun.getWorkflowId() === workflowId) {
+                logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
+                return false;
+            }
+        });
+
+        delete workflows[workflowId];
+        return true;
+    };
+
+    const removeWorkflowRun = (workflowRunId) => {
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+            return false;
+        }
+
+        let workflowRun = workflowRuns[workflowRunId];
+        delete workflowRuns[workflowRunId];
+
+        workflowRun.setFinished();
+        return true;
+    };
+
+    const sendEvent = (topic, message) => {
+        // list of workflowIds
+        // to check if there are workflow runs for the events
+        let workflowIdsRunning = [];
+
+        // route event to running instances
+        _.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
+            let workflowId = workflowRun.getWorkflowId();
+            let workflow = workflows[workflowId];
+
+            // event will be routed to workflow runs that meet following criteria
+            // 1) the workflow is currently interested in the same topic
+            //      (already finished tasks are not counted)
+            // 2) the task's key field and value
+            if(workflowRun.isEventAcceptable(workflow, topic, message)) {
+                logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
+                workflowRun.enqueueEvent(topic, message);
+
+                if(!workflowIdsRunning.includes(workflowId)) {
+                    workflowIdsRunning.push(workflowId);
+                }
+            }
+        });
+
+        // check if the event is a kickstart event
+        _.forOwn(workflows, (workflow, workflowId) => {
+            if(workflow.isKickstartTopic(topic)) {
+                // check if there is a workflow run for the event
+                // kickstart a workflow if there is no workflows runs for the event
+                if(!workflowIdsRunning.includes(workflowId)) {
+                    // we need to buffer the event until workflow run is brought up
+                    let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
+                    let workflowRunId = workflowRun.getId();
+
+                    // register for management
+                    workflowRuns[workflowRunId] = workflowRun;
+
+                    // route event
+                    logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
+                    workflowRun.enqueueEvent(topic, message);
+
+                    // KICKSTART!
+                    kickstart(workflowId, workflowRunId);
+                }
+            }
+        });
+    };
+
+    const fetchEvent = (workflowRunId, taskId, topic) => {
+        // this returns an event or an empty obj when there is no message
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `workflow run ${workflowRunId} does not exist`);
+            return null;
+        }
+
+        let workflowRun = workflowRuns[workflowRunId];
+        let workflowId = workflowRun.getWorkflowId();
+
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `workflow ${workflowId} does not exist`);
+            return null;
+        }
+
+        let workflow = workflows[workflowId];
+
+        let task = workflow.getTask(taskId);
+        if(!task) {
+            logger.log('warn', `workflow ${workflowId} does not have task ${taskId}`);
+            return null;
+        }
+
+        logger.log('debug', `workflow run ${workflowRunId}, task ${taskId} fetches an event`);
+
+        let event = workflowRun.dequeueEvent(topic);
+        if(event) {
+            return event;
+        }
+        else {
+            return {};
+        }
+    };
+
+    const addClient = (c) => {
+        let clientId = c.getId();
+        let socket = c.getSocket();
+
+        // check id that client is already there
+        if(clientId in allClients) {
+            logger.log('warn', `there exists a client with the same id - ${clientId}`);
+            return false;
+        }
+
+        if(c.getType() === Client.Type.PROBE) {
+            // probe' messages are relayed
+            // relay messages based on topic
+            // probe protocol:
+            // REQ:
+            //      topic: event topic
+            //      message: <data>
+            // RES:
+            //      topic: topic sent
+            //      message: {result: <true/false>, message: <error message> }
+            allClients[clientId] = c;
+            probeClients[clientId] = c;
+
+            socket.on('*', (msg) => {
+                let jsonMsg = msg.data;
+                if(jsonMsg.length === 2) {
+                    // must have two parts
+                    // first part is topic
+                    // second part is message body
+                    let topic = jsonMsg[0];
+                    let messageBody = jsonMsg[1];
+
+                    sendEvent(topic, messageBody);
+
+                    // return true for success
+                    socket.emit(topic, {
+                        result: true
+                    });
+                }
+                else {
+                    logger.log('warn', `unexpected message is received - ${JSON.stringify(jsonMsg)}`);
+                    socket.emit(jsonMsg[0], {
+                        result: false,
+                        message: `unexpected message is received - ${JSON.stringify(jsonMsg)}`
+                    });
+                }
+            });
+            return true;
+        }
+        else if(c.getType() === Client.Type.WORKFLOW_MANAGER) {
+            // manager
+            // manager protocol:
+            // REQ:
+            //      topic: operation
+            //      message: <data>
+            // RES:
+            //      topic: topic sent
+            //      message: {result: <true/false>, message: <error message> }F
+            allClients[clientId] = c;
+            workflowManagerClients[clientId] = c;
+
+            // attach manager operations
+            let router = ws_manager.getRouter();
+            _.forOwn(router, (routerElem, _key) => {
+                socket.on(routerElem.topic, (msg) => {
+                    routerElem.handler(routerElem.topic, msg, (err, result) => {
+                        if(err) {
+                            logger.log('warn', `unable to handle a message - ${err}`);
+                            socket.emit(routerElem.topic, {
+                                result: false,
+                                message: err
+                            });
+                            return;
+                        }
+
+                        if(routerElem.return === undefined || routerElem.return) {
+                            socket.emit(routerElem.topic, {
+                                result: result
+                            });
+                        }
+                    });
+                });
+            });
+            return true;
+        }
+        else if(c.getType() === Client.Type.WORKFLOW_RUN) {
+            // workflow run
+            // workflow run protocol:
+            // REQ:
+            //      topic: operation
+            //      message: <data>
+            // RES:
+            //      topic: topic sent
+            //      message: {result: <true/false>, message: <error message> }
+
+            // map to WorkflowRun instance
+            let workflowId = c.getWorkflowId();
+            let workflowRunId = c.getWorkflowRunId();
+            let workflowRun;
+
+            if(!(workflowId in workflows)) {
+                logger.log('warn', `cannot find a workflow ${workflowId}`);
+                return false;
+            }
+
+            // register client to workflow run
+            if(!(workflowRunId in workflowRuns)) {
+                // workflow run not exist yet
+                logger.log('warn', `cannot find a workflow run ${workflowRunId}`);
+                return false;
+            }
+
+            //let workflow = workflows[workflowId];
+
+            allClients[clientId] = c;
+            workflowRunClients[clientId] = c;
+
+            // update
+            workflowRun = workflowRuns[workflowRunId];
+            workflowRun.addClientId(clientId);
+
+            // attach workflow run operations
+            let router = ws_workflowrun.getRouter();
+            _.forOwn(router, (routerElem, _key) => {
+                socket.on(routerElem.topic, (msg) => {
+                    routerElem.handler(routerElem.topic, msg, (err, result) => {
+                        if(err) {
+                            logger.log('warn', `unable to handle a message - ${err}`);
+                            socket.emit(routerElem.topic, {
+                                result: false,
+                                message: err
+                            });
+                            return;
+                        }
+
+                        if(routerElem.return === undefined || routerElem.return) {
+                            socket.emit(routerElem.topic, {
+                                result: result
+                            });
+                        }
+                    });
+                });
+            });
+            return true;
+        }
+        return false;
+    };
+
+    const removeClient = (id) => {
+        if(id in allClients) {
+            let removedClient = allClients[id];
+            delete allClients[id];
+
+            let type = removedClient.getType();
+            if(type === Client.Type.PROBE) {
+                delete probeClients[id];
+            }
+            else if(type === Client.Type.WORKFLOW_MANAGER) {
+                delete workflowManagerClients[id];
+            }
+            else if(type === Client.Type.WORKFLOW_RUN) {
+                delete workflowRunClients[id];
+
+                let workflowRunId = removedClient.getWorkflowRunId();
+                let workflowRun = workflowRuns[workflowRunId];
+
+                if(workflowRun) {
+                    workflowRun.removeClientId(id);
+
+                    //TODO
+                    // WorkflowRun can have no clients between tasks
+                    // So we should not remove the run until the workflow run finishes
+                }
+            }
+        }
+    };
+
+    const removeClients = () => {
+        let probeClients = {};
+
+        _.forOwn(probeClients, (_probeClient, clientId) => {
+            delete probeClients[clientId];
+        });
+
+        _.forOwn(workflowManagerClients, (_workflowManagerClient, clientId) => {
+            delete workflowManagerClients[clientId];
+        });
+
+        _.forOwn(workflowRunClients, (_workflowRunClients, clientId) => {
+            delete workflowRunClients[clientId];
+        });
+
+        _.forOwn(allClients, (client, clientId) => {
+            client.getSocket().disconnect(true);
+            delete allClients[clientId];
+        });
+    }
+
+    module.exports = {
+        serviceEvents: serviceEvents,
+        destroy: destroy,
+        getClients: () => { return allClients; },
+        getProbeClients: () => { return probeClients; },
+        getWorkflowManagerClients: () => { return workflowManagerClients; },
+        getWorkflowRunClients: () => { return workflowRunClients; },
+        clientType: Client.Type,
+        //setIO: setIO,
+        sendEvent: sendEvent,
+        fetchEvent: fetchEvent,
+        addClient: addClient,
+        removeClient: removeClient,
+        removeClients: removeClients,
+        addWorkflow: addWorkflow,
+        listWorkflows: listWorkflows,
+        checkWorkflow: checkWorkflow,
+        removeWorkflow: removeWorkflow,
+        clearWorkflows: clearWorkflows,
+        addWorkflowRun: addWorkflowRun,
+        listWorkflowRuns: listWorkflowRuns,
+        checkWorkflowRun: checkWorkflowRun,
+        removeWorkflowRun: removeWorkflowRun,
+        clearWorkflowRuns: clearWorkflowRuns,
+        updateWorkflowRunStatus: updateWorkflowRunStatus,
+        setWorkflowRunKickstarted: setWorkflowRunKickstarted,
+    };
+})();
\ No newline at end of file
diff --git a/src/controllers/rest_probe.js b/src/controllers/rest_probe.js
new file mode 100644
index 0000000..9d67386
--- /dev/null
+++ b/src/controllers/rest_probe.js
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+    'use strict';
+
+    const express = require('express');
+    const {checkSchema, validationResult} = require('express-validator');
+    const logger = require('../config/logger.js');
+    const eventrouter = require('./eventrouter.js');
+
+    // HTTP REST interface for message intake
+    // POST method
+    // Message format:
+    // {
+    //     topic: 'topic here',
+    //     message: 'message body here'
+    // }
+    // e.g., /intake?topic=aaa&message=bbb
+    const intakeMessageInputValidator = {
+        topic: {
+            in: ['params', 'query'],
+            errorMessage: 'Message topic is null or empty',
+        },
+        message: {
+            in: ['params', 'query'],
+            errorMessage: 'Message body is null or empty',
+        }
+    };
+
+    const intakeMessage = (req, res) => {
+        let errors = validationResult(req);
+        if(!errors.isEmpty()) {
+            res.status(400).send(
+                JSON.stringify({
+                    errors: errors.array()
+                })
+            );
+            return;
+        }
+
+        let jsonMessage = req.body
+        logger.debug(`Received a message ${jsonMessage}`);
+
+        // send the message to the event distributor
+        eventrouter.sendEvent(jsonMessage.topic, jsonMessage.message);
+
+        res.status(200).send({
+            result: true
+        });
+        return;
+    };
+
+    const getRouter = () => {
+        var routerInstance = new express.Router();
+        routerInstance.use((req, res, next) => {
+            logger.info(`[REQ] ${req.method}, ${req.url}`);
+            next();
+        });
+
+        // intake apis
+        routerInstance.post('/intake', checkSchema(intakeMessageInputValidator), intakeMessage);
+        return routerInstance;
+    };
+
+    module.exports = {
+        getRouter: getRouter
+    };
+})();
\ No newline at end of file
diff --git a/src/controllers/websocket.js b/src/controllers/websocket.js
new file mode 100644
index 0000000..3f3216a
--- /dev/null
+++ b/src/controllers/websocket.js
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+(function () {
+    'use strict';
+
+    const socketio = require('socket.io');
+    const ioWildcard = require('socketio-wildcard');
+    const client = require('../types/client.js');
+    const eventrouter = require('./eventrouter.js');
+    const logger = require('../config/logger.js');
+
+    let io;
+    const createSocketIO = (server) => {
+        // INSTANTIATE SOCKET.IO
+        io = socketio.listen(server);
+        io.use(ioWildcard());
+
+        // set io to eventrouter
+        //eventrouter.setIO(io);
+
+        // LISTEN TO "CONNECTION" EVENT (FROM SOCKET.IO)
+        io.on('connection', (socket) => {
+            let query = socket.handshake.query;
+            logger.log('debug', `connect ${JSON.stringify(query)}`);
+            let added = false;
+
+            // make a client
+            let c = client.Client.fromObj(query);
+            c.setSocket(socket);
+
+            if(!c.validate()) {
+                logger.log('warn', `client validation failed - ${JSON.stringify(query)}`);
+                return;
+            }
+
+            // register the client for management
+            if(eventrouter.addClient(c)) {
+                // Send a greeting message to the client
+                socket.emit(eventrouter.serviceEvents.GREETING, {
+                    to: c.getId(),
+                    message: 'Welcome to CORD Workflow Control Service'
+                });
+
+                added = true;
+            }
+            else {
+                logger.log('warn', `client could not be added - ${JSON.stringify(query)}`);
+                socket.disconnect(true);
+            }
+
+            // set a disconnect event handler
+            socket.on('disconnect', (reason) => {
+                logger.log('debug', `disconnect ${reason} ${JSON.stringify(query)}`);
+                if(added) {
+                    eventrouter.removeClient(c.getId());
+                }
+            });
+        });
+    };
+
+    const destroySocketIO = () => {
+        io.close();
+    };
+
+    const getSocketIO = () => io;
+
+    module.exports = {
+        create: createSocketIO,
+        destroy: destroySocketIO,
+        get: getSocketIO
+    };
+
+    // USAGE
+    // const socketIo = require('./controllers/websocket.js');
+    // const socket = socketIo.get();
+    // socket.emit('eventName', data);
+
+})();
\ No newline at end of file
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
new file mode 100644
index 0000000..b3e8877
--- /dev/null
+++ b/src/controllers/ws_manager.js
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+    'use strict';
+
+    const _ = require('lodash');
+    const Workflow = require('../types/workflow.js');
+    const logger = require('../config/logger.js');
+
+    let serviceEvents = {
+        WORKFLOW_REG: 'cord.workflow.ctlsvc.workflow.reg',
+        WORKFLOW_REG_ESSENCE: 'cord.workflow.ctlsvc.workflow.reg_essence',
+        WORKFLOW_LIST: 'cord.workflow.ctlsvc.workflow.list',
+        WORKFLOW_RUN_LIST: 'cord.workflow.ctlsvc.workflow.run.list',
+        WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
+        WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
+        WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
+        WORKFLOW_RUN_REMOVE: 'cord.workflow.ctlsvc.workflow.run.remove'
+    };
+
+    // WebSocket interface for workflow registration
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.reg',
+    //     message: <workflow>
+    // }
+    const registWorkflow = (topic, message, cb) => {
+        const distributor = require('./eventrouter.js/index.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflow = message;
+
+        logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
+
+        let result = distributor.addWorkflow(workflow);
+        if(!result) {
+            errorMessage = `failed to register a workflow ${workflow.getId()}`;
+            cb(errorMessage, false);
+        }
+        else {
+            cb(null, true);
+        }
+        return;
+    };
+
+    // WebSocket interface for workflow registration (via essence)
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.reg',
+    //     message: <workflow essence>
+    // }
+    const registerWorkflowEssence = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let essence = message;
+        let result = true;
+        let errorResults = [];
+
+        logger.log('debug', `workflow essence - ${JSON.stringify(essence)}`);
+
+        let workflows = Workflow.loadWorkflowsFromEssence(essence);
+        workflows.forEach((workflow) => {
+            if(workflow) {
+                let localResult = eventrouter.addWorkflow(workflow);
+                errorResults.push(localResult);
+                result = result && localResult; // false if any of registrations fails
+            }
+        });
+
+        if(!result) {
+            errorMessage = `failed to register workflows ${errorResults}`;
+            cb(errorMessage, false);
+        }
+        else {
+            cb(null, true);
+        }
+        return;
+    };
+
+    // WebSocket interface for workflow listing
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.list',
+    //     message: null
+    // }
+    const listWorkflows = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let result = eventrouter.listWorkflows();
+        cb(null, result);
+        return;
+    };
+
+    // WebSocket interface for workflow run listing
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.list',
+    //     message: null
+    // }
+    const listWorkflowRuns = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let result = eventrouter.listWorkflowRuns();
+        cb(null, result);
+        return;
+    };
+
+    // WebSocket interface for workflow check
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.check',
+    //     message: <workflow_id>
+    // }
+    const checkWorkflow = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowId = message;
+        let result = eventrouter.checkWorkflow(workflowId);
+        cb(null, result);
+        return;
+    };
+
+    // WebSocket interface for workflow start notification
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.kickstart',
+    //     message: {
+    //          workflow_id: <workflow_id>,
+    //          workflow_run_id: <workflow_run_id>
+    //     }
+    // }
+    const notifyWorkflowStart = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_run_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowRunId = message.workflow_run_id;
+
+        // there must be a workflow matching
+        // set the workflow kickstarted
+        eventrouter.setWorkflowRunKickstarted(workflowRunId);
+        cb(null, true);
+        return;
+    }
+
+    // WebSocket interface for workflow removal
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.remove',
+    //     message: <workflow_id>
+    // }
+    const removeWorkflow = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let workflowId = message;
+        let result = eventrouter.removeWorkflow(workflowId);
+        cb(null, result);
+        return;
+    }
+
+    // WebSocket interface for workflow run removal
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.run.remove',
+    //     message: {
+    //          workflow_id: <workflow_id>,
+    //          workflow_run_id: <workflow_run_id>
+    //     }
+    // }
+    const removeWorkflowRun = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_run_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowRunId = message.workflow_run_id;
+
+        let result = eventrouter.removeWorkflowRun(workflowRunId);
+        cb(null, result);
+        return;
+    }
+
+    const getRouter = () => {
+        return {
+            registWorkflow: {
+                topic: serviceEvents.WORKFLOW_REG,
+                handler: registWorkflow
+            },
+            registerWorkflowEssence: {
+                topic: serviceEvents.WORKFLOW_REG_ESSENCE,
+                handler: registerWorkflowEssence
+            },
+            listWorkflows: {
+                topic: serviceEvents.WORKFLOW_LIST,
+                handler: listWorkflows
+            },
+            listWorkflowRuns: {
+                topic: serviceEvents.WORKFLOW_RUN_LIST,
+                handler: listWorkflowRuns
+            },
+            checkWorkflow: {
+                topic: serviceEvents.WORKFLOW_CHECK,
+                handler: checkWorkflow
+            },
+            notifyWorkflowStart: {
+                topic: serviceEvents.WORKFLOW_KICKSTART,
+                handler: notifyWorkflowStart,
+                return: false
+            },
+            removeWorkflow: {
+                topic: serviceEvents.WORKFLOW_REMOVE,
+                handler: removeWorkflow
+            },
+            removeWorkflowRun: {
+                topic: serviceEvents.WORKFLOW_RUN_REMOVE,
+                handler: removeWorkflowRun
+            }
+        };
+    };
+
+    // out-going commands
+    const kickstartWorkflow = (workflowId, workflowRunId) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let clients = eventrouter.getWorkflowManagerClients();
+        _.forOwn(clients, (client, _clientId) => {
+            let socket = client.getSocket();
+            if(socket) {
+                socket.emit(serviceEvents.WORKFLOW_KICKSTART, {
+                    workflow_id: workflowId,
+                    workflow_run_id: workflowRunId
+                });
+            }
+        });
+        return;
+    };
+
+    module.exports = {
+        serviceEvents: serviceEvents,
+        getRouter: getRouter,
+        kickstartWorkflow: kickstartWorkflow
+    };
+})();
\ No newline at end of file
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
new file mode 100644
index 0000000..f7ee474
--- /dev/null
+++ b/src/controllers/ws_workflowrun.js
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+    'use strict';
+
+    const logger = require('../config/logger.js');
+
+    let serviceEvents = {
+        WORKFLOW_RUN_UPDATE_STATUS: 'cord.workflow.ctlsvc.workflow.run.status',
+        WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch'
+    };
+
+    // WebSocket interface for workflow status update
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflowrun.status',
+    //     message: {
+    //          workflow_id: <workflow_id>,
+    //          workflow_run_id: <workflow_run_id>,
+    //          task_id: <task_id>,
+    //          status: 'begin' or 'end'
+    //     }
+    // }
+    const updateWorkflowRunStatus = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `workflow_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_run_id' in message)) {
+            // error
+            errorMessage = `workflow_run_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('task_id' in message)) {
+            // error
+            errorMessage = `task_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('status' in message)) {
+            // error
+            errorMessage = `status field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let result = eventrouter.updateWorkflowRunStatus(
+            message.workflow_id,
+            message.workflow_run_id,
+            message.task_id,
+            message.status.toLowerCase()
+        );
+        cb(null, result);
+        return;
+    };
+
+    // WebSocket interface for workflow status update
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflowrun.fetch',
+    //     message: {
+    //          workflow_id: <workflow_id>,
+    //          workflow_run_id: <workflow_run_id>,
+    //          task_id: <task_id>,
+    //          topic: <expected topic>
+    //     }
+    // }
+    const fetchEvent = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `workflow_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_run_id' in message)) {
+            // error
+            errorMessage = `workflow_run_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('task_id' in message)) {
+            // error
+            errorMessage = `task_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('topic' in message)) {
+            // error
+            errorMessage = `topic field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let result = eventrouter.fetchEvent(
+            message.workflow_run_id,
+            message.task_id,
+            message.topic
+        );
+        if(result) {
+            // empty object {} when no message
+            cb(null, result);
+        }
+        else {
+            cb(
+                `could not fetch event ${message.topic} from workflow run ${message.workflow_run_id}`,
+                null
+            );
+        }
+        return;
+    };
+
+    const getRouter = () => {
+        return {
+            updateWorkflowRunStatus: {
+                topic: serviceEvents.WORKFLOW_RUN_UPDATE_STATUS,
+                handler: updateWorkflowRunStatus
+            },
+            fetchEvent: {
+                topic: serviceEvents.WORKFLOW_RUN_FETCH_EVENT,
+                handler: fetchEvent
+            }
+        };
+    };
+
+    module.exports = {
+        serviceEvents: serviceEvents,
+        getRouter: getRouter
+    };
+})();
\ No newline at end of file