Implement basic functionalities for workflow control.
- Manage join/leave of clients
- All clients communicate via socket.io
- Probes emit events
- Managers register workflows (by using a workflow essence)
- Send kickstart request to Managers to launch workflows
- Route events to workflow runs
- Queue events to not lose events between workflow tasks
- Fixed some issues found while working on test cases
- Set up coverage and unit test runs to write their output to files
Change-Id: I678723edc20df9247d63a4bf6380785ab8b2b221
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
new file mode 100644
index 0000000..b3e8877
--- /dev/null
+++ b/src/controllers/ws_manager.js
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const _ = require('lodash');
+ const Workflow = require('../types/workflow.js');
+ const logger = require('../config/logger.js');
+
+ const serviceEvents = { // topic names, used as socket.io event names; never reassigned
+     WORKFLOW_REG: 'cord.workflow.ctlsvc.workflow.reg',
+     WORKFLOW_REG_ESSENCE: 'cord.workflow.ctlsvc.workflow.reg_essence',
+     WORKFLOW_LIST: 'cord.workflow.ctlsvc.workflow.list',
+     WORKFLOW_RUN_LIST: 'cord.workflow.ctlsvc.workflow.run.list',
+     WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
+     WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
+     WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
+     WORKFLOW_RUN_REMOVE: 'cord.workflow.ctlsvc.workflow.run.remove'
+ };
+
+ // WebSocket interface for workflow registration
+ // Message format:
+ // {
+ //     topic: 'cord.workflow.ctlsvc.workflow.reg',
+ //     message: <workflow>
+ // }
+ // Parameters: topic   - topic the message arrived on (only used in error text),
+ //             message - the workflow to register,
+ //             cb      - callback invoked as cb(error, result).
+ // Calls cb(errorMessage, false) when the body is empty or the event router
+ // rejects the workflow, and cb(null, true) when the workflow is registered.
+ const registWorkflow = (topic, message, cb) => {
+     // Must be './eventrouter.js', the path every other handler in this file
+     // loads; './eventrouter.js/index.js' would treat that module as a
+     // directory and fail to resolve.
+     const eventrouter = require('./eventrouter.js');
+
+     if(!message) {
+         const errorMessage = `Message body for topic ${topic} is null or empty`;
+         logger.log('warn', `Return error - ${errorMessage}`);
+         cb(errorMessage, false);
+         return;
+     }
+
+     const workflow = message;
+     logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
+     if(!eventrouter.addWorkflow(workflow)) {
+         cb(`failed to register a workflow ${workflow.getId()}`, false);
+         return;
+     }
+     cb(null, true);
+ };
+
+ // WebSocket interface for workflow registration (via essence)
+ // Message format:
+ // {
+ //     topic: 'cord.workflow.ctlsvc.workflow.reg_essence',
+ //     message: <workflow essence>
+ // }
+ // Loads the workflows described by the essence and adds each of them to the
+ // event router. Calls cb(null, true) when every registration succeeds, and
+ // cb(errorMessage, false) when the body is empty or any registration fails.
+ const registerWorkflowEssence = (topic, message, cb) => {
+     const eventrouter = require('./eventrouter.js');
+
+     if(!message) {
+         const errorMessage = `Message body for topic ${topic} is null or empty`;
+         logger.log('warn', `Return error - ${errorMessage}`);
+         cb(errorMessage, false);
+         return;
+     }
+
+     const essence = message;
+     logger.log('debug', `workflow essence - ${JSON.stringify(essence)}`);
+
+     // Every workflow is attempted even after one fails. The ids of the rejected
+     // ones are collected so that the error message names them rather than
+     // listing a true/false outcome per registration.
+     const failedWorkflowIds = [];
+     Workflow.loadWorkflowsFromEssence(essence).forEach((workflow) => {
+         // empty entries are skipped, not counted as failures
+         if(workflow && !eventrouter.addWorkflow(workflow)) {
+             // NOTE(review): assumes loaded workflows expose getId() - confirm
+             failedWorkflowIds.push(workflow.getId());
+         }
+     });
+
+     if(failedWorkflowIds.length > 0) {
+         cb(`failed to register workflows ${failedWorkflowIds}`, false);
+         return;
+     }
+
+     cb(null, true);
+ };
+
+ // WebSocket handler: list workflows
+ // Incoming message:
+ // {
+ //     topic: 'cord.workflow.ctlsvc.workflow.list',
+ //     message: null
+ // }
+ // Neither topic nor message is read here. Whatever the event router's
+ // listWorkflows() returns is passed on to the caller as cb(null, <result>).
+ const listWorkflows = (topic, message, cb) => {
+     // resolved on each call rather than at module load, like the other handlers
+     const router = require('./eventrouter.js');
+     cb(null, router.listWorkflows());
+ };
+
+ // WebSocket handler: list workflow runs
+ // Incoming message:
+ // {
+ //     topic: 'cord.workflow.ctlsvc.workflow.run.list',
+ //     message: null
+ // }
+ // Neither topic nor message is read here. Whatever the event router's
+ // listWorkflowRuns() returns is passed on to the caller as cb(null, <result>).
+ const listWorkflowRuns = (topic, message, cb) => {
+     // resolved on each call rather than at module load, like the other handlers
+     const router = require('./eventrouter.js');
+     cb(null, router.listWorkflowRuns());
+ };
+
+ // WebSocket handler: check a workflow
+ // Incoming message:
+ // {
+ //     topic: 'cord.workflow.ctlsvc.workflow.check',
+ //     message: <workflow_id>
+ // }
+ // An empty body is logged and reported as cb(<error text>, false). Otherwise
+ // the body is taken as the workflow id and the event router's
+ // checkWorkflow(<workflow_id>) result is reported as cb(null, <result>).
+ const checkWorkflow = (topic, message, cb) => {
+     const router = require('./eventrouter.js');
+
+     if(message) {
+         // the message body itself is the workflow id
+         cb(null, router.checkWorkflow(message));
+         return;
+     }
+
+     // null, undefined, '' and any other falsy body end up here
+     const reason = `Message body for topic ${topic} is null or empty`;
+     logger.log('warn', `Return error - ${reason}`);
+     cb(reason, false);
+ };
+
+ // WebSocket interface for workflow start notification
+ // Message format:
+ // {
+ //     topic: 'cord.workflow.ctlsvc.workflow.kickstart',
+ //     message: {
+ //         workflow_id: <workflow_id>,
+ //         workflow_run_id: <workflow_run_id>
+ //     }
+ // }
+ // Marks the workflow run named by message.workflow_run_id as kickstarted in the
+ // event router and calls cb(null, true). workflow_id must be present but is
+ // not used otherwise. When the body is empty or one of the two fields is
+ // missing, the problem is logged and reported as cb(errorMessage, false).
+ const notifyWorkflowStart = (topic, message, cb) => {
+     const eventrouter = require('./eventrouter.js');
+
+     if(!message) {
+         const errorMessage = `Message body for topic ${topic} is null or empty`;
+         logger.log('warn', `Return error - ${errorMessage}`);
+         cb(errorMessage, false);
+         return;
+     }
+
+     // The 'in' operator throws a TypeError when its right-hand side is not an
+     // object, so a string or number body is treated as having no fields and
+     // gets the regular "does not exist" error instead of an exception.
+     const hasField = (field) => typeof message === 'object' && field in message;
+
+     // checked in this order; the first missing field is the one reported
+     for(const field of ['workflow_id', 'workflow_run_id']) {
+         if(!hasField(field)) {
+             const errorMessage =
+                 `field '${field}' does not exist in message body - ${JSON.stringify(message)}`;
+             logger.log('warn', `Return error - ${errorMessage}`);
+             cb(errorMessage, false);
+             return;
+         }
+     }
+
+     // set the workflow run kickstarted
+     // NOTE(review): whatever setWorkflowRunKickstarted() returns is ignored and
+     // success is always reported - confirm that is intended.
+     eventrouter.setWorkflowRunKickstarted(message.workflow_run_id);
+     cb(null, true);
+ };
+
+ // WebSocket interface for workflow removal
+ // Message format: { topic: 'cord.workflow.ctlsvc.workflow.remove', message: <workflow_id> }
+ // An empty body is logged and reported as cb(errorMessage, false), as checkWorkflow does;
+ // otherwise the result of the event router's removeWorkflow() goes to cb(null, result).
+ const removeWorkflow = (topic, message, cb) => {
+     const eventrouter = require('./eventrouter.js');
+     if(!message) {
+         const errorMessage = `Message body for topic ${topic} is null or empty`;
+         logger.log('warn', `Return error - ${errorMessage}`);
+         cb(errorMessage, false);
+         return;
+     }
+     cb(null, eventrouter.removeWorkflow(message));
+ };
+
+ // WebSocket interface for workflow run removal
+ // Message format:
+ // {
+ //     topic: 'cord.workflow.ctlsvc.workflow.run.remove',
+ //     message: {
+ //         workflow_id: <workflow_id>,
+ //         workflow_run_id: <workflow_run_id>
+ //     }
+ // }
+ // Removes the workflow run named by message.workflow_run_id through the event
+ // router and reports its result as cb(null, result). workflow_id must be
+ // present but is not used otherwise. When the body is empty or one of the two
+ // fields is missing, the problem is logged and reported as cb(errorMessage, false).
+ const removeWorkflowRun = (topic, message, cb) => {
+     const eventrouter = require('./eventrouter.js');
+
+     if(!message) {
+         const errorMessage = `Message body for topic ${topic} is null or empty`;
+         logger.log('warn', `Return error - ${errorMessage}`);
+         cb(errorMessage, false);
+         return;
+     }
+
+     // The 'in' operator throws a TypeError when its right-hand side is not an
+     // object, so a string or number body is treated as having no fields and
+     // gets the regular "does not exist" error instead of an exception.
+     const hasField = (field) => typeof message === 'object' && field in message;
+
+     // checked in this order; the first missing field is the one reported
+     for(const field of ['workflow_id', 'workflow_run_id']) {
+         if(!hasField(field)) {
+             const errorMessage =
+                 `field '${field}' does not exist in message body - ${JSON.stringify(message)}`;
+             logger.log('warn', `Return error - ${errorMessage}`);
+             cb(errorMessage, false);
+             return;
+         }
+     }
+
+     // only the run id is handed to the event router
+     const result = eventrouter.removeWorkflowRun(message.workflow_run_id);
+     cb(null, result);
+ };
+
+ // Describes the incoming topics this module serves.
+ // The returned object has one entry per handler, keyed by the handler's name:
+ //     <name>: { topic: <topic string>, handler: <function (topic, message, cb)> }
+ // The notifyWorkflowStart entry additionally carries `return: false`.
+ // NOTE(review): presumably `return: false` tells the consumer of this table not
+ // to send a reply for that topic - confirm against the code that reads it.
+ const getRouter = () => {
+     // pairs a topic with the function that serves it
+     const route = (topic, handler) => ({ topic: topic, handler: handler });
+
+     const routes = {};
+
+     // registration
+     routes.registWorkflow = route(serviceEvents.WORKFLOW_REG, registWorkflow);
+     routes.registerWorkflowEssence = route(
+         serviceEvents.WORKFLOW_REG_ESSENCE,
+         registerWorkflowEssence
+     );
+
+     // queries
+     routes.listWorkflows = route(serviceEvents.WORKFLOW_LIST, listWorkflows);
+     routes.listWorkflowRuns = route(serviceEvents.WORKFLOW_RUN_LIST, listWorkflowRuns);
+     routes.checkWorkflow = route(serviceEvents.WORKFLOW_CHECK, checkWorkflow);
+
+     // kickstart notification - the only entry with the extra `return` key
+     routes.notifyWorkflowStart = route(
+         serviceEvents.WORKFLOW_KICKSTART,
+         notifyWorkflowStart
+     );
+     routes.notifyWorkflowStart.return = false;
+
+     // removal
+     routes.removeWorkflow = route(serviceEvents.WORKFLOW_REMOVE, removeWorkflow);
+     routes.removeWorkflowRun = route(serviceEvents.WORKFLOW_RUN_REMOVE, removeWorkflowRun);
+
+     return routes;
+ };
+
+ // out-going commands
+ // Emits a kickstart request, carrying the workflow id and the workflow run id,
+ // on the socket of every workflow manager client known to the event router.
+ // Clients without a socket are skipped.
+ const kickstartWorkflow = (workflowId, workflowRunId) => {
+     const router = require('./eventrouter.js');
+     const managers = router.getWorkflowManagerClients();
+     _.forOwn(managers, (manager) => {
+         const channel = manager.getSocket();
+         if(!channel) {
+             return; // nothing to emit on; continue with the next client
+         }
+         const request = { workflow_id: workflowId, workflow_run_id: workflowRunId };
+         channel.emit(serviceEvents.WORKFLOW_KICKSTART, request);
+     });
+ };
+
+ module.exports = { // topic names, incoming-topic table, out-going kickstart command
+     serviceEvents,
+     getRouter,
+     kickstartWorkflow
+ };
+})();
\ No newline at end of file