Implement basic functionalities for workflow control.
- Manage join/leave of clients
- All clients communicate via socket.io
- Probes emit events
- Managers register workflows (by using a workflow essence)
- Send kickstart request to Managers to launch workflows
- Route events to workflow runs
- Queue events so that none are lost between workflow tasks
- Fixed some issues found while working on testcases
- Configure coverage and unit tests to run and write their results to files
Change-Id: I678723edc20df9247d63a4bf6380785ab8b2b221
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
new file mode 100644
index 0000000..82d27db
--- /dev/null
+++ b/src/controllers/eventrouter.js
@@ -0,0 +1,512 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const _ = require('lodash');
+ const logger = require('../config/logger.js');
+ const Client = require('../types/client.js');
+ const WorkflowRun = require('../types/workflowrun.js');
+ const ws_manager = require('./ws_manager.js');
+ const ws_workflowrun = require('./ws_workflowrun.js');
+
+ // Registries of connected clients, each a plain object keyed by client id.
+ // addClient stores a client in allClients and in the registry matching its
+ // type; removeClient deletes it from both.
+ let allClients = {}; // has publishers and subscribers
+ let probeClients = {}; // a subset of clients
+ let workflowManagerClients = {}; // a subset of clients
+ let workflowRunClients = {}; // a subset of clients
+
+ //let io;
+
+ // key: workflow id
+ // value: Workflow instance
+ // (filled by addWorkflow)
+ let workflows = {};
+
+ // key: workflow run id
+ // value: WorkflowRun instance
+ // (filled by addWorkflowRun, and by sendEvent when an event kickstarts a new run)
+ let workflowRuns = {};
+
+    // Event names exposed by this module: the greeting event plus every
+    // event declared by ws_manager and ws_workflowrun. Sources are merged
+    // left to right, so a later source overwrites an earlier key of the
+    // same name.
+    let serviceEvents = _.assign(
+        { GREETING: 'cord.workflow.ctlsvc.greeting' },
+        ws_manager.serviceEvents,
+        ws_workflowrun.serviceEvents
+    );
+
+ //const setIO = (ioInstance) => {
+ // io = ioInstance;
+ //};
+
+    /**
+     * Tear down all router state by running, in order, removeClients,
+     * clearWorkflowRuns and clearWorkflows.
+     */
+    const destroy = () => {
+        const teardownSteps = [removeClients, clearWorkflowRuns, clearWorkflows];
+        teardownSteps.forEach((step) => step());
+    };
+
+    /**
+     * @returns {string[]} a new array holding the id of every registered workflow
+     */
+    const listWorkflows = () => _.keys(workflows);
+
+    /**
+     * @param {string} workflowId - id to look up
+     * @returns {boolean} true when the id is a key of the workflow registry
+     */
+    const checkWorkflow = (workflowId) => workflowId in workflows;
+
+    /**
+     * Register a workflow under its own id.
+     *
+     * @param {object} workflow - object exposing getId()
+     * @returns {boolean} false (and an error log) when the id is already
+     *     registered, true once the workflow has been stored
+     */
+    const addWorkflow = (workflow) => {
+        const id = workflow.getId();
+        const alreadyRegistered = id in workflows;
+
+        if(alreadyRegistered) {
+            logger.log('error', `there exists a workflow with the same id - ${id}`);
+            return false;
+        }
+
+        workflows[id] = workflow;
+        return true;
+    };
+
+    /**
+     * Empty the workflow registry in place (the object itself is kept).
+     */
+    const clearWorkflows = () => {
+        for(const registeredId of Object.keys(workflows)) {
+            delete workflows[registeredId];
+        }
+    };
+
+    /**
+     * @returns {string[]} a new array holding the id of every registered workflow run
+     */
+    const listWorkflowRuns = () => _.keys(workflowRuns);
+
+    /**
+     * @param {string} workflowRunId - id to look up
+     * @returns {boolean} true when the id is a key of the workflow run registry
+     */
+    const checkWorkflowRun = (workflowRunId) => workflowRunId in workflowRuns;
+
+    /**
+     * Register a workflow run under its own id.
+     *
+     * @param {object} workflowRun - object exposing getId() and getWorkflowId()
+     * @returns {boolean} false (and a warning log) when the run id is already
+     *     registered or the run's workflow is not registered; true once stored
+     */
+    const addWorkflowRun = (workflowRun) => {
+        const ownerWorkflowId = workflowRun.getWorkflowId();
+        const runId = workflowRun.getId();
+
+        const isDuplicate = runId in workflowRuns;
+        if(isDuplicate) {
+            logger.log('warn', `there exists a workflow run with the same id - ${runId}`);
+            return false;
+        }
+
+        const ownerIsKnown = ownerWorkflowId in workflows;
+        if(!ownerIsKnown) {
+            logger.log('warn', `cannot find a workflow with id - ${ownerWorkflowId}`);
+            return false;
+        }
+
+        workflowRuns[runId] = workflowRun;
+        return true;
+    };
+
+    /**
+     * Empty the workflow run registry in place (the object itself is kept).
+     */
+    const clearWorkflowRuns = () => {
+        for(const registeredRunId of Object.keys(workflowRuns)) {
+            delete workflowRuns[registeredRunId];
+        }
+    };
+
+    /**
+     * Forward a task status change to a registered workflow run.
+     *
+     * @param {string} workflowRunId - id of the run to update
+     * @param {string} taskId - passed through to updateTaskStatus
+     * @param {*} status - passed through to updateTaskStatus
+     * @returns {boolean} true when the run exists and was updated, false
+     *     (with a warning log) otherwise
+     */
+    const updateWorkflowRunStatus = (workflowRunId, taskId, status) => {
+        if(workflowRunId in workflowRuns) {
+            workflowRuns[workflowRunId].updateTaskStatus(taskId, status);
+            return true;
+        }
+
+        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+        return false;
+    };
+
+    /**
+     * Mark a registered workflow run as kickstarted.
+     *
+     * @param {string} workflowRunId - id of the run to mark
+     * @returns {boolean} true when the run exists and was marked, false
+     *     (with a warning log) otherwise
+     */
+    const setWorkflowRunKickstarted = (workflowRunId) => {
+        if(workflowRunId in workflowRuns) {
+            workflowRuns[workflowRunId].setKickstarted();
+            return true;
+        }
+
+        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+        return false;
+    };
+
+    /**
+     * Hand a kickstart request for a registered workflow run to ws_manager.
+     *
+     * @param {string} workflowId - id of a registered workflow
+     * @param {string} workflowRunId - id of a registered workflow run
+     * @returns {boolean} false (with a warning log) when either id is not
+     *     registered; true once the request has been passed to
+     *     ws_manager.kickstartWorkflow. The outcome of that call itself is
+     *     not inspected here.
+     */
+    const kickstart = (workflowId, workflowRunId) => {
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+            return false;
+        }
+
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+            return false;
+        }
+
+        ws_manager.kickstartWorkflow(workflowId, workflowRunId);
+        // report success explicitly so callers get a boolean on every path
+        return true;
+    };
+
+    /**
+     * Unregister a workflow, unless workflow runs still refer to it.
+     *
+     * @param {string} workflowId - id of the workflow to remove
+     * @returns {boolean} false (with a warning log) when the workflow is not
+     *     registered or at least one registered run belongs to it; true once
+     *     the workflow has been removed
+     */
+    const removeWorkflow = (workflowId) => {
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+            return false;
+        }
+
+        // check if there are workflow runs
+        // NOTE: this must be decided outside of an iteration callback; a
+        // `return false` inside a _.forOwn callback only stops the iteration
+        // and does not leave this function.
+        const hasWorkflowRun = _.some(workflowRuns, (workflowRun) => {
+            return workflowRun.getWorkflowId() === workflowId;
+        });
+
+        if(hasWorkflowRun) {
+            logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
+            return false;
+        }
+
+        delete workflows[workflowId];
+        return true;
+    };
+
+    /**
+     * Unregister a workflow run and mark it finished.
+     *
+     * @param {string} workflowRunId - id of the run to remove
+     * @returns {boolean} true when the run existed and was removed, false
+     *     (with a warning log) otherwise
+     */
+    const removeWorkflowRun = (workflowRunId) => {
+        if(workflowRunId in workflowRuns) {
+            const finishedRun = workflowRuns[workflowRunId];
+
+            // drop it from the registry first, then flag the instance
+            delete workflowRuns[workflowRunId];
+            finishedRun.setFinished();
+            return true;
+        }
+
+        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+        return false;
+    };
+
+    /**
+     * Route an event to every workflow run that accepts it and kickstart a
+     * new run for each workflow that treats the topic as a kickstart topic
+     * but had no accepting run.
+     *
+     * @param {string} topic - event topic
+     * @param {*} message - event body, enqueued as-is
+     */
+    const sendEvent = (topic, message) => {
+        // ids of workflows that already have a run consuming this event
+        const servedWorkflowIds = new Set();
+
+        // pass 1: deliver to existing runs
+        // A run receives the event when isEventAcceptable() says so, i.e.
+        // 1) the workflow is currently interested in the same topic
+        //    (already finished tasks are not counted)
+        // 2) the task's key field and value
+        _.forOwn(workflowRuns, (run, runId) => {
+            const ownerId = run.getWorkflowId();
+
+            if(!run.isEventAcceptable(workflows[ownerId], topic, message)) {
+                // plain return (undefined) keeps the iteration going
+                return;
+            }
+
+            logger.log('debug', `event ${topic} is routed to workflow run ${runId}`);
+            run.enqueueEvent(topic, message);
+            servedWorkflowIds.add(ownerId);
+        });
+
+        // pass 2: kickstart workflows for which nobody took the event
+        _.forOwn(workflows, (candidate, candidateId) => {
+            const needsKickstart =
+                candidate.isKickstartTopic(topic) && !servedWorkflowIds.has(candidateId);
+
+            if(!needsKickstart) {
+                return;
+            }
+
+            // the event is buffered in the new run until the run is brought up
+            const freshRun = WorkflowRun.WorkflowRun.makeNewRun(candidate);
+            const freshRunId = freshRun.getId();
+
+            // register for management
+            workflowRuns[freshRunId] = freshRun;
+
+            logger.log('debug', `event ${topic} is routed to workflow run ${freshRunId}`);
+            freshRun.enqueueEvent(topic, message);
+
+            kickstart(candidateId, freshRunId);
+        });
+    };
+
+    /**
+     * Take the next queued event of a topic from a workflow run.
+     *
+     * @param {string} workflowRunId - id of the run to read from
+     * @param {string} taskId - id of the requesting task; it must exist in
+     *     the run's workflow
+     * @param {string} topic - topic to dequeue
+     * @returns {?object} null (with a warning log) when the run, its workflow
+     *     or the task is unknown; otherwise the dequeued event, or an empty
+     *     object when dequeueEvent yields nothing
+     */
+    const fetchEvent = (workflowRunId, taskId, topic) => {
+        const runIsKnown = workflowRunId in workflowRuns;
+        if(!runIsKnown) {
+            logger.log('warn', `workflow run ${workflowRunId} does not exist`);
+            return null;
+        }
+
+        const run = workflowRuns[workflowRunId];
+        const ownerId = run.getWorkflowId();
+
+        const ownerIsKnown = ownerId in workflows;
+        if(!ownerIsKnown) {
+            logger.log('warn', `workflow ${ownerId} does not exist`);
+            return null;
+        }
+
+        // the task is only looked up to validate the request
+        if(!workflows[ownerId].getTask(taskId)) {
+            logger.log('warn', `workflow ${ownerId} does not have task ${taskId}`);
+            return null;
+        }
+
+        logger.log('debug', `workflow run ${workflowRunId}, task ${taskId} fetches an event`);
+
+        return run.dequeueEvent(topic) || {};
+    };
+
+    /**
+     * Relay every message of a probe socket into the event router.
+     *
+     * probe protocol:
+     * REQ:
+     *   topic: event topic
+     *   message: <data>
+     * RES:
+     *   topic: topic sent
+     *   message: {result: <true/false>, message: <error message> }
+     *
+     * NOTE(review): the '*' event is presumably provided by a wildcard
+     * middleware installed on the socket elsewhere - confirm.
+     *
+     * @param {object} socket - socket of the probe client
+     */
+    const attachProbeRelay = (socket) => {
+        socket.on('*', (msg) => {
+            // tolerate a missing envelope instead of throwing in the handler
+            const jsonMsg = msg ? msg.data : undefined;
+
+            if(jsonMsg && jsonMsg.length === 2) {
+                // must have two parts
+                // first part is topic
+                // second part is message body
+                const topic = jsonMsg[0];
+                const messageBody = jsonMsg[1];
+
+                sendEvent(topic, messageBody);
+
+                // return true for success
+                socket.emit(topic, {
+                    result: true
+                });
+                return;
+            }
+
+            logger.log('warn', `unexpected message is received - ${JSON.stringify(jsonMsg)}`);
+
+            // an error can only be reported back when a topic is available
+            const replyTopic = jsonMsg ? jsonMsg[0] : undefined;
+            if(replyTopic !== undefined && replyTopic !== null) {
+                socket.emit(replyTopic, {
+                    result: false,
+                    message: `unexpected message is received - ${JSON.stringify(jsonMsg)}`
+                });
+            }
+        });
+    };
+
+    /**
+     * Register every operation of a router on a socket.
+     *
+     * operation protocol (manager and workflow run):
+     * REQ:
+     *   topic: operation
+     *   message: <data>
+     * RES:
+     *   topic: topic sent
+     *   message: {result: <true/false>, message: <error message> }
+     *
+     * @param {object} socket - socket of the client
+     * @param {object} router - router elements, each with `topic`, `handler`
+     *     and an optional `return` flag
+     */
+    const attachRouterHandlers = (socket, router) => {
+        _.forOwn(router, (routerElem) => {
+            socket.on(routerElem.topic, (msg) => {
+                routerElem.handler(routerElem.topic, msg, (err, result) => {
+                    if(err) {
+                        logger.log('warn', `unable to handle a message - ${err}`);
+                        socket.emit(routerElem.topic, {
+                            result: false,
+                            message: err
+                        });
+                        return;
+                    }
+
+                    // a result is sent back unless the element sets a falsy `return`
+                    if(routerElem.return === undefined || routerElem.return) {
+                        socket.emit(routerElem.topic, {
+                            result: result
+                        });
+                    }
+                });
+            });
+        });
+    };
+
+    /**
+     * Register a client and attach the socket handlers matching its type.
+     *
+     * @param {object} c - client exposing getId(), getSocket(), getType()
+     *     and, for workflow run clients, getWorkflowId()/getWorkflowRunId()
+     * @returns {boolean} true when the client was registered; false when the
+     *     id is already registered, the client type is unknown, or a workflow
+     *     run client refers to an unknown workflow or workflow run
+     */
+    const addClient = (c) => {
+        const clientId = c.getId();
+        const socket = c.getSocket();
+
+        // check id that client is already there
+        if(clientId in allClients) {
+            logger.log('warn', `there exists a client with the same id - ${clientId}`);
+            return false;
+        }
+
+        const clientType = c.getType();
+
+        if(clientType === Client.Type.PROBE) {
+            // probe messages are relayed, based on topic
+            allClients[clientId] = c;
+            probeClients[clientId] = c;
+
+            attachProbeRelay(socket);
+            return true;
+        }
+
+        if(clientType === Client.Type.WORKFLOW_MANAGER) {
+            allClients[clientId] = c;
+            workflowManagerClients[clientId] = c;
+
+            // attach manager operations
+            attachRouterHandlers(socket, ws_manager.getRouter());
+            return true;
+        }
+
+        if(clientType === Client.Type.WORKFLOW_RUN) {
+            // map to WorkflowRun instance
+            const workflowId = c.getWorkflowId();
+            const workflowRunId = c.getWorkflowRunId();
+
+            if(!(workflowId in workflows)) {
+                logger.log('warn', `cannot find a workflow ${workflowId}`);
+                return false;
+            }
+
+            // register client to workflow run
+            if(!(workflowRunId in workflowRuns)) {
+                // workflow run not exist yet
+                logger.log('warn', `cannot find a workflow run ${workflowRunId}`);
+                return false;
+            }
+
+            allClients[clientId] = c;
+            workflowRunClients[clientId] = c;
+
+            // let the run know which client serves it
+            workflowRuns[workflowRunId].addClientId(clientId);
+
+            // attach workflow run operations
+            attachRouterHandlers(socket, ws_workflowrun.getRouter());
+            return true;
+        }
+
+        return false;
+    };
+
+    /**
+     * Unregister a client; unknown ids are ignored silently.
+     *
+     * @param {string} id - id of the client to remove
+     */
+    const removeClient = (id) => {
+        if(!(id in allClients)) {
+            return;
+        }
+
+        const departing = allClients[id];
+        delete allClients[id];
+
+        switch(departing.getType()) {
+            case Client.Type.PROBE:
+                delete probeClients[id];
+                break;
+
+            case Client.Type.WORKFLOW_MANAGER:
+                delete workflowManagerClients[id];
+                break;
+
+            case Client.Type.WORKFLOW_RUN: {
+                delete workflowRunClients[id];
+
+                const servedRun = workflowRuns[departing.getWorkflowRunId()];
+                if(servedRun) {
+                    servedRun.removeClientId(id);
+
+                    //TODO
+                    // WorkflowRun can have no clients between tasks
+                    // So we should not remove the run until the workflow run finishes
+                }
+                break;
+            }
+
+            default:
+                break;
+        }
+    };
+
+    /**
+     * Forget every client and disconnect its socket.
+     *
+     * The per-type registries are emptied first; then each client in
+     * allClients is disconnected and removed. All registries are emptied in
+     * place, so the objects returned by the exported getters stay valid.
+     */
+    const removeClients = () => {
+        // Operates on the module-level probeClients. Declaring a local of the
+        // same name here would shadow it and leave the real registry untouched.
+        _.forOwn(probeClients, (_probeClient, clientId) => {
+            delete probeClients[clientId];
+        });
+
+        _.forOwn(workflowManagerClients, (_workflowManagerClient, clientId) => {
+            delete workflowManagerClients[clientId];
+        });
+
+        _.forOwn(workflowRunClients, (_workflowRunClient, clientId) => {
+            delete workflowRunClients[clientId];
+        });
+
+        _.forOwn(allClients, (client, clientId) => {
+            client.getSocket().disconnect(true);
+            delete allClients[clientId];
+        });
+    };
+
+    // Public interface of the event router. The get*Clients accessors hand
+    // out the live registries, not copies.
+    module.exports = {
+        serviceEvents,
+        destroy,
+        getClients: () => allClients,
+        getProbeClients: () => probeClients,
+        getWorkflowManagerClients: () => workflowManagerClients,
+        getWorkflowRunClients: () => workflowRunClients,
+        clientType: Client.Type,
+        //setIO: setIO,
+        sendEvent,
+        fetchEvent,
+        addClient,
+        removeClient,
+        removeClients,
+        addWorkflow,
+        listWorkflows,
+        checkWorkflow,
+        removeWorkflow,
+        clearWorkflows,
+        addWorkflowRun,
+        listWorkflowRuns,
+        checkWorkflowRun,
+        removeWorkflowRun,
+        clearWorkflowRuns,
+        updateWorkflowRunStatus,
+        setWorkflowRunKickstarted,
+    };
+})();
\ No newline at end of file