Implement basic functionalities for workflow control.
- Manage join/leave of clients
- All clients communicate via socket.io
- Probes emit events
- Managers register workflows (by using a workflow essence)
- Send kickstart request to Managers to launch workflows
- Route events to workflow runs
- Queue events to not lose events between workflow tasks
- Fixed some issues found while working on testcases
- Set to perform coverage and unittest and generate outputs to files

Change-Id: I678723edc20df9247d63a4bf6380785ab8b2b221
diff --git a/src/config/config.js b/src/config/config.js
new file mode 100644
index 0000000..288e184
--- /dev/null
+++ b/src/config/config.js
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+(function () {
+    'use strict';
+  
+    const argv = require('yargs').argv;
+    const path = require('path');
+    const YamlConfig = require('node-yaml-config');
+    const logger = require('../config/logger.js');
+  
+    // if a config file is specified in as a CLI arguments use that one
+    const cfgFile = argv.config || 'config.yml';
+  
+    let config;
+    try {
+        logger.log('debug', `Loading ${path.join(__dirname, cfgFile)}`);
+        config = YamlConfig.load(path.join(__dirname, cfgFile));
+        logger.log('debug', `Parsed config: ${JSON.stringify(config)}`);
+    }
+    catch(e) {
+        logger.log('debug', `No ${cfgFile} found, using default params`);
+    }
+  
+    module.exports = {
+        service: {
+            port: (config && config.service) ? config.service.port : 3000
+        }
+    };
+})();
\ No newline at end of file
diff --git a/src/config/config.yml b/src/config/config.yml
new file mode 100644
index 0000000..6fb1570
--- /dev/null
+++ b/src/config/config.yml
@@ -0,0 +1,17 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+default:
+  service:
+    port: 3030
diff --git a/src/config/logger.js b/src/config/logger.js
new file mode 100644
index 0000000..0a70904
--- /dev/null
+++ b/src/config/logger.js
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+(function () {
+    'use strict';
+    
+    const winston = require('winston');
+    const fs = require('fs');
+    const path = require('path');
+    const level = process.env.LOG_LEVEL || 'debug';
+    winston.level = level;
+  
+    const logFile = path.join(__dirname, '../../logs/cord-workflow-control-service');
+  
+    // clear old logs
+    ['error', 'debug'].forEach(l => {
+        try {
+            fs.statSync(`${logFile}.${l}.log`)
+            fs.unlinkSync(`${logFile}.${l}.log`);
+        }
+        catch(e) {
+            // log does not exist
+        }
+    });
+  
+    // create a custom logger with colorized console and persistance to file
+    const logger = winston.createLogger({
+        transports: [
+            new (winston.transports.Console)({level: level, colorize: true}),
+            new (winston.transports.File)({name: 'error-log', level: 'error', filename: `${logFile}.error.log`}),
+            new (winston.transports.File)({name: 'debug-log', level: 'debug', filename: `${logFile}.debug.log`})
+        ]
+    });
+  
+    module.exports = logger;  
+})();
\ No newline at end of file
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
new file mode 100644
index 0000000..82d27db
--- /dev/null
+++ b/src/controllers/eventrouter.js
@@ -0,0 +1,512 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+    'use strict';
+
+    const _ = require('lodash');
+    const logger = require('../config/logger.js');
+    const Client = require('../types/client.js');
+    const WorkflowRun = require('../types/workflowrun.js');
+    const ws_manager = require('./ws_manager.js');
+    const ws_workflowrun = require('./ws_workflowrun.js');
+
+    let allClients = {}; // has publishers and subscribers
+    let probeClients = {}; // a subset of clients
+    let workflowManagerClients = {}; // a subset of clients
+    let workflowRunClients = {}; // a subset of clients
+
+    //let io;
+
+    // key: workflow id
+    // value: Workflow instance
+    let workflows = {};
+
+    // key: workflow run id
+    // value: WorkflowRun instance
+    let workflowRuns = {};
+
+    let serviceEvents = {
+        GREETING: 'cord.workflow.ctlsvc.greeting'
+    };
+
+    // add ws_mgroperation events
+    _.forOwn(ws_manager.serviceEvents, (wsServiceEvent, key) => {
+        serviceEvents[key] = wsServiceEvent;
+    });
+
+    // add ws_runoperation events
+    _.forOwn(ws_workflowrun.serviceEvents, (wsServiceEvent, key) => {
+        serviceEvents[key] = wsServiceEvent;
+    });
+
+    //const setIO = (ioInstance) => {
+    //    io = ioInstance;
+    //};
+
+    const destroy = () => {
+        removeClients();
+        clearWorkflowRuns();
+        clearWorkflows();
+    };
+
+    const listWorkflows = () => {
+        let workflowList = [];
+        _.forOwn(workflows, (_workflow, workflowId) => {
+            workflowList.push(workflowId);
+        });
+        return workflowList;
+    };
+
+    const checkWorkflow = (workflowId) => {
+        if(workflowId in workflows) {
+            return true;
+        }
+        return false;
+    };
+
+    const addWorkflow = (workflow) => {
+        if(workflow.getId() in workflows) {
+            logger.log('error', `there exists a workflow with the same id - ${workflow.getId()}`);
+            return false;
+        }
+
+        let workflowId = workflow.getId();
+        workflows[workflowId] = workflow;
+        return true;
+    };
+
+    const clearWorkflows = () => {
+        _.forOwn(workflows, (_workflow, workflowId) => {
+            delete workflows[workflowId];
+        });
+    };
+
+    const listWorkflowRuns = () => {
+        let workflowRunList = [];
+        _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
+            workflowRunList.push(workflowRunId);
+        });
+        return workflowRunList;
+    };
+
+    const checkWorkflowRun = (workflowRunId) => {
+        if(workflowRunId in workflowRuns) {
+            return true;
+        }
+        return false;
+    };
+
+    const addWorkflowRun = (workflowRun) => {
+        let workflowId = workflowRun.getWorkflowId();
+        let workflowRunId = workflowRun.getId();
+
+        if(workflowRunId in workflowRuns) {
+            logger.log('warn', `there exists a workflow run with the same id - ${workflowRunId}`);
+            return false;
+        }
+
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
+            return false;
+        }
+
+        workflowRuns[workflowRunId] = workflowRun;
+        return true;
+    };
+
+    const clearWorkflowRuns = () => {
+        _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
+            delete workflowRuns[workflowRunId];
+        });
+    };
+
+    const updateWorkflowRunStatus = (workflowRunId, taskId, status) => {
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+            return false;
+        }
+
+        let workflowRun = workflowRuns[workflowRunId];
+        workflowRun.updateTaskStatus(taskId, status);
+        return true;
+    };
+
+    const setWorkflowRunKickstarted = (workflowRunId) => {
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+            return false;
+        }
+
+        let workflowRun = workflowRuns[workflowRunId];
+        workflowRun.setKickstarted();
+        return true;
+    };
+
+    const kickstart = (workflowId, workflowRunId) => {
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+            return false;
+        }
+
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+            return false;
+        }
+
+        ws_manager.kickstartWorkflow(workflowId, workflowRunId);
+    };
+
+    const removeWorkflow = (workflowId) => {
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+            return false;
+        }
+
+        // check if there are workflow runs
+        _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+            if(workflowRun.getWorkflowId() === workflowId) {
+                logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
+                return false;
+            }
+        });
+
+        delete workflows[workflowId];
+        return true;
+    };
+
+    const removeWorkflowRun = (workflowRunId) => {
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+            return false;
+        }
+
+        let workflowRun = workflowRuns[workflowRunId];
+        delete workflowRuns[workflowRunId];
+
+        workflowRun.setFinished();
+        return true;
+    };
+
+    const sendEvent = (topic, message) => {
+        // list of workflowIds
+        // to check if there are workflow runs for the events
+        let workflowIdsRunning = [];
+
+        // route event to running instances
+        _.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
+            let workflowId = workflowRun.getWorkflowId();
+            let workflow = workflows[workflowId];
+
+            // event will be routed to workflow runs that meet following criteria
+            // 1) the workflow is currently interested in the same topic
+            //      (already finished tasks are not counted)
+            // 2) the task's key field and value
+            if(workflowRun.isEventAcceptable(workflow, topic, message)) {
+                logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
+                workflowRun.enqueueEvent(topic, message);
+
+                if(!workflowIdsRunning.includes(workflowId)) {
+                    workflowIdsRunning.push(workflowId);
+                }
+            }
+        });
+
+        // check if the event is a kickstart event
+        _.forOwn(workflows, (workflow, workflowId) => {
+            if(workflow.isKickstartTopic(topic)) {
+                // check if there is a workflow run for the event
+                // kickstart a workflow if there is no workflows runs for the event
+                if(!workflowIdsRunning.includes(workflowId)) {
+                    // we need to buffer the event until workflow run is brought up
+                    let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
+                    let workflowRunId = workflowRun.getId();
+
+                    // register for management
+                    workflowRuns[workflowRunId] = workflowRun;
+
+                    // route event
+                    logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
+                    workflowRun.enqueueEvent(topic, message);
+
+                    // KICKSTART!
+                    kickstart(workflowId, workflowRunId);
+                }
+            }
+        });
+    };
+
+    const fetchEvent = (workflowRunId, taskId, topic) => {
+        // this returns an event or an empty obj when there is no message
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `workflow run ${workflowRunId} does not exist`);
+            return null;
+        }
+
+        let workflowRun = workflowRuns[workflowRunId];
+        let workflowId = workflowRun.getWorkflowId();
+
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `workflow ${workflowId} does not exist`);
+            return null;
+        }
+
+        let workflow = workflows[workflowId];
+
+        let task = workflow.getTask(taskId);
+        if(!task) {
+            logger.log('warn', `workflow ${workflowId} does not have task ${taskId}`);
+            return null;
+        }
+
+        logger.log('debug', `workflow run ${workflowRunId}, task ${taskId} fetches an event`);
+
+        let event = workflowRun.dequeueEvent(topic);
+        if(event) {
+            return event;
+        }
+        else {
+            return {};
+        }
+    };
+
+    const addClient = (c) => {
+        let clientId = c.getId();
+        let socket = c.getSocket();
+
+        // check id that client is already there
+        if(clientId in allClients) {
+            logger.log('warn', `there exists a client with the same id - ${clientId}`);
+            return false;
+        }
+
+        if(c.getType() === Client.Type.PROBE) {
+            // probe' messages are relayed
+            // relay messages based on topic
+            // probe protocol:
+            // REQ:
+            //      topic: event topic
+            //      message: <data>
+            // RES:
+            //      topic: topic sent
+            //      message: {result: <true/false>, message: <error message> }
+            allClients[clientId] = c;
+            probeClients[clientId] = c;
+
+            socket.on('*', (msg) => {
+                let jsonMsg = msg.data;
+                if(jsonMsg.length === 2) {
+                    // must have two parts
+                    // first part is topic
+                    // second part is message body
+                    let topic = jsonMsg[0];
+                    let messageBody = jsonMsg[1];
+
+                    sendEvent(topic, messageBody);
+
+                    // return true for success
+                    socket.emit(topic, {
+                        result: true
+                    });
+                }
+                else {
+                    logger.log('warn', `unexpected message is received - ${JSON.stringify(jsonMsg)}`);
+                    socket.emit(jsonMsg[0], {
+                        result: false,
+                        message: `unexpected message is received - ${JSON.stringify(jsonMsg)}`
+                    });
+                }
+            });
+            return true;
+        }
+        else if(c.getType() === Client.Type.WORKFLOW_MANAGER) {
+            // manager
+            // manager protocol:
+            // REQ:
+            //      topic: operation
+            //      message: <data>
+            // RES:
+            //      topic: topic sent
+            //      message: {result: <true/false>, message: <error message> }F
+            allClients[clientId] = c;
+            workflowManagerClients[clientId] = c;
+
+            // attach manager operations
+            let router = ws_manager.getRouter();
+            _.forOwn(router, (routerElem, _key) => {
+                socket.on(routerElem.topic, (msg) => {
+                    routerElem.handler(routerElem.topic, msg, (err, result) => {
+                        if(err) {
+                            logger.log('warn', `unable to handle a message - ${err}`);
+                            socket.emit(routerElem.topic, {
+                                result: false,
+                                message: err
+                            });
+                            return;
+                        }
+
+                        if(routerElem.return === undefined || routerElem.return) {
+                            socket.emit(routerElem.topic, {
+                                result: result
+                            });
+                        }
+                    });
+                });
+            });
+            return true;
+        }
+        else if(c.getType() === Client.Type.WORKFLOW_RUN) {
+            // workflow run
+            // workflow run protocol:
+            // REQ:
+            //      topic: operation
+            //      message: <data>
+            // RES:
+            //      topic: topic sent
+            //      message: {result: <true/false>, message: <error message> }
+
+            // map to WorkflowRun instance
+            let workflowId = c.getWorkflowId();
+            let workflowRunId = c.getWorkflowRunId();
+            let workflowRun;
+
+            if(!(workflowId in workflows)) {
+                logger.log('warn', `cannot find a workflow ${workflowId}`);
+                return false;
+            }
+
+            // register client to workflow run
+            if(!(workflowRunId in workflowRuns)) {
+                // workflow run not exist yet
+                logger.log('warn', `cannot find a workflow run ${workflowRunId}`);
+                return false;
+            }
+
+            //let workflow = workflows[workflowId];
+
+            allClients[clientId] = c;
+            workflowRunClients[clientId] = c;
+
+            // update
+            workflowRun = workflowRuns[workflowRunId];
+            workflowRun.addClientId(clientId);
+
+            // attach workflow run operations
+            let router = ws_workflowrun.getRouter();
+            _.forOwn(router, (routerElem, _key) => {
+                socket.on(routerElem.topic, (msg) => {
+                    routerElem.handler(routerElem.topic, msg, (err, result) => {
+                        if(err) {
+                            logger.log('warn', `unable to handle a message - ${err}`);
+                            socket.emit(routerElem.topic, {
+                                result: false,
+                                message: err
+                            });
+                            return;
+                        }
+
+                        if(routerElem.return === undefined || routerElem.return) {
+                            socket.emit(routerElem.topic, {
+                                result: result
+                            });
+                        }
+                    });
+                });
+            });
+            return true;
+        }
+        return false;
+    };
+
+    const removeClient = (id) => {
+        if(id in allClients) {
+            let removedClient = allClients[id];
+            delete allClients[id];
+
+            let type = removedClient.getType();
+            if(type === Client.Type.PROBE) {
+                delete probeClients[id];
+            }
+            else if(type === Client.Type.WORKFLOW_MANAGER) {
+                delete workflowManagerClients[id];
+            }
+            else if(type === Client.Type.WORKFLOW_RUN) {
+                delete workflowRunClients[id];
+
+                let workflowRunId = removedClient.getWorkflowRunId();
+                let workflowRun = workflowRuns[workflowRunId];
+
+                if(workflowRun) {
+                    workflowRun.removeClientId(id);
+
+                    //TODO
+                    // WorkflowRun can have no clients between tasks
+                    // So we should not remove the run until the workflow run finishes
+                }
+            }
+        }
+    };
+
+    const removeClients = () => {
+        let probeClients = {};
+
+        _.forOwn(probeClients, (_probeClient, clientId) => {
+            delete probeClients[clientId];
+        });
+
+        _.forOwn(workflowManagerClients, (_workflowManagerClient, clientId) => {
+            delete workflowManagerClients[clientId];
+        });
+
+        _.forOwn(workflowRunClients, (_workflowRunClients, clientId) => {
+            delete workflowRunClients[clientId];
+        });
+
+        _.forOwn(allClients, (client, clientId) => {
+            client.getSocket().disconnect(true);
+            delete allClients[clientId];
+        });
+    }
+
+    module.exports = {
+        serviceEvents: serviceEvents,
+        destroy: destroy,
+        getClients: () => { return allClients; },
+        getProbeClients: () => { return probeClients; },
+        getWorkflowManagerClients: () => { return workflowManagerClients; },
+        getWorkflowRunClients: () => { return workflowRunClients; },
+        clientType: Client.Type,
+        //setIO: setIO,
+        sendEvent: sendEvent,
+        fetchEvent: fetchEvent,
+        addClient: addClient,
+        removeClient: removeClient,
+        removeClients: removeClients,
+        addWorkflow: addWorkflow,
+        listWorkflows: listWorkflows,
+        checkWorkflow: checkWorkflow,
+        removeWorkflow: removeWorkflow,
+        clearWorkflows: clearWorkflows,
+        addWorkflowRun: addWorkflowRun,
+        listWorkflowRuns: listWorkflowRuns,
+        checkWorkflowRun: checkWorkflowRun,
+        removeWorkflowRun: removeWorkflowRun,
+        clearWorkflowRuns: clearWorkflowRuns,
+        updateWorkflowRunStatus: updateWorkflowRunStatus,
+        setWorkflowRunKickstarted: setWorkflowRunKickstarted,
+    };
+})();
\ No newline at end of file
diff --git a/src/controllers/rest_probe.js b/src/controllers/rest_probe.js
new file mode 100644
index 0000000..9d67386
--- /dev/null
+++ b/src/controllers/rest_probe.js
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+    'use strict';
+
+    const express = require('express');
+    const {checkSchema, validationResult} = require('express-validator');
+    const logger = require('../config/logger.js');
+    const eventrouter = require('./eventrouter.js');
+
+    // HTTP REST interface for message intake
+    // POST method
+    // Message format:
+    // {
+    //     topic: 'topic here',
+    //     message: 'message body here'
+    // }
+    // e.g., /intake?topic=aaa&message=bbb
+    const intakeMessageInputValidator = {
+        topic: {
+            in: ['params', 'query'],
+            errorMessage: 'Message topic is null or empty',
+        },
+        message: {
+            in: ['params', 'query'],
+            errorMessage: 'Message body is null or empty',
+        }
+    };
+
+    const intakeMessage = (req, res) => {
+        let errors = validationResult(req);
+        if(!errors.isEmpty()) {
+            res.status(400).send(
+                JSON.stringify({
+                    errors: errors.array()
+                })
+            );
+            return;
+        }
+
+        let jsonMessage = req.body
+        logger.debug(`Received a message ${jsonMessage}`);
+
+        // send the message to the event distributor
+        eventrouter.sendEvent(jsonMessage.topic, jsonMessage.message);
+
+        res.status(200).send({
+            result: true
+        });
+        return;
+    };
+
+    const getRouter = () => {
+        var routerInstance = new express.Router();
+        routerInstance.use((req, res, next) => {
+            logger.info(`[REQ] ${req.method}, ${req.url}`);
+            next();
+        });
+
+        // intake apis
+        routerInstance.post('/intake', checkSchema(intakeMessageInputValidator), intakeMessage);
+        return routerInstance;
+    };
+
+    module.exports = {
+        getRouter: getRouter
+    };
+})();
\ No newline at end of file
diff --git a/src/controllers/websocket.js b/src/controllers/websocket.js
new file mode 100644
index 0000000..3f3216a
--- /dev/null
+++ b/src/controllers/websocket.js
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+(function () {
+    'use strict';
+
+    const socketio = require('socket.io');
+    const ioWildcard = require('socketio-wildcard');
+    const client = require('../types/client.js');
+    const eventrouter = require('./eventrouter.js');
+    const logger = require('../config/logger.js');
+
+    let io;
+    const createSocketIO = (server) => {
+        // INSTANTIATE SOCKET.IO
+        io = socketio.listen(server);
+        io.use(ioWildcard());
+
+        // set io to eventrouter
+        //eventrouter.setIO(io);
+
+        // LISTEN TO "CONNECTION" EVENT (FROM SOCKET.IO)
+        io.on('connection', (socket) => {
+            let query = socket.handshake.query;
+            logger.log('debug', `connect ${JSON.stringify(query)}`);
+            let added = false;
+
+            // make a client
+            let c = client.Client.fromObj(query);
+            c.setSocket(socket);
+
+            if(!c.validate()) {
+                logger.log('warn', `client validation failed - ${JSON.stringify(query)}`);
+                return;
+            }
+
+            // register the client for management
+            if(eventrouter.addClient(c)) {
+                // Send a greeting message to the client
+                socket.emit(eventrouter.serviceEvents.GREETING, {
+                    to: c.getId(),
+                    message: 'Welcome to CORD Workflow Control Service'
+                });
+
+                added = true;
+            }
+            else {
+                logger.log('warn', `client could not be added - ${JSON.stringify(query)}`);
+                socket.disconnect(true);
+            }
+
+            // set a disconnect event handler
+            socket.on('disconnect', (reason) => {
+                logger.log('debug', `disconnect ${reason} ${JSON.stringify(query)}`);
+                if(added) {
+                    eventrouter.removeClient(c.getId());
+                }
+            });
+        });
+    };
+
+    const destroySocketIO = () => {
+        io.close();
+    };
+
+    const getSocketIO = () => io;
+
+    module.exports = {
+        create: createSocketIO,
+        destroy: destroySocketIO,
+        get: getSocketIO
+    };
+
+    // USAGE
+    // const socketIo = require('./controllers/websocket.js');
+    // const socket = socketIo.get();
+    // socket.emit('eventName', data);
+
+})();
\ No newline at end of file
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
new file mode 100644
index 0000000..b3e8877
--- /dev/null
+++ b/src/controllers/ws_manager.js
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+    'use strict';
+
+    const _ = require('lodash');
+    const Workflow = require('../types/workflow.js');
+    const logger = require('../config/logger.js');
+
+    let serviceEvents = {
+        WORKFLOW_REG: 'cord.workflow.ctlsvc.workflow.reg',
+        WORKFLOW_REG_ESSENCE: 'cord.workflow.ctlsvc.workflow.reg_essence',
+        WORKFLOW_LIST: 'cord.workflow.ctlsvc.workflow.list',
+        WORKFLOW_RUN_LIST: 'cord.workflow.ctlsvc.workflow.run.list',
+        WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
+        WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
+        WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
+        WORKFLOW_RUN_REMOVE: 'cord.workflow.ctlsvc.workflow.run.remove'
+    };
+
+    // WebSocket interface for workflow registration
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.reg',
+    //     message: <workflow>
+    // }
+    const registWorkflow = (topic, message, cb) => {
+        const distributor = require('./eventrouter.js/index.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflow = message;
+
+        logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
+
+        let result = distributor.addWorkflow(workflow);
+        if(!result) {
+            errorMessage = `failed to register a workflow ${workflow.getId()}`;
+            cb(errorMessage, false);
+        }
+        else {
+            cb(null, true);
+        }
+        return;
+    };
+
+    // WebSocket interface for workflow registration (via essence)
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.reg',
+    //     message: <workflow essence>
+    // }
+    const registerWorkflowEssence = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let essence = message;
+        let result = true;
+        let errorResults = [];
+
+        logger.log('debug', `workflow essence - ${JSON.stringify(essence)}`);
+
+        let workflows = Workflow.loadWorkflowsFromEssence(essence);
+        workflows.forEach((workflow) => {
+            if(workflow) {
+                let localResult = eventrouter.addWorkflow(workflow);
+                errorResults.push(localResult);
+                result = result && localResult; // false if any of registrations fails
+            }
+        });
+
+        if(!result) {
+            errorMessage = `failed to register workflows ${errorResults}`;
+            cb(errorMessage, false);
+        }
+        else {
+            cb(null, true);
+        }
+        return;
+    };
+
+    // WebSocket interface for workflow listing
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.list',
+    //     message: null
+    // }
+    const listWorkflows = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let result = eventrouter.listWorkflows();
+        cb(null, result);
+        return;
+    };
+
+    // WebSocket interface for workflow run listing
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.list',
+    //     message: null
+    // }
+    const listWorkflowRuns = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let result = eventrouter.listWorkflowRuns();
+        cb(null, result);
+        return;
+    };
+
+    // WebSocket interface for workflow check
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.check',
+    //     message: <workflow_id>
+    // }
+    const checkWorkflow = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowId = message;
+        let result = eventrouter.checkWorkflow(workflowId);
+        cb(null, result);
+        return;
+    };
+
+    // WebSocket interface for workflow start notification
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.kickstart',
+    //     message: {
+    //          workflow_id: <workflow_id>,
+    //          workflow_run_id: <workflow_run_id>
+    //     }
+    // }
+    const notifyWorkflowStart = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_run_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowRunId = message.workflow_run_id;
+
+        // there must be a workflow matching
+        // set the workflow kickstarted
+        eventrouter.setWorkflowRunKickstarted(workflowRunId);
+        cb(null, true);
+        return;
+    }
+
+    // WebSocket interface for workflow removal
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.remove',
+    //     message: <workflow_id>
+    // }
+    const removeWorkflow = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let workflowId = message;
+        let result = eventrouter.removeWorkflow(workflowId);
+        cb(null, result);
+        return;
+    }
+
+    // WebSocket interface for workflow run removal
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.run.remove',
+    //     message: {
+    //          workflow_id: <workflow_id>,
+    //          workflow_run_id: <workflow_run_id>
+    //     }
+    // }
+    const removeWorkflowRun = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_run_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowRunId = message.workflow_run_id;
+
+        let result = eventrouter.removeWorkflowRun(workflowRunId);
+        cb(null, result);
+        return;
+    }
+
+    const getRouter = () => {
+        return {
+            registWorkflow: {
+                topic: serviceEvents.WORKFLOW_REG,
+                handler: registWorkflow
+            },
+            registerWorkflowEssence: {
+                topic: serviceEvents.WORKFLOW_REG_ESSENCE,
+                handler: registerWorkflowEssence
+            },
+            listWorkflows: {
+                topic: serviceEvents.WORKFLOW_LIST,
+                handler: listWorkflows
+            },
+            listWorkflowRuns: {
+                topic: serviceEvents.WORKFLOW_RUN_LIST,
+                handler: listWorkflowRuns
+            },
+            checkWorkflow: {
+                topic: serviceEvents.WORKFLOW_CHECK,
+                handler: checkWorkflow
+            },
+            notifyWorkflowStart: {
+                topic: serviceEvents.WORKFLOW_KICKSTART,
+                handler: notifyWorkflowStart,
+                return: false
+            },
+            removeWorkflow: {
+                topic: serviceEvents.WORKFLOW_REMOVE,
+                handler: removeWorkflow
+            },
+            removeWorkflowRun: {
+                topic: serviceEvents.WORKFLOW_RUN_REMOVE,
+                handler: removeWorkflowRun
+            }
+        };
+    };
+
+    // out-going commands
+    const kickstartWorkflow = (workflowId, workflowRunId) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let clients = eventrouter.getWorkflowManagerClients();
+        _.forOwn(clients, (client, _clientId) => {
+            let socket = client.getSocket();
+            if(socket) {
+                socket.emit(serviceEvents.WORKFLOW_KICKSTART, {
+                    workflow_id: workflowId,
+                    workflow_run_id: workflowRunId
+                });
+            }
+        });
+        return;
+    };
+
+    module.exports = {
+        serviceEvents: serviceEvents,
+        getRouter: getRouter,
+        kickstartWorkflow: kickstartWorkflow
+    };
+})();
\ No newline at end of file
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
new file mode 100644
index 0000000..f7ee474
--- /dev/null
+++ b/src/controllers/ws_workflowrun.js
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+    'use strict';
+
+    const logger = require('../config/logger.js');
+
+    let serviceEvents = {
+        WORKFLOW_RUN_UPDATE_STATUS: 'cord.workflow.ctlsvc.workflow.run.status',
+        WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch'
+    };
+
+    // WebSocket interface for workflow status update
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflowrun.status',
+    //     message: {
+    //          workflow_id: <workflow_id>,
+    //          workflow_run_id: <workflow_run_id>,
+    //          task_id: <task_id>,
+    //          status: 'begin' or 'end'
+    //     }
+    // }
+    const updateWorkflowRunStatus = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `workflow_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_run_id' in message)) {
+            // error
+            errorMessage = `workflow_run_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('task_id' in message)) {
+            // error
+            errorMessage = `task_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('status' in message)) {
+            // error
+            errorMessage = `status field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let result = eventrouter.updateWorkflowRunStatus(
+            message.workflow_id,
+            message.workflow_run_id,
+            message.task_id,
+            message.status.toLowerCase()
+        );
+        cb(null, result);
+        return;
+    };
+
+    // WebSocket interface for workflow status update
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflowrun.fetch',
+    //     message: {
+    //          workflow_id: <workflow_id>,
+    //          workflow_run_id: <workflow_run_id>,
+    //          task_id: <task_id>,
+    //          topic: <expected topic>
+    //     }
+    // }
+    const fetchEvent = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `workflow_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_run_id' in message)) {
+            // error
+            errorMessage = `workflow_run_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('task_id' in message)) {
+            // error
+            errorMessage = `task_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('topic' in message)) {
+            // error
+            errorMessage = `topic field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let result = eventrouter.fetchEvent(
+            message.workflow_run_id,
+            message.task_id,
+            message.topic
+        );
+        if(result) {
+            // empty object {} when no message
+            cb(null, result);
+        }
+        else {
+            cb(
+                `could not fetch event ${message.topic} from workflow run ${message.workflow_run_id}`,
+                null
+            );
+        }
+        return;
+    };
+
+    const getRouter = () => {
+        return {
+            updateWorkflowRunStatus: {
+                topic: serviceEvents.WORKFLOW_RUN_UPDATE_STATUS,
+                handler: updateWorkflowRunStatus
+            },
+            fetchEvent: {
+                topic: serviceEvents.WORKFLOW_RUN_FETCH_EVENT,
+                handler: fetchEvent
+            }
+        };
+    };
+
+    module.exports = {
+        serviceEvents: serviceEvents,
+        getRouter: getRouter
+    };
+})();
\ No newline at end of file
diff --git a/src/server.js b/src/server.js
new file mode 100644
index 0000000..6f70d3a
--- /dev/null
+++ b/src/server.js
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+    'use strict';
+
+    const express = require('express');
+    const config = require('./config/config.js').service;
+    const bodyParser = require('body-parser');
+    const cors = require('cors');
+    const socketIo = require('./controllers/websocket.js');
+    const eventrouter = require('./controllers/eventrouter.js');
+    const workflowLoader = require('./workflows/loader.js');
+    const logger = require('./config/logger.js');
+    const rest_probe = require('./controllers/rest_probe.js');
+
+    const app = express();
+
+    // Apply middlewares
+    app.use(cors());
+    app.use(bodyParser.json());
+
+    // Set a router for intake interface
+    app.use('/', rest_probe.getRouter());
+
+    const startServer = (port) => {
+        // if is running just return it
+        if(app.server) {
+            return app.server;
+        }
+
+        const server =  app.listen(port || config.port, () => {
+            logger.info(`Express is listening to http://localhost:${port || config.port}`);
+
+            // once server is ready setup WebSocket
+            socketIo.create(server);
+
+            // load built-in workflows
+            let workflows = workflowLoader.loadAllWorkflows();
+            for(let workflow in workflows) {
+                eventrouter.addWorkflow(workflow);
+            }
+        });
+        app.server = server;
+        return server;
+    };
+
+    const stopServer = () => {
+        if(app.server) {
+            socketIo.destroy();
+            app.server.close();
+            app.server = undefined;
+            eventrouter.destroy();
+        }
+    }
+
+    if(!module.parent) {
+        startServer();
+    }
+
+    module.exports = {
+        serviceEvents: eventrouter.serviceEvents,
+        app: app,
+        start: startServer,
+        stop: stopServer
+    };
+})();
\ No newline at end of file
diff --git a/src/types/client.js b/src/types/client.js
new file mode 100644
index 0000000..02e8eb6
--- /dev/null
+++ b/src/types/client.js
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+    'use strict';
+
+    const _ = require('lodash');
+    const logger = require('../config/logger.js');
+
+    const ClientType = {
+        PROBE: 'probe',
+        WORKFLOW_MANAGER: 'workflow_manager',
+        WORKFLOW_RUN: 'workflow_run',
+        UNKNOWN: 'unknown'
+    };
+
+    class Client {
+        constructor(id) {
+            this.id = id.toLowerCase();
+            // a field value can be one of followings
+            // - probe : message publisher
+            // - workflow_manager : workflow manager
+            // - workflow_run : workflow run
+            this.type = ClientType.UNKNOWN;
+            // used by workflow_run
+            this.workflowId = null;
+            this.workflowRunId = null;
+            this.socket = null;
+            // optional info.
+            this.params = {};
+        }
+
+        static parseClientType(strClientType) {
+            if(!strClientType) {
+                return ClientType.UNKNOWN;
+            }
+            else if(['probe', 'prb'].includes(strClientType.toLowerCase())) {
+                return ClientType.PROBE;
+            }
+            else if(['workflow_manager', 'manager'].includes(strClientType.toLowerCase())) {
+                return ClientType.WORKFLOW_MANAGER;
+            }
+            else if(['workflow_run', 'run'].includes(strClientType.toLowerCase())) {
+                return ClientType.WORKFLOW_RUN;
+            }
+            else {
+                return ClientType.UNKNOWN;
+            }
+        }
+
+        static fromObj(obj) {
+            if(obj) {
+                let client;
+                if('id' in obj) {
+                    client = new Client(obj['id']);
+                }
+                else {
+                    logger.log('error', 'id is not given');
+                    return null;
+                }
+
+                if('type' in obj) {
+                    client.setType(obj.type);
+                }
+
+                if('workflow_id' in obj) {
+                    client.setWorkflowId(obj.workflow_id);
+                }
+
+                if('workflow_run_id' in obj) {
+                    client.setWorkflowRunId(obj.workflow_run_id);
+                }
+    
+                if('socket' in obj) {
+                    client.setSocket(obj.socket);
+                }
+    
+                // all others are sent to params
+                client.params = {};
+                _.forOwn(obj, (val, key) => {
+                    client.params[key] = val;
+                });
+                return client;
+            }
+            else {
+                return null;
+            }
+        }
+
+        setId(id) {
+            this.id = id.toLowerCase();
+        }
+
+        getId() {
+            return this.id;
+        }
+
+        setType(type) {
+            let clientType = Client.parseClientType(type);
+            this.type = clientType;
+        }
+
+        getType() {
+            return this.type;
+        }
+
+        setWorkflowId(id) {
+            this.workflowId = id;
+        }
+
+        getWorkflowId() {
+            return this.workflowId;
+        }
+
+        setWorkflowRunId(id) {
+            this.workflowRunId = id;
+        }
+
+        getWorkflowRunId() {
+            return this.workflowRunId;
+        }
+
+        setParams(params={}) {
+            this.params = params;
+        }
+
+        getParams() {
+            return this.params;
+        }
+
+        setSocket(socket) {
+            this.socket = socket;
+        }
+
+        getSocket() {
+            return this.socket;
+        }
+
+        validate() {
+            // id field is required for all types of clients
+            if(!this.id) {
+                logger.log('error', 'id is not given');
+                return false;
+            }
+    
+            if(this.type === ClientType.UNKNOWN) {
+                logger.log('error', 'type is not given properly');
+                return false;
+            }
+    
+            if(this.type === ClientType.WORKFLOW_RUN) {
+                if(!this.workflowId) {
+                    logger.log('error', 'workflowId is not given');
+                    return false;
+                }
+
+                if(!this.workflowRunId) {
+                    logger.log('error', 'workflowRunId is not given');
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+    
+    module.exports = {
+        Type: ClientType,
+        Client: Client
+    };
+})();
\ No newline at end of file
diff --git a/src/types/workflow.js b/src/types/workflow.js
new file mode 100644
index 0000000..11ace2f
--- /dev/null
+++ b/src/types/workflow.js
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+    'use strict';
+
+    const _ = require('lodash');
+    const WorkflowTask = require('./workflowtask.js');
+    const logger = require('../config/logger.js');
+
+    const loadWorkflowsFromEssence = (essence) => {
+        // an essence can have multiple workflows
+        let workflows = [];
+        _.forOwn(essence, (workflowEssence, _workflowId) => {
+            let workflow = Workflow.fromEssence(workflowEssence);
+            if(workflow) {
+                workflows.push(workflow);
+            }
+        });
+        return workflows;
+    };
+
+    class Workflow {
+        constructor(id) {
+            // dag_id
+            this.id = id;
+
+            // key: topic
+            // value: an array of WorkflowTask objects
+            this.topics = {};
+
+            // key: task id
+            // value: WorkflowTask object
+            this.tasks = {};
+
+            // preserve raw essense
+            this.essence = {};
+        }
+
+        static fromEssence(essence) {
+            if(essence) {
+                let workflow;
+                if('dag' in essence) {
+                    let dag = essence.dag;
+                    if('dag_id' in dag) {
+                        workflow = new Workflow(dag.dag_id);
+                    }
+                    else {
+                        logger.log('error', 'dag is not given');
+                        return null;
+                    }
+                }
+                else {
+                    logger.log('error', 'dag is not given');
+                    return null;
+                }
+
+                // read this to detect kickstart events
+                // use map for fast look up
+                let headTasks = {};
+                if('dependencies' in essence) {
+                    let dependencies = essence.dependencies;
+                    _.forOwn(dependencies, (dependency, taskId) => {
+                        // if the task does not have parents, it means the head task.
+                        if(!('parents' in dependency)) {
+                            // kickstart task
+                            headTasks[taskId] = true;
+                        }
+                        else {
+                            if(!dependency['parents'] || dependency['parents'].length === 0) {
+                                // empty array
+                                // kickstart task
+                                headTasks[taskId] = true;
+                            }
+                        }
+                    });
+                }
+
+                if('tasks' in essence) {
+                    let tasks = essence.tasks;
+                    _.forOwn(tasks, (taskEssence, _taskId) => {
+                        let task = WorkflowTask.WorkflowTask.fromEssence(taskEssence);
+
+                        // if its in head tasks, it has a kickstart event.
+                        if(task.getId() in headTasks) {
+                            task.setKickstart(true);
+                        }
+
+                        workflow.addTask(task);
+                    });
+                }
+
+                workflow.essence = essence;
+                return workflow;
+            }
+            return undefined;
+        }
+
+        setId(id) {
+            this.id = id;
+        }
+
+        getId() {
+            return this.id;
+        }
+
+        getTopics() {
+            let allTopics = [];
+            _.forOwn(this.topics, (_tasks, topic) => {
+                // value is an array
+                if(!allTopics.includes(topic)) {
+                    allTopics.push(topic);
+                }
+            });
+            return allTopics;
+        }
+
+        getTasksForTopic(topic) {
+            if(topic in this.topics) {
+                let workflowTasks = this.topics[topic];
+                return workflowTasks;
+            }
+            return undefined;
+        }
+
+        hasTasksForTopic(topic) {
+            if(topic in this.topics) {
+                return true;
+            }
+            return false;
+        }
+
+        getTasks() {
+            return this.tasks;
+        }
+
+        getTask(id) {
+            if(id in this.tasks) {
+                let workflowTask = this.tasks[id];
+                return workflowTask;
+            }
+            return undefined;
+        }
+
+        getKickstartTopics() {
+            let kickstartTopics = [];
+            _.forOwn(this.tasks, (task, _taskId) => {
+                if(task.isKickstart()) {
+                    let topic = task.getTopic();
+                    if(!kickstartTopics.includes(topic)) {
+                        kickstartTopics.push(topic);
+                    }
+                }
+            });
+            return kickstartTopics;
+        }
+
+        isKickstartTopic(topic) {
+            let kickstartTopics = this.getKickstartTopics();
+            if(kickstartTopics.includes(topic)) {
+                return true;
+            }
+            return false;
+        }
+
+        addTask(task) {
+            let taskId = task.getId();
+            if(taskId in this.tasks) {
+                logger.log('warn', `there exists a task with the same id - ${JSON.stringify(task)}`);
+                return false;
+            }
+
+            this.tasks[taskId] = task;
+
+            let taskTopic = task.getTopic();
+            if(taskTopic in this.topics) {
+                this.topics[taskTopic].push(task);
+            }
+            else {
+                this.topics[taskTopic] = [task];
+            }
+            return true;
+        }
+
+        setEssence(essence) {
+            this.essence = essence;
+        }
+
+        getEssence() {
+            return this.essence;
+        }
+
+        validate() {
+            if(!this.id) {
+                logger.log('error', 'id is not given');
+                return false;
+            }
+
+            if(!this.tasks || Object.keys(this.tasks).length > 0) {
+                logger.log('error', 'task is not given');
+                return false;
+            }
+
+            let countKickstartEvent = 0;
+            _.forOwn(this.tasks, (task, _taskId) => {
+                if(task.isKickstart()) {
+                    countKickstartEvent++;
+                }
+            });
+
+            if(countKickstartEvent <= 0) {
+                logger.log('error', 'kickstart event is not given');
+                return false;
+            }
+            return true;
+        }
+    }
+
+    module.exports = {
+        Workflow: Workflow,
+        loadWorkflowsFromEssence: loadWorkflowsFromEssence
+    };
+})();
\ No newline at end of file
diff --git a/src/types/workflowrun.js b/src/types/workflowrun.js
new file mode 100644
index 0000000..65027ff
--- /dev/null
+++ b/src/types/workflowrun.js
@@ -0,0 +1,416 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+    'use strict';
+
+    const _ = require('lodash');
+    const dateformat = require('dateformat');
+    const WorkflowRunTask = require('./workflowruntask.js');
+    const logger = require('../config/logger.js');
+
+    class WorkflowRun {
+        constructor(workflowId, workflowRunId) {
+            // workflow run id (dag_run_id)
+            this.id = workflowRunId;
+            // workflow id
+            this.workflowId = workflowId;
+
+            // workflow run tasks - for storing status
+            // id: task id
+            // value : workflow run task obj
+            this.runTasks = {};
+
+            // storing key-field, key-value pairs for <event, workflow run> mapping
+            // key: topic
+            // value: [{
+            //      field:
+            //      value:
+            // }, ...]
+            this.eventKeyFieldValues = {};
+
+            // client ids
+            this.clientIds = [];
+
+            // event queue
+            // {
+            //      topic: topic,
+            //      message: message
+            // }
+            this.eventQueue = [];
+            // trash bins
+            // dequeued events are sent to this queue
+            // for debugging
+            this.trashEventQueue = [];
+
+            this.kickstarted = false;
+            this.finished = false;
+        }
+
+        static makeWorkflowRunId(workflowId) {
+            let now = new Date();
+            let datetimestr = dateformat(now, 'yyyymmdd_HHMMssl');
+            return `${workflowId}_${datetimestr}`;
+        }
+
+        static makeNewRun(workflow) {
+            let workflowId = workflow.getId();
+            let workflowRunId = WorkflowRun.makeWorkflowRunId(workflowId);
+            let workflowRun = new WorkflowRun(workflowId, workflowRunId);
+
+            let tasks = workflow.getTasks();
+            _.forOwn(tasks, (task, taskId) => {
+                // set run tasks
+                let runTask = new WorkflowRunTask.WorkflowRunTask(taskId);
+                workflowRun.addRunTask(runTask);
+
+                // set key_field / value
+                workflowRun.setEventKeyFieldValue(task.getTopic(), task.getKeyField(), null); // init
+            });
+            return workflowRun;
+        }
+
+        setId(id) {
+            this.id = id;
+        }
+
+        getId() {
+            return this.id;
+        }
+
+        setWorkflowId(workflowId) {
+            this.workflowId = workflowId;
+        }
+
+        getWorkflowId() {
+            return this.workflowId;
+        }
+
+        addRunTask(runTask) {
+            this.runTasks[runTask.getTaskId()] = runTask;
+        }
+
+        getRunTask(taskId) {
+            if(taskId in this.runTasks) {
+                return this.runTasks[taskId];
+            }
+            return undefined;
+        }
+
+        getTaskStatus(taskId) {
+            return this.runTasks[taskId].getStatus();
+        }
+
+        updateTaskStatus(taskId, status) {
+            let runTask = this.runTasks[taskId].getStatus();
+            runTask.setStatus(status);
+        }
+
+        setEventKeyFieldValue(topic, field, value=null) {
+            let keyFieldValues;
+            if(!(topic in this.eventKeyFieldValues)) {
+                keyFieldValues = [];
+                // put a new empty array
+                this.eventKeyFieldValues[topic] = keyFieldValues;
+            }
+            else {
+                keyFieldValues = this.eventKeyFieldValues[topic];
+            }
+
+            let index = _.findIndex(keyFieldValues, (keyFieldValue) => {
+                return keyFieldValue.field === field;
+            });
+
+            if(index >= 0) {
+                // update
+                keyFieldValues[index] = {
+                    field: field,
+                    value: value
+                };
+            }
+            else {
+                // push a new
+                keyFieldValues.push({
+                    field: field,
+                    value: value
+                });
+            }
+            return true;
+        }
+
+        isEventKeyFieldValueAcceptable(topic, field, value) {
+            if(!(topic in this.eventKeyFieldValues)) {
+                // topic does not exist
+                return false;
+            }
+
+            let keyFieldValues = this.eventKeyFieldValues[topic];
+            let index = _.findIndex(keyFieldValues, (keyFieldValue) => {
+                return (keyFieldValue.field === field) &&
+                    ((!keyFieldValue.value) || (keyFieldValue.value === value));
+            });
+
+            if(index >= 0) {
+                return true;
+            }
+            return false;
+        }
+
+        isEventAcceptableByKeyFieldValue(topic, message) {
+            if(!(topic in this.eventKeyFieldValues)) {
+                // topic does not exist
+                return false;
+            }
+
+            let keyFieldValues = this.eventKeyFieldValues[topic];
+            keyFieldValues.forEach((keyFieldValue) => {
+                if(keyFieldValue.field in message) {
+                    // has same field in the message
+                    // check value
+                    if(keyFieldValue.value === message[keyFieldValue.field]) {
+                        // has the same value
+                        return true;
+                    }
+                }
+            });
+            return false;
+        }
+
+        getFilteredRunTasks(includes, excludes) {
+            // returns tasks with filters
+            let includeStatuses=[];
+            let excludeStatuses=[];
+            let includeAll = false;
+
+            if(includes) {
+                if(Array.isArray(includes)) {
+                    // array
+                    includes.forEach((include) => {
+                        if(!includeStatuses.includes(include)) {
+                            includeStatuses.push(include);
+                        }
+                    });
+                }
+                else {
+                    includeStatuses.push(includes);
+                }
+            }
+            else {
+                // undefined or null
+                // include all
+                includeAll = true;
+            }
+
+            if(excludes) {
+                if(Array.isArray(excludes)) {
+                    // array
+                    excludes.forEach((exclude) => {
+                        if(!excludeStatuses.includes(exclude)) {
+                            excludeStatuses.push(exclude);
+                        }
+                    });
+                }
+                else {
+                    excludeStatuses.push(excludes);
+                }
+            }
+            else {
+                // in this case, nothing will be excluded
+                // leave the array empty
+            }
+
+            let filteredRunTasks = [];
+            _.forOwn(this.runTasks, (runTask, _runTaskId) => {
+                // 'excludes' has a higher priority than 'includes'
+                if(!excludes.includes(runTask.getStatus())) {
+                    if(includeAll || includes.includes(runTask.getStatus())) {
+                        // screen tasks that are not finished
+                        filteredRunTasks.push(runTask);
+                    }
+                }
+            });
+            return filteredRunTasks;
+        }
+
+        getFilteredTopics(workflow, includes, excludes) {
+            // returns topics with filters
+            let filteredRunTasks = this.getFilteredRunTasks(includes, excludes);
+            let filteredTopics = [];
+
+            filteredRunTasks.forEach((runTask) => {
+                let taskId = runTask.getTaskId();
+                let task = workflow.getTask(taskId);
+                let topic = task.getTopic();
+                if(!filteredTopics.includes(topic)) {
+                    filteredTopics.push(topic);
+                }
+            });
+            return filteredTopics;
+        }
+
+        getAllTopics(workflow) {
+            return this.getFilteredTopics(workflow, null, null);
+        }
+
+        getAcceptableTopics(workflow) {
+            // return topics for tasks that are running or to be run in the future
+            // include all tasks that are not ended
+            return this.getFilteredTopics(workflow, null, [WorkflowRunTask.TaskStatus.END]);
+        }
+
+        isTopicAcceptable(workflow, topic) {
+            // get topics of tasks that are not completed yet
+            let filteredTopics = this.getFilteredTopics(
+                workflow,
+                null,
+                [WorkflowRunTask.TaskStatus.END]
+            );
+
+            if(filteredTopics.includes(topic)) {
+                return true;
+            }
+            else {
+                return false;
+            }
+        }
+
+        isEventAcceptable(workflow, topic, message) {
+            // event is acceptable if it meets following criteria
+            // 1) the workflow is currently interested in the same topic
+            //      (finished tasks are not counted)
+            // 2) the task's key field and value
+            if(this.isTopicAcceptable(workflow, topic) &&
+                this.isEventAcceptableByKeyFieldValue(topic, message)) {
+                return true;
+            }
+            return false;
+        }
+
+        addClientId(clientId) {
+            if(!this.clientIds.includes(clientId)) {
+                this.clientIds.push(clientId);
+            }
+        }
+
+        removeClientId(clientId) {
+            _.pull(this.clientIds, clientId);
+        }
+
+        getClientIds() {
+            return this.clientIds;
+        }
+
+        enqueueEvent(topic, message) {
+            this.eventQueue.push({
+                topic: topic,
+                message: message
+            });
+        }
+
+        peekEvent() {
+            // if the queue is empty, this returns undefined
+            if(this.eventQueue.length > 0) {
+                return this.eventQueue[0];
+            }
+            return undefined;
+        }
+
+        dequeueEvent() {
+            // if the queue is empty, this returns undefined
+            if(this.eventQueue.length > 0) {
+                let events = _.pullAt(this.eventQueue, [0]);
+
+                // move to trash
+                this.trashEventQueue.push(events[0]);
+                return events[0];
+            }
+            return undefined;
+        }
+
+        peekEventByTopic(topic) {
+            // if the queue is empty, this returns undefined
+            let index = _.findIndex(this.eventQueue, (event) => {
+                return event.topic === topic;
+            });
+
+            if(index >= 0) {
+                return this.eventQueue[index];
+            }
+            return undefined;
+        }
+
+        dequeueEventByTopic(topic) {
+            // find event by topic.
+            // returns only first item in the queue
+            // if the queue is empty, this returns undefined
+            let index = _.findIndex(this.eventQueue, (event) => {
+                return event.topic === topic;
+            });
+
+            if(index >= 0) {
+                let events = _.pullAt(this.eventQueue, [index]);
+
+                // move to trash
+                this.trashEventQueue.push(events[0]);
+                return events[0];
+            }
+            return undefined;
+        }
+
+        getTrashEvents() {
+            return this.trashEventQueue;
+        }
+
+        lengthEventQueue() {
+            return this.eventQueue.length;
+        }
+
+        setKickstarted() {
+            this.kickstarted = true;
+        }
+
+        isKickstarted() {
+            return this.kickstarted;
+        }
+
+        setFinished() {
+            this.finished = true;
+        }
+
+        isFinished() {
+            return this.finished;
+        }
+
+        validate() {
+            if(!this.id) {
+                logger.log('error', 'id is not given');
+                return false;
+            }
+
+            if(!this.workflowId) {
+                logger.log('error', 'workflowId is not given');
+                return false;
+            }
+
+            return true;
+        }
+    }
+
+    module.exports = {
+        WorkflowRun: WorkflowRun
+    };
+})();
\ No newline at end of file
diff --git a/src/types/workflowruntask.js b/src/types/workflowruntask.js
new file mode 100644
index 0000000..fbaa604
--- /dev/null
+++ b/src/types/workflowruntask.js
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+    'use strict';
+
+    const logger = require('../config/logger.js');
+
+    const TaskStatus = {
+        INIT: 'init',
+        BEGIN: 'begin',
+        END: 'end',
+        UNKNOWN: 'unknown'
+    };
+
+    class WorkflowRunTask {
+        constructor(taskId) {
+            this.taskId = taskId;
+            this.status = TaskStatus.UNKNOWN;
+        }
+
+        static parseStatus(strTaskStatus) {
+            if(!strTaskStatus) {
+                return TaskStatus.UNKNOWN;
+            }
+            else if(['i', 'init'].includes(strTaskStatus.toLowerCase())) {
+                return TaskStatus.END;
+            }
+            else if(['b', 'begin', 'start'].includes(strTaskStatus.toLowerCase())) {
+                return TaskStatus.BEGIN;
+            }
+            else if(['e', 'end', 'finish'].includes(strTaskStatus.toLowerCase())) {
+                return TaskStatus.END;
+            }
+            else {
+                return TaskStatus.UNKNOWN;
+            }
+        }
+
+        setTaskId(id) {
+            this.taskId = id;
+        }
+
+        getTaskId() {
+            return this.taskId;
+        }
+
+        setStatus(status) {
+            let taskStatus = WorkflowRunTask.parseStatus(status);
+            this.status = taskStatus;
+        }
+
+        getStatus() {
+            return this.status;
+        }
+
+        validate() {
+            if(!this.taskId) {
+                logger.log('error', 'id is not given');
+                return false;
+            }
+
+            if(!this.status) {
+                logger.log('error', 'status is not given');
+                return false;
+            }
+    
+            return true;
+        }
+    }
+    
+    module.exports = {
+        TaskStatus: TaskStatus,
+        WorkflowRunTask: WorkflowRunTask
+    };
+})();
\ No newline at end of file
diff --git a/src/types/workflowtask.js b/src/types/workflowtask.js
new file mode 100644
index 0000000..efde42b
--- /dev/null
+++ b/src/types/workflowtask.js
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+    'use strict';
+
+    const logger = require('../config/logger.js');
+
+    class WorkflowTask {
+        constructor(id, kickstart=false) {
+            this.id = id;
+            this.topic = null;
+            this.kickstart = kickstart;
+            this.keyField = null;
+            this.essence = {};
+        }
+
+        static fromEssence(essence) {
+            if(essence) {
+                let workflowTask;
+                if('task_id' in essence) {
+                    workflowTask = new WorkflowTask(essence.task_id);
+                }
+                else {
+                    logger.log('error', 'task_id is not given');
+                    return null;
+                }
+
+                if('topic' in essence) {
+                    workflowTask.setTopic(essence.topic);
+                }
+
+                if('model_name' in essence) {
+                    workflowTask.setTopic('datamodel.' + essence.model_name + '.create');
+                    workflowTask.setTopic('datamodel.' + essence.model_name + '.update');
+                    workflowTask.setTopic('datamodel.' + essence.model_name + '.delete');
+                }
+
+                if('key_field' in essence) {
+                    workflowTask.setKeyField(essence.key_field);
+                }
+    
+                workflowTask.setEssence(essence);
+                return workflowTask;
+            }
+            else {
+                return null;
+            }
+        }
+
+        setId(id) {
+            this.id = id;
+        }
+
+        getId() {
+            return this.id;
+        }
+
+        setTopic(topic) {
+            this.topic = topic;
+        }
+
+        getTopic() {
+            return this.topic;
+        }
+
+        setKickstart(kickstart=false) {
+            this.kickstart = kickstart;
+        }
+
+        isKickstart() {
+            return this.kickstart;
+        }
+
+        setKeyField(keyField) {
+            this.keyField = keyField;
+        }
+        
+        getKeyField() {
+            return this.keyField;
+        }
+
+        setEssence(essence) {
+            this.essence = essence;
+        }
+
+        getEssence() {
+            return this.essence;
+        }
+
+        validate() {
+            if(!this.id) {
+                logger.log('error', 'id is not given');
+                return false;
+            }
+
+            // general Airflow operators other than XOS operators don't have these fields.
+            // 
+            // if(!this.topic) {
+            //     logger.log('error', 'topic is not given');
+            //     return false;
+            // }
+    
+            // if(!this.keyField) {
+            //     logger.log('error', 'keyField is not given');
+            //     return false;
+            // }
+    
+            return true;
+        }
+    }
+    
+    module.exports = {
+        WorkflowTask: WorkflowTask
+    };
+})();
\ No newline at end of file
diff --git a/src/workflows/hello_workflow.json b/src/workflows/hello_workflow.json
new file mode 100644
index 0000000..ebf94af
--- /dev/null
+++ b/src/workflows/hello_workflow.json
@@ -0,0 +1,24 @@
+{
+    "hello_workflow": {
+        "dag": {
+            "dag_id": "hello_workflow",
+            "local_variable": "dag_hello"
+        },
+        "dependencies": {
+            "onu_event_handler": {}
+        },
+        "tasks": {
+            "onu_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_hello",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "ONU_event",
+                "task_id": "onu_event_handler",
+                "topic": "onu.events"
+            }
+        }
+    }
+}
diff --git a/src/workflows/loader.js b/src/workflows/loader.js
new file mode 100644
index 0000000..a7bc275
--- /dev/null
+++ b/src/workflows/loader.js
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+(function () {
+    'use strict';
+
+    const path = require('path');
+    const fs = require('fs');
+    const _ = require('lodash');
+    const Workflow = require('../types/workflow.js');
+    const logger = require('../config/logger.js');
+
+    const loadEssence = (essenceFilename, absPath=false) => {
+        let filepath;
+        if(!absPath) {
+            filepath = path.join(__dirname, essenceFilename);
+        }
+        else {
+            filepath = essenceFilename;
+        }
+
+        try {
+            if (fs.existsSync(filepath)) {
+                logger.log('debug', `Loading an essence - ${filepath}`);
+                let rawdata = fs.readFileSync(filepath);
+                let essence = null;
+                try {
+                    essence = JSON.parse(rawdata);
+                }
+                catch (objError) {
+                    if (objError instanceof SyntaxError) {
+                        logger.log('warn', `failed to parse a json data (syntax error) - ${rawdata}`);
+                    }
+                    else {
+                        logger.log('warn', `failed to parse a json data - ${rawdata}`);
+                    }
+                }
+                return essence;
+            }
+            else {
+                logger.log('warn', `No ${filepath} found`);
+                return null;
+            }
+        }
+        catch(e) {
+            logger.log('warn', `Cannot read ${filepath} - ${e}`);
+            return null;
+        }
+    };
+
+    const loadWorkflows = (essenceFilename) => {
+        let filepath = path.join(__dirname, essenceFilename);
+
+        try {
+            if (fs.existsSync(filepath)) {
+                logger.log('debug', `Loading an essence - ${filepath}`);
+                let rawdata = fs.readFileSync(filepath);
+                let workflows = [];
+
+                try {
+                    let essence = JSON.parse(rawdata);
+
+                    // an essence can have multiple workflows
+                    _.forOwn(essence, (workflowEssence, workflowId) => {
+                        let workflow = Workflow.Workflow.fromEssence(workflowEssence);
+                        if(workflow) {
+                            workflows.push(workflow);
+                            logger.log('debug', `Loaded workflow: ${workflowId}`);
+                        }
+                    });
+                }
+                catch (objError) {
+                    if (objError instanceof SyntaxError) {
+                        logger.log('warn', `failed to parse a json data (syntax error) - ${rawdata}`);
+                    }
+                    else {
+                        logger.log('warn', `failed to parse a json data - ${rawdata}`);
+                    }
+                }
+
+                return workflows
+            }
+            else {
+                logger.log('warn', `No ${filepath} found`);
+                return null;
+            }
+        }
+        catch(e) {
+            logger.log('warn', `Cannot read ${filepath} - ${e}`);
+            return null;
+        }
+    };
+
+    const loadAllWorkflows = () => {
+        let dirpath = __dirname;
+
+        let allWorkflows = [];
+        let dirEntries = fs.readdirSync(dirpath);
+        dirEntries.forEach((dirEntry) => {
+            if(dirEntry.endsWith('.json')) {
+                // found workflow essence file in json format
+                let workflows = loadWorkflows(dirEntry);
+                if(workflows) {
+                    for(let workflow in workflows) {
+                        allWorkflows.push[workflow];
+                    }
+                }
+            }
+        });
+        return allWorkflows;
+    };
+
+    module.exports = {
+        loadEssence: loadEssence,
+        loadAllWorkflows: loadAllWorkflows,
+        loadWorkflows: loadWorkflows
+    };
+})();
\ No newline at end of file