Implement basic functionalities for workflow control.
- Manage join/leave of clients
- All clients communicate via socket.io
- Probes emit events
- Managers register workflows (by using a workflow essence)
- Send kickstart request to Managers to launch workflows
- Route events to workflow runs
- Queue events to not lose events between workflow tasks
- Fixed some issues found while working on testcases
- Set to perform coverage and unittest and generate outputs to files

Change-Id: I678723edc20df9247d63a4bf6380785ab8b2b221
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
new file mode 100644
index 0000000..82d27db
--- /dev/null
+++ b/src/controllers/eventrouter.js
@@ -0,0 +1,512 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+    'use strict';
+
+    const _ = require('lodash');
+    const logger = require('../config/logger.js');
+    const Client = require('../types/client.js');
+    const WorkflowRun = require('../types/workflowrun.js');
+    const ws_manager = require('./ws_manager.js');
+    const ws_workflowrun = require('./ws_workflowrun.js');
+
+    let allClients = {}; // has publishers and subscribers
+    let probeClients = {}; // a subset of clients
+    let workflowManagerClients = {}; // a subset of clients
+    let workflowRunClients = {}; // a subset of clients
+
+    //let io;
+
+    // key: workflow id
+    // value: Workflow instance
+    let workflows = {};
+
+    // key: workflow run id
+    // value: WorkflowRun instance
+    let workflowRuns = {};
+
+    let serviceEvents = {
+        GREETING: 'cord.workflow.ctlsvc.greeting'
+    };
+
+    // add ws_mgroperation events
+    _.forOwn(ws_manager.serviceEvents, (wsServiceEvent, key) => {
+        serviceEvents[key] = wsServiceEvent;
+    });
+
+    // add ws_runoperation events
+    _.forOwn(ws_workflowrun.serviceEvents, (wsServiceEvent, key) => {
+        serviceEvents[key] = wsServiceEvent;
+    });
+
+    //const setIO = (ioInstance) => {
+    //    io = ioInstance;
+    //};
+
+    const destroy = () => {
+        removeClients();
+        clearWorkflowRuns();
+        clearWorkflows();
+    };
+
+    const listWorkflows = () => {
+        let workflowList = [];
+        _.forOwn(workflows, (_workflow, workflowId) => {
+            workflowList.push(workflowId);
+        });
+        return workflowList;
+    };
+
+    const checkWorkflow = (workflowId) => {
+        if(workflowId in workflows) {
+            return true;
+        }
+        return false;
+    };
+
+    const addWorkflow = (workflow) => {
+        if(workflow.getId() in workflows) {
+            logger.log('error', `there exists a workflow with the same id - ${workflow.getId()}`);
+            return false;
+        }
+
+        let workflowId = workflow.getId();
+        workflows[workflowId] = workflow;
+        return true;
+    };
+
+    const clearWorkflows = () => {
+        _.forOwn(workflows, (_workflow, workflowId) => {
+            delete workflows[workflowId];
+        });
+    };
+
+    const listWorkflowRuns = () => {
+        let workflowRunList = [];
+        _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
+            workflowRunList.push(workflowRunId);
+        });
+        return workflowRunList;
+    };
+
+    const checkWorkflowRun = (workflowRunId) => {
+        if(workflowRunId in workflowRuns) {
+            return true;
+        }
+        return false;
+    };
+
+    const addWorkflowRun = (workflowRun) => {
+        let workflowId = workflowRun.getWorkflowId();
+        let workflowRunId = workflowRun.getId();
+
+        if(workflowRunId in workflowRuns) {
+            logger.log('warn', `there exists a workflow run with the same id - ${workflowRunId}`);
+            return false;
+        }
+
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
+            return false;
+        }
+
+        workflowRuns[workflowRunId] = workflowRun;
+        return true;
+    };
+
+    const clearWorkflowRuns = () => {
+        _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
+            delete workflowRuns[workflowRunId];
+        });
+    };
+
+    const updateWorkflowRunStatus = (workflowRunId, taskId, status) => {
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+            return false;
+        }
+
+        let workflowRun = workflowRuns[workflowRunId];
+        workflowRun.updateTaskStatus(taskId, status);
+        return true;
+    };
+
+    const setWorkflowRunKickstarted = (workflowRunId) => {
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+            return false;
+        }
+
+        let workflowRun = workflowRuns[workflowRunId];
+        workflowRun.setKickstarted();
+        return true;
+    };
+
+    const kickstart = (workflowId, workflowRunId) => {
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+            return false;
+        }
+
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+            return false;
+        }
+
+        ws_manager.kickstartWorkflow(workflowId, workflowRunId);
+    };
+
+    const removeWorkflow = (workflowId) => {
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+            return false;
+        }
+
+        // check if there are workflow runs
+        _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+            if(workflowRun.getWorkflowId() === workflowId) {
+                logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
+                return false;
+            }
+        });
+
+        delete workflows[workflowId];
+        return true;
+    };
+
+    const removeWorkflowRun = (workflowRunId) => {
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+            return false;
+        }
+
+        let workflowRun = workflowRuns[workflowRunId];
+        delete workflowRuns[workflowRunId];
+
+        workflowRun.setFinished();
+        return true;
+    };
+
+    const sendEvent = (topic, message) => {
+        // list of workflowIds
+        // to check if there are workflow runs for the events
+        let workflowIdsRunning = [];
+
+        // route event to running instances
+        _.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
+            let workflowId = workflowRun.getWorkflowId();
+            let workflow = workflows[workflowId];
+
+            // event will be routed to workflow runs that meet following criteria
+            // 1) the workflow is currently interested in the same topic
+            //      (already finished tasks are not counted)
+            // 2) the task's key field and value
+            if(workflowRun.isEventAcceptable(workflow, topic, message)) {
+                logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
+                workflowRun.enqueueEvent(topic, message);
+
+                if(!workflowIdsRunning.includes(workflowId)) {
+                    workflowIdsRunning.push(workflowId);
+                }
+            }
+        });
+
+        // check if the event is a kickstart event
+        _.forOwn(workflows, (workflow, workflowId) => {
+            if(workflow.isKickstartTopic(topic)) {
+                // check if there is a workflow run for the event
+                // kickstart a workflow if there is no workflows runs for the event
+                if(!workflowIdsRunning.includes(workflowId)) {
+                    // we need to buffer the event until workflow run is brought up
+                    let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
+                    let workflowRunId = workflowRun.getId();
+
+                    // register for management
+                    workflowRuns[workflowRunId] = workflowRun;
+
+                    // route event
+                    logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
+                    workflowRun.enqueueEvent(topic, message);
+
+                    // KICKSTART!
+                    kickstart(workflowId, workflowRunId);
+                }
+            }
+        });
+    };
+
+    const fetchEvent = (workflowRunId, taskId, topic) => {
+        // this returns an event or an empty obj when there is no message
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `workflow run ${workflowRunId} does not exist`);
+            return null;
+        }
+
+        let workflowRun = workflowRuns[workflowRunId];
+        let workflowId = workflowRun.getWorkflowId();
+
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `workflow ${workflowId} does not exist`);
+            return null;
+        }
+
+        let workflow = workflows[workflowId];
+
+        let task = workflow.getTask(taskId);
+        if(!task) {
+            logger.log('warn', `workflow ${workflowId} does not have task ${taskId}`);
+            return null;
+        }
+
+        logger.log('debug', `workflow run ${workflowRunId}, task ${taskId} fetches an event`);
+
+        let event = workflowRun.dequeueEvent(topic);
+        if(event) {
+            return event;
+        }
+        else {
+            return {};
+        }
+    };
+
+    const addClient = (c) => {
+        let clientId = c.getId();
+        let socket = c.getSocket();
+
+        // check id that client is already there
+        if(clientId in allClients) {
+            logger.log('warn', `there exists a client with the same id - ${clientId}`);
+            return false;
+        }
+
+        if(c.getType() === Client.Type.PROBE) {
+            // probe' messages are relayed
+            // relay messages based on topic
+            // probe protocol:
+            // REQ:
+            //      topic: event topic
+            //      message: <data>
+            // RES:
+            //      topic: topic sent
+            //      message: {result: <true/false>, message: <error message> }
+            allClients[clientId] = c;
+            probeClients[clientId] = c;
+
+            socket.on('*', (msg) => {
+                let jsonMsg = msg.data;
+                if(jsonMsg.length === 2) {
+                    // must have two parts
+                    // first part is topic
+                    // second part is message body
+                    let topic = jsonMsg[0];
+                    let messageBody = jsonMsg[1];
+
+                    sendEvent(topic, messageBody);
+
+                    // return true for success
+                    socket.emit(topic, {
+                        result: true
+                    });
+                }
+                else {
+                    logger.log('warn', `unexpected message is received - ${JSON.stringify(jsonMsg)}`);
+                    socket.emit(jsonMsg[0], {
+                        result: false,
+                        message: `unexpected message is received - ${JSON.stringify(jsonMsg)}`
+                    });
+                }
+            });
+            return true;
+        }
+        else if(c.getType() === Client.Type.WORKFLOW_MANAGER) {
+            // manager
+            // manager protocol:
+            // REQ:
+            //      topic: operation
+            //      message: <data>
+            // RES:
+            //      topic: topic sent
+            //      message: {result: <true/false>, message: <error message> }F
+            allClients[clientId] = c;
+            workflowManagerClients[clientId] = c;
+
+            // attach manager operations
+            let router = ws_manager.getRouter();
+            _.forOwn(router, (routerElem, _key) => {
+                socket.on(routerElem.topic, (msg) => {
+                    routerElem.handler(routerElem.topic, msg, (err, result) => {
+                        if(err) {
+                            logger.log('warn', `unable to handle a message - ${err}`);
+                            socket.emit(routerElem.topic, {
+                                result: false,
+                                message: err
+                            });
+                            return;
+                        }
+
+                        if(routerElem.return === undefined || routerElem.return) {
+                            socket.emit(routerElem.topic, {
+                                result: result
+                            });
+                        }
+                    });
+                });
+            });
+            return true;
+        }
+        else if(c.getType() === Client.Type.WORKFLOW_RUN) {
+            // workflow run
+            // workflow run protocol:
+            // REQ:
+            //      topic: operation
+            //      message: <data>
+            // RES:
+            //      topic: topic sent
+            //      message: {result: <true/false>, message: <error message> }
+
+            // map to WorkflowRun instance
+            let workflowId = c.getWorkflowId();
+            let workflowRunId = c.getWorkflowRunId();
+            let workflowRun;
+
+            if(!(workflowId in workflows)) {
+                logger.log('warn', `cannot find a workflow ${workflowId}`);
+                return false;
+            }
+
+            // register client to workflow run
+            if(!(workflowRunId in workflowRuns)) {
+                // workflow run not exist yet
+                logger.log('warn', `cannot find a workflow run ${workflowRunId}`);
+                return false;
+            }
+
+            //let workflow = workflows[workflowId];
+
+            allClients[clientId] = c;
+            workflowRunClients[clientId] = c;
+
+            // update
+            workflowRun = workflowRuns[workflowRunId];
+            workflowRun.addClientId(clientId);
+
+            // attach workflow run operations
+            let router = ws_workflowrun.getRouter();
+            _.forOwn(router, (routerElem, _key) => {
+                socket.on(routerElem.topic, (msg) => {
+                    routerElem.handler(routerElem.topic, msg, (err, result) => {
+                        if(err) {
+                            logger.log('warn', `unable to handle a message - ${err}`);
+                            socket.emit(routerElem.topic, {
+                                result: false,
+                                message: err
+                            });
+                            return;
+                        }
+
+                        if(routerElem.return === undefined || routerElem.return) {
+                            socket.emit(routerElem.topic, {
+                                result: result
+                            });
+                        }
+                    });
+                });
+            });
+            return true;
+        }
+        return false;
+    };
+
+    const removeClient = (id) => {
+        if(id in allClients) {
+            let removedClient = allClients[id];
+            delete allClients[id];
+
+            let type = removedClient.getType();
+            if(type === Client.Type.PROBE) {
+                delete probeClients[id];
+            }
+            else if(type === Client.Type.WORKFLOW_MANAGER) {
+                delete workflowManagerClients[id];
+            }
+            else if(type === Client.Type.WORKFLOW_RUN) {
+                delete workflowRunClients[id];
+
+                let workflowRunId = removedClient.getWorkflowRunId();
+                let workflowRun = workflowRuns[workflowRunId];
+
+                if(workflowRun) {
+                    workflowRun.removeClientId(id);
+
+                    //TODO
+                    // WorkflowRun can have no clients between tasks
+                    // So we should not remove the run until the workflow run finishes
+                }
+            }
+        }
+    };
+
+    const removeClients = () => {
+        let probeClients = {};
+
+        _.forOwn(probeClients, (_probeClient, clientId) => {
+            delete probeClients[clientId];
+        });
+
+        _.forOwn(workflowManagerClients, (_workflowManagerClient, clientId) => {
+            delete workflowManagerClients[clientId];
+        });
+
+        _.forOwn(workflowRunClients, (_workflowRunClients, clientId) => {
+            delete workflowRunClients[clientId];
+        });
+
+        _.forOwn(allClients, (client, clientId) => {
+            client.getSocket().disconnect(true);
+            delete allClients[clientId];
+        });
+    }
+
+    module.exports = {
+        serviceEvents: serviceEvents,
+        destroy: destroy,
+        getClients: () => { return allClients; },
+        getProbeClients: () => { return probeClients; },
+        getWorkflowManagerClients: () => { return workflowManagerClients; },
+        getWorkflowRunClients: () => { return workflowRunClients; },
+        clientType: Client.Type,
+        //setIO: setIO,
+        sendEvent: sendEvent,
+        fetchEvent: fetchEvent,
+        addClient: addClient,
+        removeClient: removeClient,
+        removeClients: removeClients,
+        addWorkflow: addWorkflow,
+        listWorkflows: listWorkflows,
+        checkWorkflow: checkWorkflow,
+        removeWorkflow: removeWorkflow,
+        clearWorkflows: clearWorkflows,
+        addWorkflowRun: addWorkflowRun,
+        listWorkflowRuns: listWorkflowRuns,
+        checkWorkflowRun: checkWorkflowRun,
+        removeWorkflowRun: removeWorkflowRun,
+        clearWorkflowRuns: clearWorkflowRuns,
+        updateWorkflowRunStatus: updateWorkflowRunStatus,
+        setWorkflowRunKickstarted: setWorkflowRunKickstarted,
+    };
+})();
\ No newline at end of file