Implement basic functionalities for workflow control.
- Manage join/leave of clients
- All clients communicate via socket.io
- Probes emit events
- Managers register workflows (by using a workflow essence)
- Send kickstart request to Managers to launch workflows
- Route events to workflow runs
- Queue events to not lose events between workflow tasks
- Fixed some issues found while working on testcases
- Configure coverage and unit-test runs to write their results to files
Change-Id: I678723edc20df9247d63a4bf6380785ab8b2b221
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
new file mode 100644
index 0000000..82d27db
--- /dev/null
+++ b/src/controllers/eventrouter.js
@@ -0,0 +1,512 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const _ = require('lodash');
+ const logger = require('../config/logger.js');
+ const Client = require('../types/client.js');
+ const WorkflowRun = require('../types/workflowrun.js');
+ const ws_manager = require('./ws_manager.js');
+ const ws_workflowrun = require('./ws_workflowrun.js');
+
+ let allClients = {}; // has publishers and subscribers
+ let probeClients = {}; // a subset of clients
+ let workflowManagerClients = {}; // a subset of clients
+ let workflowRunClients = {}; // a subset of clients
+
+ //let io;
+
+ // key: workflow id
+ // value: Workflow instance
+ let workflows = {};
+
+ // key: workflow run id
+ // value: WorkflowRun instance
+ let workflowRuns = {};
+
+ let serviceEvents = {
+ GREETING: 'cord.workflow.ctlsvc.greeting'
+ };
+
+ // add ws_manager service events
+ _.forOwn(ws_manager.serviceEvents, (wsServiceEvent, key) => {
+ serviceEvents[key] = wsServiceEvent;
+ });
+
+ // add ws_workflowrun service events
+ _.forOwn(ws_workflowrun.serviceEvents, (wsServiceEvent, key) => {
+ serviceEvents[key] = wsServiceEvent;
+ });
+
+ //const setIO = (ioInstance) => {
+ // io = ioInstance;
+ //};
+
+ const destroy = () => {
+ removeClients();
+ clearWorkflowRuns();
+ clearWorkflows();
+ };
+
+ const listWorkflows = () => {
+ let workflowList = [];
+ _.forOwn(workflows, (_workflow, workflowId) => {
+ workflowList.push(workflowId);
+ });
+ return workflowList;
+ };
+
+ const checkWorkflow = (workflowId) => {
+ if(workflowId in workflows) {
+ return true;
+ }
+ return false;
+ };
+
+ const addWorkflow = (workflow) => {
+ if(workflow.getId() in workflows) {
+ logger.log('error', `there exists a workflow with the same id - ${workflow.getId()}`);
+ return false;
+ }
+
+ let workflowId = workflow.getId();
+ workflows[workflowId] = workflow;
+ return true;
+ };
+
+ const clearWorkflows = () => {
+ _.forOwn(workflows, (_workflow, workflowId) => {
+ delete workflows[workflowId];
+ });
+ };
+
+ const listWorkflowRuns = () => {
+ let workflowRunList = [];
+ _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
+ workflowRunList.push(workflowRunId);
+ });
+ return workflowRunList;
+ };
+
+ const checkWorkflowRun = (workflowRunId) => {
+ if(workflowRunId in workflowRuns) {
+ return true;
+ }
+ return false;
+ };
+
+ const addWorkflowRun = (workflowRun) => {
+ let workflowId = workflowRun.getWorkflowId();
+ let workflowRunId = workflowRun.getId();
+
+ if(workflowRunId in workflowRuns) {
+ logger.log('warn', `there exists a workflow run with the same id - ${workflowRunId}`);
+ return false;
+ }
+
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
+ return false;
+ }
+
+ workflowRuns[workflowRunId] = workflowRun;
+ return true;
+ };
+
+ const clearWorkflowRuns = () => {
+ _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
+ delete workflowRuns[workflowRunId];
+ });
+ };
+
+ const updateWorkflowRunStatus = (workflowRunId, taskId, status) => {
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+ return false;
+ }
+
+ let workflowRun = workflowRuns[workflowRunId];
+ workflowRun.updateTaskStatus(taskId, status);
+ return true;
+ };
+
+ const setWorkflowRunKickstarted = (workflowRunId) => {
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+ return false;
+ }
+
+ let workflowRun = workflowRuns[workflowRunId];
+ workflowRun.setKickstarted();
+ return true;
+ };
+
+ const kickstart = (workflowId, workflowRunId) => {
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+ return false;
+ }
+
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+ return false;
+ }
+
+ ws_manager.kickstartWorkflow(workflowId, workflowRunId);
+ };
+
+ const removeWorkflow = (workflowId) => {
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+ return false;
+ }
+
+ // check if there are workflow runs
+ _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+ if(workflowRun.getWorkflowId() === workflowId) {
+ logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
+ return false;
+ }
+ });
+
+ delete workflows[workflowId];
+ return true;
+ };
+
+ const removeWorkflowRun = (workflowRunId) => {
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+ return false;
+ }
+
+ let workflowRun = workflowRuns[workflowRunId];
+ delete workflowRuns[workflowRunId];
+
+ workflowRun.setFinished();
+ return true;
+ };
+
+ const sendEvent = (topic, message) => {
+ // list of workflowIds
+ // to check if there are workflow runs for the events
+ let workflowIdsRunning = [];
+
+ // route event to running instances
+ _.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
+ let workflowId = workflowRun.getWorkflowId();
+ let workflow = workflows[workflowId];
+
+ // event will be routed to workflow runs that meet following criteria
+ // 1) the workflow is currently interested in the same topic
+ // (already finished tasks are not counted)
+ // 2) the task's key field and value
+ if(workflowRun.isEventAcceptable(workflow, topic, message)) {
+ logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
+ workflowRun.enqueueEvent(topic, message);
+
+ if(!workflowIdsRunning.includes(workflowId)) {
+ workflowIdsRunning.push(workflowId);
+ }
+ }
+ });
+
+ // check if the event is a kickstart event
+ _.forOwn(workflows, (workflow, workflowId) => {
+ if(workflow.isKickstartTopic(topic)) {
+ // check if there is a workflow run for the event
+ // kickstart a workflow if there is no workflows runs for the event
+ if(!workflowIdsRunning.includes(workflowId)) {
+ // we need to buffer the event until workflow run is brought up
+ let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
+ let workflowRunId = workflowRun.getId();
+
+ // register for management
+ workflowRuns[workflowRunId] = workflowRun;
+
+ // route event
+ logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
+ workflowRun.enqueueEvent(topic, message);
+
+ // KICKSTART!
+ kickstart(workflowId, workflowRunId);
+ }
+ }
+ });
+ };
+
+ const fetchEvent = (workflowRunId, taskId, topic) => {
+ // this returns an event or an empty obj when there is no message
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `workflow run ${workflowRunId} does not exist`);
+ return null;
+ }
+
+ let workflowRun = workflowRuns[workflowRunId];
+ let workflowId = workflowRun.getWorkflowId();
+
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `workflow ${workflowId} does not exist`);
+ return null;
+ }
+
+ let workflow = workflows[workflowId];
+
+ let task = workflow.getTask(taskId);
+ if(!task) {
+ logger.log('warn', `workflow ${workflowId} does not have task ${taskId}`);
+ return null;
+ }
+
+ logger.log('debug', `workflow run ${workflowRunId}, task ${taskId} fetches an event`);
+
+ let event = workflowRun.dequeueEvent(topic);
+ if(event) {
+ return event;
+ }
+ else {
+ return {};
+ }
+ };
+
+ const addClient = (c) => {
+ let clientId = c.getId();
+ let socket = c.getSocket();
+
+ // check if the client is already registered
+ if(clientId in allClients) {
+ logger.log('warn', `there exists a client with the same id - ${clientId}`);
+ return false;
+ }
+
+ if(c.getType() === Client.Type.PROBE) {
+ // probes' messages are relayed
+ // relay messages based on topic
+ // probe protocol:
+ // REQ:
+ // topic: event topic
+ // message: <data>
+ // RES:
+ // topic: topic sent
+ // message: {result: <true/false>, message: <error message> }
+ allClients[clientId] = c;
+ probeClients[clientId] = c;
+
+ socket.on('*', (msg) => {
+ let jsonMsg = msg.data;
+ if(jsonMsg.length === 2) {
+ // must have two parts
+ // first part is topic
+ // second part is message body
+ let topic = jsonMsg[0];
+ let messageBody = jsonMsg[1];
+
+ sendEvent(topic, messageBody);
+
+ // return true for success
+ socket.emit(topic, {
+ result: true
+ });
+ }
+ else {
+ logger.log('warn', `unexpected message is received - ${JSON.stringify(jsonMsg)}`);
+ socket.emit(jsonMsg[0], {
+ result: false,
+ message: `unexpected message is received - ${JSON.stringify(jsonMsg)}`
+ });
+ }
+ });
+ return true;
+ }
+ else if(c.getType() === Client.Type.WORKFLOW_MANAGER) {
+ // manager
+ // manager protocol:
+ // REQ:
+ // topic: operation
+ // message: <data>
+ // RES:
+ // topic: topic sent
+ // message: {result: <true/false>, message: <error message> }
+ allClients[clientId] = c;
+ workflowManagerClients[clientId] = c;
+
+ // attach manager operations
+ let router = ws_manager.getRouter();
+ _.forOwn(router, (routerElem, _key) => {
+ socket.on(routerElem.topic, (msg) => {
+ routerElem.handler(routerElem.topic, msg, (err, result) => {
+ if(err) {
+ logger.log('warn', `unable to handle a message - ${err}`);
+ socket.emit(routerElem.topic, {
+ result: false,
+ message: err
+ });
+ return;
+ }
+
+ if(routerElem.return === undefined || routerElem.return) {
+ socket.emit(routerElem.topic, {
+ result: result
+ });
+ }
+ });
+ });
+ });
+ return true;
+ }
+ else if(c.getType() === Client.Type.WORKFLOW_RUN) {
+ // workflow run
+ // workflow run protocol:
+ // REQ:
+ // topic: operation
+ // message: <data>
+ // RES:
+ // topic: topic sent
+ // message: {result: <true/false>, message: <error message> }
+
+ // map to WorkflowRun instance
+ let workflowId = c.getWorkflowId();
+ let workflowRunId = c.getWorkflowRunId();
+ let workflowRun;
+
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `cannot find a workflow ${workflowId}`);
+ return false;
+ }
+
+ // register client to workflow run
+ if(!(workflowRunId in workflowRuns)) {
+ // workflow run not exist yet
+ logger.log('warn', `cannot find a workflow run ${workflowRunId}`);
+ return false;
+ }
+
+ //let workflow = workflows[workflowId];
+
+ allClients[clientId] = c;
+ workflowRunClients[clientId] = c;
+
+ // update
+ workflowRun = workflowRuns[workflowRunId];
+ workflowRun.addClientId(clientId);
+
+ // attach workflow run operations
+ let router = ws_workflowrun.getRouter();
+ _.forOwn(router, (routerElem, _key) => {
+ socket.on(routerElem.topic, (msg) => {
+ routerElem.handler(routerElem.topic, msg, (err, result) => {
+ if(err) {
+ logger.log('warn', `unable to handle a message - ${err}`);
+ socket.emit(routerElem.topic, {
+ result: false,
+ message: err
+ });
+ return;
+ }
+
+ if(routerElem.return === undefined || routerElem.return) {
+ socket.emit(routerElem.topic, {
+ result: result
+ });
+ }
+ });
+ });
+ });
+ return true;
+ }
+ return false;
+ };
+
+ const removeClient = (id) => {
+ if(id in allClients) {
+ let removedClient = allClients[id];
+ delete allClients[id];
+
+ let type = removedClient.getType();
+ if(type === Client.Type.PROBE) {
+ delete probeClients[id];
+ }
+ else if(type === Client.Type.WORKFLOW_MANAGER) {
+ delete workflowManagerClients[id];
+ }
+ else if(type === Client.Type.WORKFLOW_RUN) {
+ delete workflowRunClients[id];
+
+ let workflowRunId = removedClient.getWorkflowRunId();
+ let workflowRun = workflowRuns[workflowRunId];
+
+ if(workflowRun) {
+ workflowRun.removeClientId(id);
+
+ //TODO
+ // WorkflowRun can have no clients between tasks
+ // So we should not remove the run until the workflow run finishes
+ }
+ }
+ }
+ };
+
+ const removeClients = () => {
+ let probeClients = {};
+
+ _.forOwn(probeClients, (_probeClient, clientId) => {
+ delete probeClients[clientId];
+ });
+
+ _.forOwn(workflowManagerClients, (_workflowManagerClient, clientId) => {
+ delete workflowManagerClients[clientId];
+ });
+
+ _.forOwn(workflowRunClients, (_workflowRunClients, clientId) => {
+ delete workflowRunClients[clientId];
+ });
+
+ _.forOwn(allClients, (client, clientId) => {
+ client.getSocket().disconnect(true);
+ delete allClients[clientId];
+ });
+ }
+
+ module.exports = {
+ serviceEvents: serviceEvents,
+ destroy: destroy,
+ getClients: () => { return allClients; },
+ getProbeClients: () => { return probeClients; },
+ getWorkflowManagerClients: () => { return workflowManagerClients; },
+ getWorkflowRunClients: () => { return workflowRunClients; },
+ clientType: Client.Type,
+ //setIO: setIO,
+ sendEvent: sendEvent,
+ fetchEvent: fetchEvent,
+ addClient: addClient,
+ removeClient: removeClient,
+ removeClients: removeClients,
+ addWorkflow: addWorkflow,
+ listWorkflows: listWorkflows,
+ checkWorkflow: checkWorkflow,
+ removeWorkflow: removeWorkflow,
+ clearWorkflows: clearWorkflows,
+ addWorkflowRun: addWorkflowRun,
+ listWorkflowRuns: listWorkflowRuns,
+ checkWorkflowRun: checkWorkflowRun,
+ removeWorkflowRun: removeWorkflowRun,
+ clearWorkflowRuns: clearWorkflowRuns,
+ updateWorkflowRunStatus: updateWorkflowRunStatus,
+ setWorkflowRunKickstarted: setWorkflowRunKickstarted,
+ };
+})();
\ No newline at end of file
diff --git a/src/controllers/rest_probe.js b/src/controllers/rest_probe.js
new file mode 100644
index 0000000..9d67386
--- /dev/null
+++ b/src/controllers/rest_probe.js
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const express = require('express');
+ const {checkSchema, validationResult} = require('express-validator');
+ const logger = require('../config/logger.js');
+ const eventrouter = require('./eventrouter.js');
+
+ // HTTP REST interface for message intake
+ // POST method
+ // Message format:
+ // {
+ // topic: 'topic here',
+ // message: 'message body here'
+ // }
+ // e.g., /intake?topic=aaa&message=bbb
+ const intakeMessageInputValidator = {
+ topic: {
+ in: ['params', 'query'],
+ errorMessage: 'Message topic is null or empty',
+ },
+ message: {
+ in: ['params', 'query'],
+ errorMessage: 'Message body is null or empty',
+ }
+ };
+
+ const intakeMessage = (req, res) => {
+ let errors = validationResult(req);
+ if(!errors.isEmpty()) {
+ res.status(400).send(
+ JSON.stringify({
+ errors: errors.array()
+ })
+ );
+ return;
+ }
+
+ let jsonMessage = req.body
+ logger.debug(`Received a message ${jsonMessage}`);
+
+ // send the message to the event distributor
+ eventrouter.sendEvent(jsonMessage.topic, jsonMessage.message);
+
+ res.status(200).send({
+ result: true
+ });
+ return;
+ };
+
+ const getRouter = () => {
+ var routerInstance = new express.Router();
+ routerInstance.use((req, res, next) => {
+ logger.info(`[REQ] ${req.method}, ${req.url}`);
+ next();
+ });
+
+ // intake apis
+ routerInstance.post('/intake', checkSchema(intakeMessageInputValidator), intakeMessage);
+ return routerInstance;
+ };
+
+ module.exports = {
+ getRouter: getRouter
+ };
+})();
\ No newline at end of file
diff --git a/src/controllers/websocket.js b/src/controllers/websocket.js
new file mode 100644
index 0000000..3f3216a
--- /dev/null
+++ b/src/controllers/websocket.js
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+(function () {
+ 'use strict';
+
+ const socketio = require('socket.io');
+ const ioWildcard = require('socketio-wildcard');
+ const client = require('../types/client.js');
+ const eventrouter = require('./eventrouter.js');
+ const logger = require('../config/logger.js');
+
+ let io;
+ const createSocketIO = (server) => {
+ // INSTANTIATE SOCKET.IO
+ io = socketio.listen(server);
+ io.use(ioWildcard());
+
+ // set io to eventrouter
+ //eventrouter.setIO(io);
+
+ // LISTEN TO "CONNECTION" EVENT (FROM SOCKET.IO)
+ io.on('connection', (socket) => {
+ let query = socket.handshake.query;
+ logger.log('debug', `connect ${JSON.stringify(query)}`);
+ let added = false;
+
+ // make a client
+ let c = client.Client.fromObj(query);
+ c.setSocket(socket);
+
+ if(!c.validate()) {
+ logger.log('warn', `client validation failed - ${JSON.stringify(query)}`);
+ return;
+ }
+
+ // register the client for management
+ if(eventrouter.addClient(c)) {
+ // Send a greeting message to the client
+ socket.emit(eventrouter.serviceEvents.GREETING, {
+ to: c.getId(),
+ message: 'Welcome to CORD Workflow Control Service'
+ });
+
+ added = true;
+ }
+ else {
+ logger.log('warn', `client could not be added - ${JSON.stringify(query)}`);
+ socket.disconnect(true);
+ }
+
+ // set a disconnect event handler
+ socket.on('disconnect', (reason) => {
+ logger.log('debug', `disconnect ${reason} ${JSON.stringify(query)}`);
+ if(added) {
+ eventrouter.removeClient(c.getId());
+ }
+ });
+ });
+ };
+
+ const destroySocketIO = () => {
+ io.close();
+ };
+
+ const getSocketIO = () => io;
+
+ module.exports = {
+ create: createSocketIO,
+ destroy: destroySocketIO,
+ get: getSocketIO
+ };
+
+ // USAGE
+ // const socketIo = require('./controllers/websocket.js');
+ // const socket = socketIo.get();
+ // socket.emit('eventName', data);
+
+})();
\ No newline at end of file
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
new file mode 100644
index 0000000..b3e8877
--- /dev/null
+++ b/src/controllers/ws_manager.js
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const _ = require('lodash');
+ const Workflow = require('../types/workflow.js');
+ const logger = require('../config/logger.js');
+
+ let serviceEvents = {
+ WORKFLOW_REG: 'cord.workflow.ctlsvc.workflow.reg',
+ WORKFLOW_REG_ESSENCE: 'cord.workflow.ctlsvc.workflow.reg_essence',
+ WORKFLOW_LIST: 'cord.workflow.ctlsvc.workflow.list',
+ WORKFLOW_RUN_LIST: 'cord.workflow.ctlsvc.workflow.run.list',
+ WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
+ WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
+ WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
+ WORKFLOW_RUN_REMOVE: 'cord.workflow.ctlsvc.workflow.run.remove'
+ };
+
+ // WebSocket interface for workflow registration
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.reg',
+ // message: <workflow>
+ // }
+ const registWorkflow = (topic, message, cb) => {
+ const distributor = require('./eventrouter.js/index.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflow = message;
+
+ logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
+
+ let result = distributor.addWorkflow(workflow);
+ if(!result) {
+ errorMessage = `failed to register a workflow ${workflow.getId()}`;
+ cb(errorMessage, false);
+ }
+ else {
+ cb(null, true);
+ }
+ return;
+ };
+
+ // WebSocket interface for workflow registration (via essence)
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.reg_essence',
+ // message: <workflow essence>
+ // }
+ const registerWorkflowEssence = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let essence = message;
+ let result = true;
+ let errorResults = [];
+
+ logger.log('debug', `workflow essence - ${JSON.stringify(essence)}`);
+
+ let workflows = Workflow.loadWorkflowsFromEssence(essence);
+ workflows.forEach((workflow) => {
+ if(workflow) {
+ let localResult = eventrouter.addWorkflow(workflow);
+ errorResults.push(localResult);
+ result = result && localResult; // false if any of registrations fails
+ }
+ });
+
+ if(!result) {
+ errorMessage = `failed to register workflows ${errorResults}`;
+ cb(errorMessage, false);
+ }
+ else {
+ cb(null, true);
+ }
+ return;
+ };
+
+ // WebSocket interface for workflow listing
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.list',
+ // message: null
+ // }
+ const listWorkflows = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let result = eventrouter.listWorkflows();
+ cb(null, result);
+ return;
+ };
+
+ // WebSocket interface for workflow run listing
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.run.list',
+ // message: null
+ // }
+ const listWorkflowRuns = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let result = eventrouter.listWorkflowRuns();
+ cb(null, result);
+ return;
+ };
+
+ // WebSocket interface for workflow check
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.check',
+ // message: <workflow_id>
+ // }
+ const checkWorkflow = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowId = message;
+ let result = eventrouter.checkWorkflow(workflowId);
+ cb(null, result);
+ return;
+ };
+
+ // WebSocket interface for workflow start notification
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.kickstart',
+ // message: {
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
+ // }
+ // }
+ const notifyWorkflowStart = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowRunId = message.workflow_run_id;
+
+ // there must be a workflow matching
+ // set the workflow kickstarted
+ eventrouter.setWorkflowRunKickstarted(workflowRunId);
+ cb(null, true);
+ return;
+ }
+
+ // WebSocket interface for workflow removal
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.remove',
+ // message: <workflow_id>
+ // }
+ const removeWorkflow = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let workflowId = message;
+ let result = eventrouter.removeWorkflow(workflowId);
+ cb(null, result);
+ return;
+ }
+
+ // WebSocket interface for workflow run removal
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.run.remove',
+ // message: {
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
+ // }
+ // }
+ const removeWorkflowRun = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowRunId = message.workflow_run_id;
+
+ let result = eventrouter.removeWorkflowRun(workflowRunId);
+ cb(null, result);
+ return;
+ }
+
+ const getRouter = () => {
+ return {
+ registWorkflow: {
+ topic: serviceEvents.WORKFLOW_REG,
+ handler: registWorkflow
+ },
+ registerWorkflowEssence: {
+ topic: serviceEvents.WORKFLOW_REG_ESSENCE,
+ handler: registerWorkflowEssence
+ },
+ listWorkflows: {
+ topic: serviceEvents.WORKFLOW_LIST,
+ handler: listWorkflows
+ },
+ listWorkflowRuns: {
+ topic: serviceEvents.WORKFLOW_RUN_LIST,
+ handler: listWorkflowRuns
+ },
+ checkWorkflow: {
+ topic: serviceEvents.WORKFLOW_CHECK,
+ handler: checkWorkflow
+ },
+ notifyWorkflowStart: {
+ topic: serviceEvents.WORKFLOW_KICKSTART,
+ handler: notifyWorkflowStart,
+ return: false
+ },
+ removeWorkflow: {
+ topic: serviceEvents.WORKFLOW_REMOVE,
+ handler: removeWorkflow
+ },
+ removeWorkflowRun: {
+ topic: serviceEvents.WORKFLOW_RUN_REMOVE,
+ handler: removeWorkflowRun
+ }
+ };
+ };
+
+ // out-going commands
+ const kickstartWorkflow = (workflowId, workflowRunId) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let clients = eventrouter.getWorkflowManagerClients();
+ _.forOwn(clients, (client, _clientId) => {
+ let socket = client.getSocket();
+ if(socket) {
+ socket.emit(serviceEvents.WORKFLOW_KICKSTART, {
+ workflow_id: workflowId,
+ workflow_run_id: workflowRunId
+ });
+ }
+ });
+ return;
+ };
+
+ module.exports = {
+ serviceEvents: serviceEvents,
+ getRouter: getRouter,
+ kickstartWorkflow: kickstartWorkflow
+ };
+})();
\ No newline at end of file
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
new file mode 100644
index 0000000..f7ee474
--- /dev/null
+++ b/src/controllers/ws_workflowrun.js
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const logger = require('../config/logger.js');
+
+ let serviceEvents = {
+ WORKFLOW_RUN_UPDATE_STATUS: 'cord.workflow.ctlsvc.workflow.run.status',
+ WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch'
+ };
+
+ // WebSocket interface for workflow status update
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.run.status',
+ // message: {
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>,
+ // task_id: <task_id>,
+ // status: 'begin' or 'end'
+ // }
+ // }
+ const updateWorkflowRunStatus = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `workflow_id field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in message)) {
+ // error
+ errorMessage = `workflow_run_id field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('task_id' in message)) {
+ // error
+ errorMessage = `task_id field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('status' in message)) {
+ // error
+ errorMessage = `status field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let result = eventrouter.updateWorkflowRunStatus(
+ message.workflow_id,
+ message.workflow_run_id,
+ message.task_id,
+ message.status.toLowerCase()
+ );
+ cb(null, result);
+ return;
+ };
+
+ // WebSocket interface for fetching a queued event
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.run.fetch',
+ // message: {
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>,
+ // task_id: <task_id>,
+ // topic: <expected topic>
+ // }
+ // }
+ const fetchEvent = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `workflow_id field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in message)) {
+ // error
+ errorMessage = `workflow_run_id field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('task_id' in message)) {
+ // error
+ errorMessage = `task_id field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('topic' in message)) {
+ // error
+ errorMessage = `topic field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let result = eventrouter.fetchEvent(
+ message.workflow_run_id,
+ message.task_id,
+ message.topic
+ );
+ if(result) {
+ // empty object {} when no message
+ cb(null, result);
+ }
+ else {
+ cb(
+ `could not fetch event ${message.topic} from workflow run ${message.workflow_run_id}`,
+ null
+ );
+ }
+ return;
+ };
+
+ const getRouter = () => {
+ return {
+ updateWorkflowRunStatus: {
+ topic: serviceEvents.WORKFLOW_RUN_UPDATE_STATUS,
+ handler: updateWorkflowRunStatus
+ },
+ fetchEvent: {
+ topic: serviceEvents.WORKFLOW_RUN_FETCH_EVENT,
+ handler: fetchEvent
+ }
+ };
+ };
+
+ module.exports = {
+ serviceEvents: serviceEvents,
+ getRouter: getRouter
+ };
+})();
\ No newline at end of file