Implement basic functionalities for workflow control.
- Manage join/leave of clients
- All clients communicate via socket.io
- Probes emit events
- Managers register workflows (by using a workflow essence)
- Send kickstart request to Managers to launch workflows
- Route events to workflow runs
- Queue events to not lose events between workflow tasks
- Fixed some issues found while working on testcases
- Configure coverage and unit tests to run and write their outputs to files
Change-Id: I678723edc20df9247d63a4bf6380785ab8b2b221
diff --git a/src/config/config.js b/src/config/config.js
new file mode 100644
index 0000000..288e184
--- /dev/null
+++ b/src/config/config.js
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+(function () {
+ 'use strict';
+
+ const argv = require('yargs').argv;
+ const path = require('path');
+ const YamlConfig = require('node-yaml-config');
+ const logger = require('../config/logger.js');
+
+ // if a config file is specified in as a CLI arguments use that one
+ const cfgFile = argv.config || 'config.yml';
+
+ let config;
+ try {
+ logger.log('debug', `Loading ${path.join(__dirname, cfgFile)}`);
+ config = YamlConfig.load(path.join(__dirname, cfgFile));
+ logger.log('debug', `Parsed config: ${JSON.stringify(config)}`);
+ }
+ catch(e) {
+ logger.log('debug', `No ${cfgFile} found, using default params`);
+ }
+
+ module.exports = {
+ service: {
+ port: (config && config.service) ? config.service.port : 3000
+ }
+ };
+})();
\ No newline at end of file
diff --git a/src/config/config.yml b/src/config/config.yml
new file mode 100644
index 0000000..6fb1570
--- /dev/null
+++ b/src/config/config.yml
@@ -0,0 +1,17 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+default:
+ service:
+ port: 3030
diff --git a/src/config/logger.js b/src/config/logger.js
new file mode 100644
index 0000000..0a70904
--- /dev/null
+++ b/src/config/logger.js
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+(function () {
+ 'use strict';
+
+ const winston = require('winston');
+ const fs = require('fs');
+ const path = require('path');
+ const level = process.env.LOG_LEVEL || 'debug';
+ winston.level = level;
+
+ const logFile = path.join(__dirname, '../../logs/cord-workflow-control-service');
+
+ // clear old logs
+ ['error', 'debug'].forEach(l => {
+ try {
+ fs.statSync(`${logFile}.${l}.log`)
+ fs.unlinkSync(`${logFile}.${l}.log`);
+ }
+ catch(e) {
+ // log does not exist
+ }
+ });
+
+ // create a custom logger with colorized console and persistance to file
+ const logger = winston.createLogger({
+ transports: [
+ new (winston.transports.Console)({level: level, colorize: true}),
+ new (winston.transports.File)({name: 'error-log', level: 'error', filename: `${logFile}.error.log`}),
+ new (winston.transports.File)({name: 'debug-log', level: 'debug', filename: `${logFile}.debug.log`})
+ ]
+ });
+
+ module.exports = logger;
+})();
\ No newline at end of file
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
new file mode 100644
index 0000000..82d27db
--- /dev/null
+++ b/src/controllers/eventrouter.js
@@ -0,0 +1,512 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const _ = require('lodash');
+ const logger = require('../config/logger.js');
+ const Client = require('../types/client.js');
+ const WorkflowRun = require('../types/workflowrun.js');
+ const ws_manager = require('./ws_manager.js');
+ const ws_workflowrun = require('./ws_workflowrun.js');
+
+ let allClients = {}; // has publishers and subscribers
+ let probeClients = {}; // a subset of clients
+ let workflowManagerClients = {}; // a subset of clients
+ let workflowRunClients = {}; // a subset of clients
+
+ //let io;
+
+ // key: workflow id
+ // value: Workflow instance
+ let workflows = {};
+
+ // key: workflow run id
+ // value: WorkflowRun instance
+ let workflowRuns = {};
+
+ let serviceEvents = {
+ GREETING: 'cord.workflow.ctlsvc.greeting'
+ };
+
+ // add ws_mgroperation events
+ _.forOwn(ws_manager.serviceEvents, (wsServiceEvent, key) => {
+ serviceEvents[key] = wsServiceEvent;
+ });
+
+ // add ws_runoperation events
+ _.forOwn(ws_workflowrun.serviceEvents, (wsServiceEvent, key) => {
+ serviceEvents[key] = wsServiceEvent;
+ });
+
+ //const setIO = (ioInstance) => {
+ // io = ioInstance;
+ //};
+
+ const destroy = () => {
+ removeClients();
+ clearWorkflowRuns();
+ clearWorkflows();
+ };
+
+ const listWorkflows = () => {
+ let workflowList = [];
+ _.forOwn(workflows, (_workflow, workflowId) => {
+ workflowList.push(workflowId);
+ });
+ return workflowList;
+ };
+
+ const checkWorkflow = (workflowId) => {
+ if(workflowId in workflows) {
+ return true;
+ }
+ return false;
+ };
+
+ const addWorkflow = (workflow) => {
+ if(workflow.getId() in workflows) {
+ logger.log('error', `there exists a workflow with the same id - ${workflow.getId()}`);
+ return false;
+ }
+
+ let workflowId = workflow.getId();
+ workflows[workflowId] = workflow;
+ return true;
+ };
+
+ const clearWorkflows = () => {
+ _.forOwn(workflows, (_workflow, workflowId) => {
+ delete workflows[workflowId];
+ });
+ };
+
+ const listWorkflowRuns = () => {
+ let workflowRunList = [];
+ _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
+ workflowRunList.push(workflowRunId);
+ });
+ return workflowRunList;
+ };
+
+ const checkWorkflowRun = (workflowRunId) => {
+ if(workflowRunId in workflowRuns) {
+ return true;
+ }
+ return false;
+ };
+
+ const addWorkflowRun = (workflowRun) => {
+ let workflowId = workflowRun.getWorkflowId();
+ let workflowRunId = workflowRun.getId();
+
+ if(workflowRunId in workflowRuns) {
+ logger.log('warn', `there exists a workflow run with the same id - ${workflowRunId}`);
+ return false;
+ }
+
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
+ return false;
+ }
+
+ workflowRuns[workflowRunId] = workflowRun;
+ return true;
+ };
+
+ const clearWorkflowRuns = () => {
+ _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
+ delete workflowRuns[workflowRunId];
+ });
+ };
+
+ const updateWorkflowRunStatus = (workflowRunId, taskId, status) => {
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+ return false;
+ }
+
+ let workflowRun = workflowRuns[workflowRunId];
+ workflowRun.updateTaskStatus(taskId, status);
+ return true;
+ };
+
+ const setWorkflowRunKickstarted = (workflowRunId) => {
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+ return false;
+ }
+
+ let workflowRun = workflowRuns[workflowRunId];
+ workflowRun.setKickstarted();
+ return true;
+ };
+
+ const kickstart = (workflowId, workflowRunId) => {
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+ return false;
+ }
+
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+ return false;
+ }
+
+ ws_manager.kickstartWorkflow(workflowId, workflowRunId);
+ };
+
+ const removeWorkflow = (workflowId) => {
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+ return false;
+ }
+
+ // check if there are workflow runs
+ _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+ if(workflowRun.getWorkflowId() === workflowId) {
+ logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
+ return false;
+ }
+ });
+
+ delete workflows[workflowId];
+ return true;
+ };
+
+ const removeWorkflowRun = (workflowRunId) => {
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+ return false;
+ }
+
+ let workflowRun = workflowRuns[workflowRunId];
+ delete workflowRuns[workflowRunId];
+
+ workflowRun.setFinished();
+ return true;
+ };
+
+ const sendEvent = (topic, message) => {
+ // list of workflowIds
+ // to check if there are workflow runs for the events
+ let workflowIdsRunning = [];
+
+ // route event to running instances
+ _.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
+ let workflowId = workflowRun.getWorkflowId();
+ let workflow = workflows[workflowId];
+
+ // event will be routed to workflow runs that meet following criteria
+ // 1) the workflow is currently interested in the same topic
+ // (already finished tasks are not counted)
+ // 2) the task's key field and value
+ if(workflowRun.isEventAcceptable(workflow, topic, message)) {
+ logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
+ workflowRun.enqueueEvent(topic, message);
+
+ if(!workflowIdsRunning.includes(workflowId)) {
+ workflowIdsRunning.push(workflowId);
+ }
+ }
+ });
+
+ // check if the event is a kickstart event
+ _.forOwn(workflows, (workflow, workflowId) => {
+ if(workflow.isKickstartTopic(topic)) {
+ // check if there is a workflow run for the event
+ // kickstart a workflow if there is no workflows runs for the event
+ if(!workflowIdsRunning.includes(workflowId)) {
+ // we need to buffer the event until workflow run is brought up
+ let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
+ let workflowRunId = workflowRun.getId();
+
+ // register for management
+ workflowRuns[workflowRunId] = workflowRun;
+
+ // route event
+ logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
+ workflowRun.enqueueEvent(topic, message);
+
+ // KICKSTART!
+ kickstart(workflowId, workflowRunId);
+ }
+ }
+ });
+ };
+
+ const fetchEvent = (workflowRunId, taskId, topic) => {
+ // this returns an event or an empty obj when there is no message
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `workflow run ${workflowRunId} does not exist`);
+ return null;
+ }
+
+ let workflowRun = workflowRuns[workflowRunId];
+ let workflowId = workflowRun.getWorkflowId();
+
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `workflow ${workflowId} does not exist`);
+ return null;
+ }
+
+ let workflow = workflows[workflowId];
+
+ let task = workflow.getTask(taskId);
+ if(!task) {
+ logger.log('warn', `workflow ${workflowId} does not have task ${taskId}`);
+ return null;
+ }
+
+ logger.log('debug', `workflow run ${workflowRunId}, task ${taskId} fetches an event`);
+
+ let event = workflowRun.dequeueEvent(topic);
+ if(event) {
+ return event;
+ }
+ else {
+ return {};
+ }
+ };
+
+ const addClient = (c) => {
+ let clientId = c.getId();
+ let socket = c.getSocket();
+ // registers the client and wires its socket by client type; returns true when added, false otherwise
+ // reject a client whose id is already registered
+ if(clientId in allClients) {
+ logger.log('warn', `there exists a client with the same id - ${clientId}`);
+ return false;
+ }
+
+ if(c.getType() === Client.Type.PROBE) {
+ // probe: every message it emits is relayed as an event
+ // the topic of the socket message is used as the event topic
+ // probe protocol:
+ // REQ:
+ // topic: event topic
+ // message: <data>
+ // RES:
+ // topic: topic sent
+ // message: {result: <true/false>, message: <error message> }
+ allClients[clientId] = c;
+ probeClients[clientId] = c;
+
+ socket.on('*', (msg) => {
+ let jsonMsg = msg.data;
+ if(jsonMsg.length === 2) {
+ // msg.data must have exactly two parts:
+ // the first part is the topic,
+ // the second part is the message body
+ let topic = jsonMsg[0];
+ let messageBody = jsonMsg[1];
+
+ sendEvent(topic, messageBody);
+
+ // acknowledge on the same topic with result: true
+ socket.emit(topic, {
+ result: true
+ });
+ }
+ else {
+ logger.log('warn', `unexpected message is received - ${JSON.stringify(jsonMsg)}`);
+ socket.emit(jsonMsg[0], {
+ result: false,
+ message: `unexpected message is received - ${JSON.stringify(jsonMsg)}`
+ });
+ }
+ });
+ return true;
+ }
+ else if(c.getType() === Client.Type.WORKFLOW_MANAGER) {
+ // workflow manager: serves the operations listed by ws_manager.getRouter()
+ // manager protocol:
+ // REQ:
+ // topic: operation
+ // message: <data>
+ // RES:
+ // topic: topic sent
+ // message: {result: <true/false>, message: <error message> }
+ allClients[clientId] = c;
+ workflowManagerClients[clientId] = c;
+
+ // attach one socket listener per manager operation
+ let router = ws_manager.getRouter();
+ _.forOwn(router, (routerElem, _key) => {
+ socket.on(routerElem.topic, (msg) => {
+ routerElem.handler(routerElem.topic, msg, (err, result) => {
+ if(err) {
+ logger.log('warn', `unable to handle a message - ${err}`);
+ socket.emit(routerElem.topic, {
+ result: false,
+ message: err
+ });
+ return;
+ }
+
+ if(routerElem.return === undefined || routerElem.return) {
+ socket.emit(routerElem.topic, {
+ result: result
+ });
+ }
+ });
+ });
+ });
+ return true;
+ }
+ else if(c.getType() === Client.Type.WORKFLOW_RUN) {
+ // workflow run: serves the operations listed by ws_workflowrun.getRouter()
+ // workflow run protocol:
+ // REQ:
+ // topic: operation
+ // message: <data>
+ // RES:
+ // topic: topic sent
+ // message: {result: <true/false>, message: <error message> }
+
+ // the client must name a registered workflow and a managed workflow run
+ let workflowId = c.getWorkflowId();
+ let workflowRunId = c.getWorkflowRunId();
+ let workflowRun;
+
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `cannot find a workflow ${workflowId}`);
+ return false;
+ }
+
+ // the workflow run has to exist before its client may join
+ if(!(workflowRunId in workflowRuns)) {
+ // the workflow run does not exist (yet): reject the client
+ logger.log('warn', `cannot find a workflow run ${workflowRunId}`);
+ return false;
+ }
+
+ // both the workflow and the run are known: accept the client
+
+ allClients[clientId] = c;
+ workflowRunClients[clientId] = c;
+
+ // let the run know about its new client
+ workflowRun = workflowRuns[workflowRunId];
+ workflowRun.addClientId(clientId);
+
+ // attach one socket listener per workflow run operation
+ let router = ws_workflowrun.getRouter();
+ _.forOwn(router, (routerElem, _key) => {
+ socket.on(routerElem.topic, (msg) => {
+ routerElem.handler(routerElem.topic, msg, (err, result) => {
+ if(err) {
+ logger.log('warn', `unable to handle a message - ${err}`);
+ socket.emit(routerElem.topic, {
+ result: false,
+ message: err
+ });
+ return;
+ }
+
+ if(routerElem.return === undefined || routerElem.return) {
+ socket.emit(routerElem.topic, {
+ result: result
+ });
+ }
+ });
+ });
+ });
+ return true;
+ }
+ return false;
+ };
+
+ const removeClient = (id) => {
+ if(id in allClients) {
+ let removedClient = allClients[id];
+ delete allClients[id];
+
+ let type = removedClient.getType();
+ if(type === Client.Type.PROBE) {
+ delete probeClients[id];
+ }
+ else if(type === Client.Type.WORKFLOW_MANAGER) {
+ delete workflowManagerClients[id];
+ }
+ else if(type === Client.Type.WORKFLOW_RUN) {
+ delete workflowRunClients[id];
+
+ let workflowRunId = removedClient.getWorkflowRunId();
+ let workflowRun = workflowRuns[workflowRunId];
+
+ if(workflowRun) {
+ workflowRun.removeClientId(id);
+
+ //TODO
+ // WorkflowRun can have no clients between tasks
+ // So we should not remove the run until the workflow run finishes
+ }
+ }
+ }
+ };
+
+ const removeClients = () => {
+ let probeClients = {};
+
+ _.forOwn(probeClients, (_probeClient, clientId) => {
+ delete probeClients[clientId];
+ });
+
+ _.forOwn(workflowManagerClients, (_workflowManagerClient, clientId) => {
+ delete workflowManagerClients[clientId];
+ });
+
+ _.forOwn(workflowRunClients, (_workflowRunClients, clientId) => {
+ delete workflowRunClients[clientId];
+ });
+
+ _.forOwn(allClients, (client, clientId) => {
+ client.getSocket().disconnect(true);
+ delete allClients[clientId];
+ });
+ }
+
+ module.exports = {
+ serviceEvents: serviceEvents,
+ destroy: destroy,
+ getClients: () => { return allClients; },
+ getProbeClients: () => { return probeClients; },
+ getWorkflowManagerClients: () => { return workflowManagerClients; },
+ getWorkflowRunClients: () => { return workflowRunClients; },
+ clientType: Client.Type,
+ //setIO: setIO,
+ sendEvent: sendEvent,
+ fetchEvent: fetchEvent,
+ addClient: addClient,
+ removeClient: removeClient,
+ removeClients: removeClients,
+ addWorkflow: addWorkflow,
+ listWorkflows: listWorkflows,
+ checkWorkflow: checkWorkflow,
+ removeWorkflow: removeWorkflow,
+ clearWorkflows: clearWorkflows,
+ addWorkflowRun: addWorkflowRun,
+ listWorkflowRuns: listWorkflowRuns,
+ checkWorkflowRun: checkWorkflowRun,
+ removeWorkflowRun: removeWorkflowRun,
+ clearWorkflowRuns: clearWorkflowRuns,
+ updateWorkflowRunStatus: updateWorkflowRunStatus,
+ setWorkflowRunKickstarted: setWorkflowRunKickstarted,
+ };
+})();
\ No newline at end of file
diff --git a/src/controllers/rest_probe.js b/src/controllers/rest_probe.js
new file mode 100644
index 0000000..9d67386
--- /dev/null
+++ b/src/controllers/rest_probe.js
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const express = require('express');
+ const {checkSchema, validationResult} = require('express-validator');
+ const logger = require('../config/logger.js');
+ const eventrouter = require('./eventrouter.js');
+
+ // HTTP REST interface for message intake (POST /intake)
+ // Message format:
+ // {
+ // topic: 'topic here',
+ // message: 'message body here'
+ // }
+ // e.g., /intake?topic=aaa&message=bbb
+ // NOTE(review): the schema below declares only locations and error texts, no validators — confirm it rejects anything
+ const intakeMessageInputValidator = {
+ topic: {
+ in: ['params', 'query'],
+ errorMessage: 'Message topic is null or empty',
+ },
+ message: {
+ in: ['params', 'query'],
+ errorMessage: 'Message body is null or empty',
+ }
+ };
+
+ const intakeMessage = (req, res) => {
+ let errors = validationResult(req);
+ if(!errors.isEmpty()) {
+ res.status(400).send(
+ JSON.stringify({
+ errors: errors.array()
+ })
+ );
+ return;
+ }
+
+ let jsonMessage = req.body
+ logger.debug(`Received a message ${jsonMessage}`);
+
+ // send the message to the event distributor
+ eventrouter.sendEvent(jsonMessage.topic, jsonMessage.message);
+
+ res.status(200).send({
+ result: true
+ });
+ return;
+ };
+
+ const getRouter = () => {
+ var routerInstance = new express.Router();
+ routerInstance.use((req, res, next) => {
+ logger.info(`[REQ] ${req.method}, ${req.url}`);
+ next();
+ });
+
+ // intake apis
+ routerInstance.post('/intake', checkSchema(intakeMessageInputValidator), intakeMessage);
+ return routerInstance;
+ };
+
+ module.exports = {
+ getRouter: getRouter
+ };
+})();
\ No newline at end of file
diff --git a/src/controllers/websocket.js b/src/controllers/websocket.js
new file mode 100644
index 0000000..3f3216a
--- /dev/null
+++ b/src/controllers/websocket.js
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+(function () {
+ 'use strict';
+
+ const socketio = require('socket.io');
+ const ioWildcard = require('socketio-wildcard');
+ const client = require('../types/client.js');
+ const eventrouter = require('./eventrouter.js');
+ const logger = require('../config/logger.js');
+
+ let io;
+ const createSocketIO = (server) => {
+ // INSTANTIATE SOCKET.IO
+ io = socketio.listen(server);
+ io.use(ioWildcard());
+
+ // set io to eventrouter
+ //eventrouter.setIO(io);
+
+ // LISTEN TO "CONNECTION" EVENT (FROM SOCKET.IO)
+ io.on('connection', (socket) => {
+ let query = socket.handshake.query;
+ logger.log('debug', `connect ${JSON.stringify(query)}`);
+ let added = false;
+
+ // make a client
+ let c = client.Client.fromObj(query);
+ c.setSocket(socket);
+
+ if(!c.validate()) {
+ logger.log('warn', `client validation failed - ${JSON.stringify(query)}`);
+ return;
+ }
+
+ // register the client for management
+ if(eventrouter.addClient(c)) {
+ // Send a greeting message to the client
+ socket.emit(eventrouter.serviceEvents.GREETING, {
+ to: c.getId(),
+ message: 'Welcome to CORD Workflow Control Service'
+ });
+
+ added = true;
+ }
+ else {
+ logger.log('warn', `client could not be added - ${JSON.stringify(query)}`);
+ socket.disconnect(true);
+ }
+
+ // set a disconnect event handler
+ socket.on('disconnect', (reason) => {
+ logger.log('debug', `disconnect ${reason} ${JSON.stringify(query)}`);
+ if(added) {
+ eventrouter.removeClient(c.getId());
+ }
+ });
+ });
+ };
+
+ const destroySocketIO = () => {
+ io.close();
+ };
+
+ const getSocketIO = () => io;
+
+ module.exports = {
+ create: createSocketIO,
+ destroy: destroySocketIO,
+ get: getSocketIO
+ };
+
+ // USAGE
+ // const socketIo = require('./controllers/websocket.js');
+ // const socket = socketIo.get();
+ // socket.emit('eventName', data);
+
+})();
\ No newline at end of file
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
new file mode 100644
index 0000000..b3e8877
--- /dev/null
+++ b/src/controllers/ws_manager.js
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const _ = require('lodash');
+ const Workflow = require('../types/workflow.js');
+ const logger = require('../config/logger.js');
+
+ let serviceEvents = {
+ WORKFLOW_REG: 'cord.workflow.ctlsvc.workflow.reg',
+ WORKFLOW_REG_ESSENCE: 'cord.workflow.ctlsvc.workflow.reg_essence',
+ WORKFLOW_LIST: 'cord.workflow.ctlsvc.workflow.list',
+ WORKFLOW_RUN_LIST: 'cord.workflow.ctlsvc.workflow.run.list',
+ WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
+ WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
+ WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
+ WORKFLOW_RUN_REMOVE: 'cord.workflow.ctlsvc.workflow.run.remove'
+ };
+
+ // WebSocket interface for workflow registration
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.reg',
+ // message: <workflow>
+ // }
+ const registWorkflow = (topic, message, cb) => {
+ const distributor = require('./eventrouter.js/index.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflow = message;
+
+ logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
+
+ let result = distributor.addWorkflow(workflow);
+ if(!result) {
+ errorMessage = `failed to register a workflow ${workflow.getId()}`;
+ cb(errorMessage, false);
+ }
+ else {
+ cb(null, true);
+ }
+ return;
+ };
+
+ // WebSocket interface for workflow registration (via essence)
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.reg_essence',
+ // message: <workflow essence>
+ // }
+ const registerWorkflowEssence = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let essence = message;
+ let result = true;
+ let errorResults = [];
+
+ logger.log('debug', `workflow essence - ${JSON.stringify(essence)}`);
+
+ let workflows = Workflow.loadWorkflowsFromEssence(essence);
+ workflows.forEach((workflow) => {
+ if(workflow) {
+ let localResult = eventrouter.addWorkflow(workflow);
+ errorResults.push(localResult);
+ result = result && localResult; // false if any of registrations fails
+ }
+ });
+
+ if(!result) {
+ errorMessage = `failed to register workflows ${errorResults}`;
+ cb(errorMessage, false);
+ }
+ else {
+ cb(null, true);
+ }
+ return;
+ };
+
+ // WebSocket interface for workflow listing
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.list',
+ // message: null
+ // }
+ const listWorkflows = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let result = eventrouter.listWorkflows();
+ cb(null, result);
+ return;
+ };
+
+ // WebSocket interface for workflow run listing
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.run.list',
+ // message: null
+ // }
+ const listWorkflowRuns = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let result = eventrouter.listWorkflowRuns();
+ cb(null, result);
+ return;
+ };
+
+ // WebSocket interface for workflow check
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.check',
+ // message: <workflow_id>
+ // }
+ const checkWorkflow = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowId = message;
+ let result = eventrouter.checkWorkflow(workflowId);
+ cb(null, result);
+ return;
+ };
+
+ // WebSocket interface for workflow start notification
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.kickstart',
+ // message: {
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
+ // }
+ // }
+ const notifyWorkflowStart = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowRunId = message.workflow_run_id;
+
+ // there must be a workflow matching
+ // set the workflow kickstarted
+ eventrouter.setWorkflowRunKickstarted(workflowRunId);
+ cb(null, true);
+ return;
+ }
+
+ // WebSocket interface for workflow removal
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.remove',
+ // message: <workflow_id>
+ // }
+ const removeWorkflow = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let workflowId = message;
+ let result = eventrouter.removeWorkflow(workflowId);
+ cb(null, result);
+ return;
+ }
+
+ // WebSocket interface for workflow run removal
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.run.remove',
+ // message: {
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
+ // }
+ // }
+ const removeWorkflowRun = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowRunId = message.workflow_run_id;
+
+ let result = eventrouter.removeWorkflowRun(workflowRunId);
+ cb(null, result);
+ return;
+ }
+
+ const getRouter = () => {
+ return {
+ registWorkflow: {
+ topic: serviceEvents.WORKFLOW_REG,
+ handler: registWorkflow
+ },
+ registerWorkflowEssence: {
+ topic: serviceEvents.WORKFLOW_REG_ESSENCE,
+ handler: registerWorkflowEssence
+ },
+ listWorkflows: {
+ topic: serviceEvents.WORKFLOW_LIST,
+ handler: listWorkflows
+ },
+ listWorkflowRuns: {
+ topic: serviceEvents.WORKFLOW_RUN_LIST,
+ handler: listWorkflowRuns
+ },
+ checkWorkflow: {
+ topic: serviceEvents.WORKFLOW_CHECK,
+ handler: checkWorkflow
+ },
+ notifyWorkflowStart: {
+ topic: serviceEvents.WORKFLOW_KICKSTART,
+ handler: notifyWorkflowStart,
+ return: false
+ },
+ removeWorkflow: {
+ topic: serviceEvents.WORKFLOW_REMOVE,
+ handler: removeWorkflow
+ },
+ removeWorkflowRun: {
+ topic: serviceEvents.WORKFLOW_RUN_REMOVE,
+ handler: removeWorkflowRun
+ }
+ };
+ };
+
+ // out-going commands
+ const kickstartWorkflow = (workflowId, workflowRunId) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let clients = eventrouter.getWorkflowManagerClients();
+ _.forOwn(clients, (client, _clientId) => {
+ let socket = client.getSocket();
+ if(socket) {
+ socket.emit(serviceEvents.WORKFLOW_KICKSTART, {
+ workflow_id: workflowId,
+ workflow_run_id: workflowRunId
+ });
+ }
+ });
+ return;
+ };
+
+ module.exports = {
+ serviceEvents: serviceEvents,
+ getRouter: getRouter,
+ kickstartWorkflow: kickstartWorkflow
+ };
+})();
\ No newline at end of file
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
new file mode 100644
index 0000000..f7ee474
--- /dev/null
+++ b/src/controllers/ws_workflowrun.js
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const logger = require('../config/logger.js');
+
+    // topics served by this module
+    const serviceEvents = {
+        // a workflow run reports the status of one of its tasks
+        WORKFLOW_RUN_UPDATE_STATUS: 'cord.workflow.ctlsvc.workflow.run.status',
+        // a workflow run asks for a queued event
+        WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch'
+    };
+
+    // WebSocket interface for workflow run status update
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.run.status',
+    //     message: {
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>,
+    //         task_id: <task_id>,
+    //         status: 'begin' or 'end'
+    //     }
+    // }
+    //
+    // cb(error, result) is called exactly once:
+    // - cb(<error string>, false) when the message fails validation
+    // - cb(null, <value returned by eventrouter.updateWorkflowRunStatus>) otherwise
+    const updateWorkflowRunStatus = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        // log and report a validation failure
+        const reject = (errorMessage) => {
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+        };
+
+        if(!message) {
+            reject(`Message body for topic ${topic} is null or empty`);
+            return;
+        }
+
+        const requiredFields = ['workflow_id', 'workflow_run_id', 'task_id', 'status'];
+        const missingField = requiredFields.find((field) => !(field in message));
+        if(missingField) {
+            // stringify the body: interpolating the object itself would only
+            // print "[object Object]"
+            reject(`${missingField} field is not in message body - ${JSON.stringify(message)}`);
+            return;
+        }
+
+        if(typeof message.status !== 'string') {
+            // toLowerCase() below would throw on a non-string status
+            reject(`status field is not a string - ${JSON.stringify(message)}`);
+            return;
+        }
+
+        const result = eventrouter.updateWorkflowRunStatus(
+            message.workflow_id,
+            message.workflow_run_id,
+            message.task_id,
+            message.status.toLowerCase()
+        );
+        cb(null, result);
+    };
+
+    // WebSocket interface for fetching a queued event of a workflow run
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.run.fetch',
+    //     message: {
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>,
+    //         task_id: <task_id>,
+    //         topic: <expected topic>
+    //     }
+    // }
+    //
+    // cb(error, result) is called exactly once:
+    // - cb(<error string>, false) when the message fails validation
+    // - cb(null, <event>) when eventrouter.fetchEvent returns a truthy value
+    // - cb(<error string>, null) when it returns a falsy value
+    const fetchEvent = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        // log and report a validation failure
+        const reject = (errorMessage) => {
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+        };
+
+        if(!message) {
+            reject(`Message body for topic ${topic} is null or empty`);
+            return;
+        }
+
+        // workflow_id is required by the message format although only the run
+        // id, task id and topic are handed to the event router below
+        const requiredFields = ['workflow_id', 'workflow_run_id', 'task_id', 'topic'];
+        const missingField = requiredFields.find((field) => !(field in message));
+        if(missingField) {
+            // stringify the body: interpolating the object itself would only
+            // print "[object Object]"
+            reject(`${missingField} field is not in message body - ${JSON.stringify(message)}`);
+            return;
+        }
+
+        const result = eventrouter.fetchEvent(
+            message.workflow_run_id,
+            message.task_id,
+            message.topic
+        );
+        if(result) {
+            // empty object {} when no message
+            cb(null, result);
+        }
+        else {
+            cb(
+                `could not fetch event ${message.topic} from workflow run ${message.workflow_run_id}`,
+                null
+            );
+        }
+    };
+
+    /**
+     * Build the routing table for workflow-run WebSocket messages.
+     *
+     * @returns {Object} route name -> { topic, handler }
+     */
+    const getRouter = () => {
+        const router = {};
+
+        router.updateWorkflowRunStatus = {
+            topic: serviceEvents.WORKFLOW_RUN_UPDATE_STATUS,
+            handler: updateWorkflowRunStatus
+        };
+
+        router.fetchEvent = {
+            topic: serviceEvents.WORKFLOW_RUN_FETCH_EVENT,
+            handler: fetchEvent
+        };
+
+        return router;
+    };
+
+    // public surface: topic names and the routing table factory
+    module.exports = {
+        serviceEvents,
+        getRouter
+    };
+})();
\ No newline at end of file
diff --git a/src/server.js b/src/server.js
new file mode 100644
index 0000000..6f70d3a
--- /dev/null
+++ b/src/server.js
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const express = require('express');
+ const config = require('./config/config.js').service;
+ const bodyParser = require('body-parser');
+ const cors = require('cors');
+ const socketIo = require('./controllers/websocket.js');
+ const eventrouter = require('./controllers/eventrouter.js');
+ const workflowLoader = require('./workflows/loader.js');
+ const logger = require('./config/logger.js');
+ const rest_probe = require('./controllers/rest_probe.js');
+
+ // The express application; startServer() stores the listening server on it
+ // as app.server.
+ const app = express();
+
+ // Apply middlewares: CORS handling and JSON request body parsing
+ app.use(cors());
+ app.use(bodyParser.json());
+
+ // Set a router for intake interface
+ // (mounted at the root path; the routes come from rest_probe.js)
+ app.use('/', rest_probe.getRouter());
+
+    /**
+     * Start the HTTP server, or return the one that is already running.
+     *
+     * Once the server is listening, the WebSocket layer is created on top of
+     * it and every built-in workflow returned by the loader is registered
+     * with the event router.
+     *
+     * @param {number} [port] port to listen on; falls back to config.port
+     * @returns {Object} the server returned by app.listen()
+     */
+    const startServer = (port) => {
+        // if is running just return it
+        if(app.server) {
+            return app.server;
+        }
+
+        const listenPort = port || config.port;
+        const server = app.listen(listenPort, () => {
+            logger.info(`Express is listening to http://localhost:${listenPort}`);
+
+            // once server is ready setup WebSocket
+            socketIo.create(server);
+
+            // load built-in workflows
+            // Register the workflow objects themselves. `for...in` yields
+            // keys (array indices), which is what used to be handed to
+            // addWorkflow(). Going through the keys works whether the loader
+            // returns an array or an id -> workflow map.
+            const workflows = workflowLoader.loadAllWorkflows() || {};
+            Object.keys(workflows).forEach((key) => {
+                eventrouter.addWorkflow(workflows[key]);
+            });
+        });
+        app.server = server;
+        return server;
+    };
+
+    /**
+     * Stop the server started by startServer(); a no-op when none is running.
+     * Order: destroy the WebSocket layer, close the HTTP server, forget it,
+     * then destroy the event router.
+     */
+    const stopServer = () => {
+        if(!app.server) {
+            return;
+        }
+
+        socketIo.destroy();
+        app.server.close();
+        app.server = undefined;
+        eventrouter.destroy();
+    };
+
+    // start right away when no other module required this file
+    // NOTE(review): module.parent is deprecated in recent Node.js releases;
+    // `require.main === module` is the documented replacement - confirm
+    // before switching.
+    if(!module.parent) {
+        startServer();
+    }
+
+    // public surface, used e.g. to start/stop the server programmatically
+    module.exports = {
+        serviceEvents: eventrouter.serviceEvents,
+        app,
+        start: startServer,
+        stop: stopServer
+    };
+})();
\ No newline at end of file
diff --git a/src/types/client.js b/src/types/client.js
new file mode 100644
index 0000000..02e8eb6
--- /dev/null
+++ b/src/types/client.js
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const _ = require('lodash');
+ const logger = require('../config/logger.js');
+
+    // Kinds of clients known to the controller.
+    const ClientType = {
+        PROBE: 'probe',
+        WORKFLOW_MANAGER: 'workflow_manager',
+        WORKFLOW_RUN: 'workflow_run',
+        UNKNOWN: 'unknown'
+    };
+
+    /**
+     * A client of the controller: its id, type, the workflow / workflow run
+     * it belongs to (workflow_run clients only), its socket and free-form
+     * extra parameters.
+     */
+    class Client {
+        /**
+         * @param {*} id client id. Stored lower-cased; non-string ids are
+         *     converted with String() first. null/undefined are stored as
+         *     they are so that validate() reports them instead of the
+         *     constructor throwing.
+         */
+        constructor(id) {
+            this.id = Client.normalizeId(id);
+            // a field value can be one of followings
+            // - probe : message publisher
+            // - workflow_manager : workflow manager
+            // - workflow_run : workflow run
+            this.type = ClientType.UNKNOWN;
+            // used by workflow_run
+            this.workflowId = null;
+            this.workflowRunId = null;
+            this.socket = null;
+            // optional info.
+            this.params = {};
+        }
+
+        /**
+         * Lower-case an id without throwing on non-string input.
+         *
+         * @param {*} id raw id
+         * @returns {*} lower-cased string, or id itself when null/undefined
+         */
+        static normalizeId(id) {
+            if(id === undefined || id === null) {
+                return id;
+            }
+            return String(id).toLowerCase();
+        }
+
+        /**
+         * Map a client type name or one of its aliases (case-insensitive) to
+         * a ClientType value.
+         *
+         * @param {string} strClientType e.g. 'probe', 'prb', 'manager', 'run'
+         * @returns {string} a ClientType value; UNKNOWN for empty,
+         *     non-string or unrecognized input
+         */
+        static parseClientType(strClientType) {
+            if(typeof strClientType !== 'string') {
+                return ClientType.UNKNOWN;
+            }
+
+            switch(strClientType.toLowerCase()) {
+                case 'probe':
+                case 'prb':
+                    return ClientType.PROBE;
+                case 'workflow_manager':
+                case 'manager':
+                    return ClientType.WORKFLOW_MANAGER;
+                case 'workflow_run':
+                case 'run':
+                    return ClientType.WORKFLOW_RUN;
+                default:
+                    return ClientType.UNKNOWN;
+            }
+        }
+
+        /**
+         * Build a Client from a plain object with the keys id (required),
+         * type, workflow_id, workflow_run_id and socket.
+         *
+         * @param {Object} obj source object
+         * @returns {Client|null} null when obj is not an object or carries no id
+         */
+        static fromObj(obj) {
+            if(!obj || typeof obj !== 'object') {
+                return null;
+            }
+
+            if(!('id' in obj) || obj.id === undefined || obj.id === null) {
+                logger.log('error', 'id is not given');
+                return null;
+            }
+
+            const client = new Client(obj.id);
+
+            if('type' in obj) {
+                client.setType(obj.type);
+            }
+
+            if('workflow_id' in obj) {
+                client.setWorkflowId(obj.workflow_id);
+            }
+
+            if('workflow_run_id' in obj) {
+                client.setWorkflowRunId(obj.workflow_run_id);
+            }
+
+            if('socket' in obj) {
+                client.setSocket(obj.socket);
+            }
+
+            // every own property of obj is copied into params, including the
+            // ones already consumed above (id, type, socket, ...)
+            client.params = {};
+            _.forOwn(obj, (val, key) => {
+                client.params[key] = val;
+            });
+            return client;
+        }
+
+        setId(id) {
+            this.id = Client.normalizeId(id);
+        }
+
+        getId() {
+            return this.id;
+        }
+
+        // accepts a type name or alias; see parseClientType()
+        setType(type) {
+            this.type = Client.parseClientType(type);
+        }
+
+        getType() {
+            return this.type;
+        }
+
+        setWorkflowId(id) {
+            this.workflowId = id;
+        }
+
+        getWorkflowId() {
+            return this.workflowId;
+        }
+
+        setWorkflowRunId(id) {
+            this.workflowRunId = id;
+        }
+
+        getWorkflowRunId() {
+            return this.workflowRunId;
+        }
+
+        setParams(params={}) {
+            this.params = params;
+        }
+
+        getParams() {
+            return this.params;
+        }
+
+        setSocket(socket) {
+            this.socket = socket;
+        }
+
+        getSocket() {
+            return this.socket;
+        }
+
+        /**
+         * Check that the client is completely described: an id and a known
+         * type, plus workflow id and workflow run id for workflow_run clients.
+         *
+         * @returns {boolean} true when valid; the reason is logged otherwise
+         */
+        validate() {
+            // id field is required for all types of clients
+            if(!this.id) {
+                logger.log('error', 'id is not given');
+                return false;
+            }
+
+            if(this.type === ClientType.UNKNOWN) {
+                logger.log('error', 'type is not given properly');
+                return false;
+            }
+
+            if(this.type === ClientType.WORKFLOW_RUN) {
+                if(!this.workflowId) {
+                    logger.log('error', 'workflowId is not given');
+                    return false;
+                }
+
+                if(!this.workflowRunId) {
+                    logger.log('error', 'workflowRunId is not given');
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+    // public surface: the client type enumeration and the Client class
+    module.exports = {
+        Type: ClientType,
+        Client
+    };
+})();
\ No newline at end of file
diff --git a/src/types/workflow.js b/src/types/workflow.js
new file mode 100644
index 0000000..11ace2f
--- /dev/null
+++ b/src/types/workflow.js
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const _ = require('lodash');
+ const WorkflowTask = require('./workflowtask.js');
+ const logger = require('../config/logger.js');
+
+    /**
+     * Turn an essence into Workflow objects. An essence can describe several
+     * workflows, one per own property; entries for which
+     * Workflow.fromEssence() yields nothing are left out.
+     *
+     * @param {Object} essence workflow id -> workflow essence
+     * @returns {Workflow[]} the workflows that could be built
+     */
+    const loadWorkflowsFromEssence = (essence) => {
+        return _.values(essence)
+            .map((workflowEssence) => Workflow.fromEssence(workflowEssence))
+            .filter((workflow) => Boolean(workflow));
+    };
+
+    /**
+     * Static definition of a workflow (a DAG): its tasks, indexed both by
+     * task id and by the topic each task listens on.
+     */
+    class Workflow {
+        /**
+         * @param {string} id workflow id (dag_id)
+         */
+        constructor(id) {
+            // dag_id
+            this.id = id;
+
+            // key: topic
+            // value: an array of WorkflowTask objects
+            this.topics = {};
+
+            // key: task id
+            // value: WorkflowTask object
+            this.tasks = {};
+
+            // preserve raw essence
+            this.essence = {};
+        }
+
+        /**
+         * Build a Workflow from the essence of a single workflow.
+         *
+         * Tasks that have no parents in `dependencies` are marked as
+         * kickstart tasks. Task essences that cannot be turned into a
+         * WorkflowTask are skipped with a warning.
+         *
+         * @param {Object} essence { dag: { dag_id }, dependencies, tasks }
+         * @returns {Workflow|null|undefined} undefined when essence is not
+         *     an object, null when dag / dag_id is missing
+         */
+        static fromEssence(essence) {
+            if(!essence || typeof essence !== 'object') {
+                return undefined;
+            }
+
+            const dag = essence.dag;
+            if(!dag || typeof dag !== 'object') {
+                logger.log('error', 'dag is not given');
+                return null;
+            }
+
+            if(!('dag_id' in dag)) {
+                logger.log('error', 'dag_id is not given');
+                return null;
+            }
+
+            const workflow = new Workflow(dag.dag_id);
+
+            // read this to detect kickstart events
+            // use a set for fast look up
+            const headTaskIds = new Set();
+            _.forOwn(essence.dependencies, (dependency, taskId) => {
+                // a task without parents (no 'parents' field, or an empty
+                // one) is a head task, i.e. a kickstart task
+                const parents = dependency ? dependency.parents : undefined;
+                if(!parents || parents.length === 0) {
+                    headTaskIds.add(taskId);
+                }
+            });
+
+            _.forOwn(essence.tasks, (taskEssence, taskKey) => {
+                const task = WorkflowTask.WorkflowTask.fromEssence(taskEssence);
+                if(!task) {
+                    // fromEssence() returns null e.g. when task_id is missing
+                    logger.log('warn', `skip an invalid task essence '${taskKey}' - ${JSON.stringify(taskEssence)}`);
+                    return;
+                }
+
+                // if its in head tasks, it has a kickstart event.
+                // dependency keys are strings, so compare as string
+                if(headTaskIds.has(String(task.getId()))) {
+                    task.setKickstart(true);
+                }
+
+                workflow.addTask(task);
+            });
+
+            workflow.essence = essence;
+            return workflow;
+        }
+
+        setId(id) {
+            this.id = id;
+        }
+
+        getId() {
+            return this.id;
+        }
+
+        /**
+         * @returns {string[]} every topic at least one task listens on
+         */
+        getTopics() {
+            return Object.keys(this.topics);
+        }
+
+        /**
+         * @param {string} topic
+         * @returns {WorkflowTask[]|undefined} tasks registered for the
+         *     topic, undefined when there are none
+         */
+        getTasksForTopic(topic) {
+            if(topic in this.topics) {
+                return this.topics[topic];
+            }
+            return undefined;
+        }
+
+        hasTasksForTopic(topic) {
+            return (topic in this.topics);
+        }
+
+        // returns the task id -> WorkflowTask map itself, not a copy
+        getTasks() {
+            return this.tasks;
+        }
+
+        /**
+         * @param {string} id task id
+         * @returns {WorkflowTask|undefined} undefined for an unknown id
+         */
+        getTask(id) {
+            if(id in this.tasks) {
+                return this.tasks[id];
+            }
+            return undefined;
+        }
+
+        /**
+         * @returns {string[]} distinct topics of the kickstart tasks
+         */
+        getKickstartTopics() {
+            const kickstartTopics = [];
+            Object.keys(this.tasks).forEach((taskId) => {
+                const task = this.tasks[taskId];
+                if(!task.isKickstart()) {
+                    return;
+                }
+
+                const topic = task.getTopic();
+                if(!kickstartTopics.includes(topic)) {
+                    kickstartTopics.push(topic);
+                }
+            });
+            return kickstartTopics;
+        }
+
+        isKickstartTopic(topic) {
+            return this.getKickstartTopics().includes(topic);
+        }
+
+        /**
+         * Register a task under its id and under its topic.
+         *
+         * @param {WorkflowTask} task
+         * @returns {boolean} false when a task with the same id exists
+         */
+        addTask(task) {
+            const taskId = task.getId();
+            if(taskId in this.tasks) {
+                logger.log('warn', `there exists a task with the same id - ${JSON.stringify(task)}`);
+                return false;
+            }
+
+            this.tasks[taskId] = task;
+
+            const taskTopic = task.getTopic();
+            if(taskTopic in this.topics) {
+                this.topics[taskTopic].push(task);
+            }
+            else {
+                this.topics[taskTopic] = [task];
+            }
+            return true;
+        }
+
+        setEssence(essence) {
+            this.essence = essence;
+        }
+
+        getEssence() {
+            return this.essence;
+        }
+
+        /**
+         * A workflow is valid when it has an id, at least one task and at
+         * least one kickstart task.
+         *
+         * @returns {boolean} true when valid; the reason is logged otherwise
+         */
+        validate() {
+            if(!this.id) {
+                logger.log('error', 'id is not given');
+                return false;
+            }
+
+            // reject a workflow WITHOUT tasks (the comparison used to be
+            // inverted, which rejected every workflow that had tasks)
+            if(!this.tasks || Object.keys(this.tasks).length === 0) {
+                logger.log('error', 'task is not given');
+                return false;
+            }
+
+            const hasKickstartTask = Object.keys(this.tasks).some((taskId) => {
+                return this.tasks[taskId].isKickstart();
+            });
+
+            if(!hasKickstartTask) {
+                logger.log('error', 'kickstart event is not given');
+                return false;
+            }
+            return true;
+        }
+    }
+
+    // public surface: the Workflow class and the essence loader
+    module.exports = {
+        Workflow,
+        loadWorkflowsFromEssence
+    };
+})();
\ No newline at end of file
diff --git a/src/types/workflowrun.js b/src/types/workflowrun.js
new file mode 100644
index 0000000..65027ff
--- /dev/null
+++ b/src/types/workflowrun.js
@@ -0,0 +1,416 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const _ = require('lodash');
+ const dateformat = require('dateformat');
+ const WorkflowRunTask = require('./workflowruntask.js');
+ const logger = require('../config/logger.js');
+
+    /**
+     * Runtime state of one execution of a workflow: per-task status, the
+     * key field/value pairs used to match events to this run, the ids of
+     * the attached clients and the queue of events waiting to be fetched.
+     */
+    class WorkflowRun {
+        /**
+         * @param {string} workflowId id of the workflow being run
+         * @param {string} workflowRunId id of this run (dag_run_id)
+         */
+        constructor(workflowId, workflowRunId) {
+            // workflow run id (dag_run_id)
+            this.id = workflowRunId;
+            // workflow id
+            this.workflowId = workflowId;
+
+            // workflow run tasks - for storing status
+            // id: task id
+            // value : workflow run task obj
+            this.runTasks = {};
+
+            // storing key-field, key-value pairs for <event, workflow run> mapping
+            // key: topic
+            // value: [{
+            //      field:
+            //      value:
+            // }, ...]
+            this.eventKeyFieldValues = {};
+
+            // client ids
+            this.clientIds = [];
+
+            // event queue
+            // {
+            //      topic: topic,
+            //      message: message
+            // }
+            this.eventQueue = [];
+            // trash bins
+            // dequeued events are sent to this queue
+            // for debugging
+            this.trashEventQueue = [];
+
+            this.kickstarted = false;
+            this.finished = false;
+        }
+
+        /**
+         * @param {string} workflowId
+         * @returns {string} `<workflowId>_<current date/time>`, formatted
+         *     with the dateformat mask 'yyyymmdd_HHMMssl'
+         */
+        static makeWorkflowRunId(workflowId) {
+            const datetimestr = dateformat(new Date(), 'yyyymmdd_HHMMssl');
+            return `${workflowId}_${datetimestr}`;
+        }
+
+        /**
+         * Create a run for a workflow: one run task per workflow task, and
+         * for each task its topic/key field registered with no value yet.
+         *
+         * @param {Workflow} workflow
+         * @returns {WorkflowRun}
+         */
+        static makeNewRun(workflow) {
+            const workflowId = workflow.getId();
+            const workflowRunId = WorkflowRun.makeWorkflowRunId(workflowId);
+            const workflowRun = new WorkflowRun(workflowId, workflowRunId);
+
+            _.forOwn(workflow.getTasks(), (task, taskId) => {
+                // set run tasks
+                workflowRun.addRunTask(new WorkflowRunTask.WorkflowRunTask(taskId));
+
+                // set key_field / value
+                workflowRun.setEventKeyFieldValue(task.getTopic(), task.getKeyField(), null); // init
+            });
+            return workflowRun;
+        }
+
+        setId(id) {
+            this.id = id;
+        }
+
+        getId() {
+            return this.id;
+        }
+
+        setWorkflowId(workflowId) {
+            this.workflowId = workflowId;
+        }
+
+        getWorkflowId() {
+            return this.workflowId;
+        }
+
+        addRunTask(runTask) {
+            this.runTasks[runTask.getTaskId()] = runTask;
+        }
+
+        /**
+         * @param {string} taskId
+         * @returns {WorkflowRunTask|undefined} undefined for an unknown id
+         */
+        getRunTask(taskId) {
+            if(taskId in this.runTasks) {
+                return this.runTasks[taskId];
+            }
+            return undefined;
+        }
+
+        /**
+         * @param {string} taskId
+         * @returns {string|undefined} status of the task, undefined when
+         *     the task is unknown
+         */
+        getTaskStatus(taskId) {
+            const runTask = this.getRunTask(taskId);
+            return runTask ? runTask.getStatus() : undefined;
+        }
+
+        /**
+         * Set the status of a run task.
+         *
+         * @param {string} taskId
+         * @param {string} status parsed by the run task's setStatus()
+         * @returns {boolean} false when the task is unknown
+         */
+        updateTaskStatus(taskId, status) {
+            // operate on the run task itself, not on its current status string
+            const runTask = this.getRunTask(taskId);
+            if(!runTask) {
+                logger.log('warn', `cannot update status of an unknown task - ${taskId}`);
+                return false;
+            }
+
+            runTask.setStatus(status);
+            return true;
+        }
+
+        /**
+         * Register, or overwrite, the value expected in `field` of events
+         * on `topic`.
+         *
+         * @param {string} topic
+         * @param {string} field key field name
+         * @param {*} [value=null] expected value; null means "not known yet"
+         * @returns {boolean} always true
+         */
+        setEventKeyFieldValue(topic, field, value=null) {
+            if(!(topic in this.eventKeyFieldValues)) {
+                // put a new empty array
+                this.eventKeyFieldValues[topic] = [];
+            }
+
+            const keyFieldValues = this.eventKeyFieldValues[topic];
+            const entry = {
+                field: field,
+                value: value
+            };
+
+            const index = keyFieldValues.findIndex((keyFieldValue) => {
+                return keyFieldValue.field === field;
+            });
+
+            if(index >= 0) {
+                // update
+                keyFieldValues[index] = entry;
+            }
+            else {
+                // push a new
+                keyFieldValues.push(entry);
+            }
+            return true;
+        }
+
+        /**
+         * @returns {boolean} true when `field` is registered for `topic`
+         *     and its value is either unset (falsy) or equal to `value`
+         */
+        isEventKeyFieldValueAcceptable(topic, field, value) {
+            if(!(topic in this.eventKeyFieldValues)) {
+                // topic does not exist
+                return false;
+            }
+
+            return this.eventKeyFieldValues[topic].some((keyFieldValue) => {
+                return (keyFieldValue.field === field) &&
+                    ((!keyFieldValue.value) || (keyFieldValue.value === value));
+            });
+        }
+
+        /**
+         * @param {string} topic
+         * @param {Object} message event body
+         * @returns {boolean} true when the message carries one of the key
+         *     fields registered for the topic with exactly the registered
+         *     value
+         */
+        isEventAcceptableByKeyFieldValue(topic, message) {
+            if(!(topic in this.eventKeyFieldValues)) {
+                // topic does not exist
+                return false;
+            }
+
+            if(!message || typeof message !== 'object') {
+                // the `in` operator below needs an object
+                return false;
+            }
+
+            // some() propagates the match; a `return true` inside forEach()
+            // is discarded, which made this method always answer false
+            return this.eventKeyFieldValues[topic].some((keyFieldValue) => {
+                return (keyFieldValue.field in message) &&
+                    (keyFieldValue.value === message[keyFieldValue.field]);
+            });
+        }
+
+        /**
+         * Select run tasks by status.
+         *
+         * @param {string|string[]|null} includes statuses to keep;
+         *     null/undefined keeps every status
+         * @param {string|string[]|null} excludes statuses to drop;
+         *     'excludes' has a higher priority than 'includes'
+         * @returns {WorkflowRunTask[]}
+         */
+        getFilteredRunTasks(includes, excludes) {
+            // normalize a filter argument (nothing / scalar / array) to an array
+            const toStatusList = (filter) => {
+                if(!filter) {
+                    return [];
+                }
+                return Array.isArray(filter) ? filter : [filter];
+            };
+
+            // undefined or null: include all
+            const includeAll = !includes;
+            const includeStatuses = toStatusList(includes);
+            const excludeStatuses = toStatusList(excludes);
+
+            // test against the normalized lists; the raw arguments may be
+            // null (crash) or a plain string (substring match)
+            return Object.keys(this.runTasks)
+                .map((taskId) => this.runTasks[taskId])
+                .filter((runTask) => {
+                    const status = runTask.getStatus();
+                    if(excludeStatuses.includes(status)) {
+                        return false;
+                    }
+                    return includeAll || includeStatuses.includes(status);
+                });
+        }
+
+        /**
+         * @param {Workflow} workflow definition used to map tasks to topics
+         * @param {string|string[]|null} includes see getFilteredRunTasks()
+         * @param {string|string[]|null} excludes see getFilteredRunTasks()
+         * @returns {string[]} distinct topics of the selected run tasks
+         */
+        getFilteredTopics(workflow, includes, excludes) {
+            const filteredTopics = [];
+
+            this.getFilteredRunTasks(includes, excludes).forEach((runTask) => {
+                const task = workflow.getTask(runTask.getTaskId());
+                if(!task) {
+                    // run task without a definition in the given workflow
+                    return;
+                }
+
+                const topic = task.getTopic();
+                if(!filteredTopics.includes(topic)) {
+                    filteredTopics.push(topic);
+                }
+            });
+            return filteredTopics;
+        }
+
+        getAllTopics(workflow) {
+            return this.getFilteredTopics(workflow, null, null);
+        }
+
+        getAcceptableTopics(workflow) {
+            // return topics for tasks that are running or to be run in the future
+            // include all tasks that are not ended
+            return this.getFilteredTopics(workflow, null, [WorkflowRunTask.TaskStatus.END]);
+        }
+
+        isTopicAcceptable(workflow, topic) {
+            // get topics of tasks that are not completed yet
+            return this.getAcceptableTopics(workflow).includes(topic);
+        }
+
+        isEventAcceptable(workflow, topic, message) {
+            // event is acceptable if it meets following criteria
+            // 1) the workflow is currently interested in the same topic
+            //      (finished tasks are not counted)
+            // 2) the task's key field and value
+            return this.isTopicAcceptable(workflow, topic) &&
+                this.isEventAcceptableByKeyFieldValue(topic, message);
+        }
+
+        addClientId(clientId) {
+            if(!this.clientIds.includes(clientId)) {
+                this.clientIds.push(clientId);
+            }
+        }
+
+        removeClientId(clientId) {
+            // in place, so that arrays handed out by getClientIds() follow
+            _.pull(this.clientIds, clientId);
+        }
+
+        getClientIds() {
+            return this.clientIds;
+        }
+
+        enqueueEvent(topic, message) {
+            this.eventQueue.push({
+                topic: topic,
+                message: message
+            });
+        }
+
+        peekEvent() {
+            // if the queue is empty, this returns undefined
+            return this.eventQueue[0];
+        }
+
+        /**
+         * Remove and return the oldest event; it is kept in the trash queue.
+         *
+         * @returns {Object|undefined} undefined when the queue is empty
+         */
+        dequeueEvent() {
+            if(this.eventQueue.length === 0) {
+                return undefined;
+            }
+
+            const event = this.eventQueue.shift();
+            // move to trash
+            this.trashEventQueue.push(event);
+            return event;
+        }
+
+        peekEventByTopic(topic) {
+            // returns undefined when no queued event has the topic
+            return this.eventQueue.find((event) => event.topic === topic);
+        }
+
+        /**
+         * Remove and return the oldest event with the given topic; it is
+         * kept in the trash queue.
+         *
+         * @returns {Object|undefined} undefined when there is no such event
+         */
+        dequeueEventByTopic(topic) {
+            const index = this.eventQueue.findIndex((event) => event.topic === topic);
+            if(index < 0) {
+                return undefined;
+            }
+
+            const event = this.eventQueue.splice(index, 1)[0];
+            // move to trash
+            this.trashEventQueue.push(event);
+            return event;
+        }
+
+        getTrashEvents() {
+            return this.trashEventQueue;
+        }
+
+        lengthEventQueue() {
+            return this.eventQueue.length;
+        }
+
+        setKickstarted() {
+            this.kickstarted = true;
+        }
+
+        isKickstarted() {
+            return this.kickstarted;
+        }
+
+        setFinished() {
+            this.finished = true;
+        }
+
+        isFinished() {
+            return this.finished;
+        }
+
+        /**
+         * @returns {boolean} true when both the run id and the workflow id
+         *     are set; the reason is logged otherwise
+         */
+        validate() {
+            if(!this.id) {
+                logger.log('error', 'id is not given');
+                return false;
+            }
+
+            if(!this.workflowId) {
+                logger.log('error', 'workflowId is not given');
+                return false;
+            }
+
+            return true;
+        }
+    }
+
+    // public surface: the WorkflowRun class
+    module.exports = {
+        WorkflowRun
+    };
+})();
\ No newline at end of file
diff --git a/src/types/workflowruntask.js b/src/types/workflowruntask.js
new file mode 100644
index 0000000..fbaa604
--- /dev/null
+++ b/src/types/workflowruntask.js
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const logger = require('../config/logger.js');
+
+    // Life-cycle states of a task inside a workflow run.
+    const TaskStatus = {
+        INIT: 'init',
+        BEGIN: 'begin',
+        END: 'end',
+        UNKNOWN: 'unknown'
+    };
+
+    /**
+     * Status of one task within a workflow run.
+     */
+    class WorkflowRunTask {
+        /**
+         * @param {string} taskId id of the task this status belongs to
+         */
+        constructor(taskId) {
+            this.taskId = taskId;
+            this.status = TaskStatus.UNKNOWN;
+        }
+
+        /**
+         * Map a status name or one of its aliases (case-insensitive) to a
+         * TaskStatus value.
+         *
+         * @param {string} strTaskStatus 'i'/'init', 'b'/'begin'/'start' or
+         *     'e'/'end'/'finish'
+         * @returns {string} a TaskStatus value; UNKNOWN for empty,
+         *     non-string or unrecognized input
+         */
+        static parseStatus(strTaskStatus) {
+            if(typeof strTaskStatus !== 'string') {
+                return TaskStatus.UNKNOWN;
+            }
+
+            switch(strTaskStatus.toLowerCase()) {
+                case 'i':
+                case 'init':
+                    // was mapped to END by mistake
+                    return TaskStatus.INIT;
+                case 'b':
+                case 'begin':
+                case 'start':
+                    return TaskStatus.BEGIN;
+                case 'e':
+                case 'end':
+                case 'finish':
+                    return TaskStatus.END;
+                default:
+                    return TaskStatus.UNKNOWN;
+            }
+        }
+
+        setTaskId(id) {
+            this.taskId = id;
+        }
+
+        getTaskId() {
+            return this.taskId;
+        }
+
+        // accepts a status name or alias; see parseStatus()
+        setStatus(status) {
+            this.status = WorkflowRunTask.parseStatus(status);
+        }
+
+        getStatus() {
+            return this.status;
+        }
+
+        /**
+         * @returns {boolean} true when both task id and status are set;
+         *     the reason is logged otherwise
+         */
+        validate() {
+            if(!this.taskId) {
+                logger.log('error', 'id is not given');
+                return false;
+            }
+
+            if(!this.status) {
+                logger.log('error', 'status is not given');
+                return false;
+            }
+
+            return true;
+        }
+    }
+
+    // public surface: the status enumeration and the WorkflowRunTask class
+    module.exports = {
+        TaskStatus,
+        WorkflowRunTask
+    };
+})();
\ No newline at end of file
diff --git a/src/types/workflowtask.js b/src/types/workflowtask.js
new file mode 100644
index 0000000..efde42b
--- /dev/null
+++ b/src/types/workflowtask.js
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const logger = require('../config/logger.js');
+
+    /**
+     * Static definition of one task of a workflow: its id, the topic it
+     * listens on, the key field of the events it handles, whether it is a
+     * kickstart task, and the raw essence it was built from.
+     */
+    class WorkflowTask {
+        /**
+         * @param {string} id task id
+         * @param {boolean} [kickstart=false] whether this is a kickstart task
+         */
+        constructor(id, kickstart=false) {
+            Object.assign(this, {
+                id: id,
+                topic: null,
+                kickstart: kickstart,
+                keyField: null,
+                essence: {}
+            });
+        }
+
+        /**
+         * Build a WorkflowTask from a task essence.
+         *
+         * @param {Object} essence task essence; `task_id` is required,
+         *     `topic`, `model_name` and `key_field` are optional
+         * @returns {WorkflowTask|null} null when essence is falsy or has
+         *     no task_id
+         */
+        static fromEssence(essence) {
+            if(!essence) {
+                return null;
+            }
+
+            if(!('task_id' in essence)) {
+                logger.log('error', 'task_id is not given');
+                return null;
+            }
+
+            const created = new WorkflowTask(essence.task_id);
+
+            if('topic' in essence) {
+                created.setTopic(essence.topic);
+            }
+
+            if('model_name' in essence) {
+                // NOTE(review): a task holds a single topic, so each call
+                // overwrites the previous one and only the '.delete' topic
+                // survives - confirm whether all three were meant to be kept.
+                ['create', 'update', 'delete'].forEach((operation) => {
+                    created.setTopic(`datamodel.${essence.model_name}.${operation}`);
+                });
+            }
+
+            if('key_field' in essence) {
+                created.setKeyField(essence.key_field);
+            }
+
+            created.setEssence(essence);
+            return created;
+        }
+
+        setId(id) {
+            this.id = id;
+        }
+
+        getId() {
+            return this.id;
+        }
+
+        setTopic(topic) {
+            this.topic = topic;
+        }
+
+        getTopic() {
+            return this.topic;
+        }
+
+        setKickstart(kickstart=false) {
+            this.kickstart = kickstart;
+        }
+
+        isKickstart() {
+            return this.kickstart;
+        }
+
+        setKeyField(keyField) {
+            this.keyField = keyField;
+        }
+
+        getKeyField() {
+            return this.keyField;
+        }
+
+        setEssence(essence) {
+            this.essence = essence;
+        }
+
+        getEssence() {
+            return this.essence;
+        }
+
+        /**
+         * Only the id is mandatory. Topic and key field are deliberately
+         * not checked: general Airflow operators other than XOS operators
+         * don't have these fields.
+         *
+         * @returns {boolean} true when the task has an id
+         */
+        validate() {
+            if(this.id) {
+                return true;
+            }
+
+            logger.log('error', 'id is not given');
+            return false;
+        }
+    }
+
+    // public surface: the WorkflowTask class
+    module.exports = {
+        WorkflowTask
+    };
+})();
\ No newline at end of file
diff --git a/src/workflows/hello_workflow.json b/src/workflows/hello_workflow.json
new file mode 100644
index 0000000..ebf94af
--- /dev/null
+++ b/src/workflows/hello_workflow.json
@@ -0,0 +1,24 @@
+{
+ "hello_workflow": {
+ "dag": {
+ "dag_id": "hello_workflow",
+ "local_variable": "dag_hello"
+ },
+ "dependencies": {
+ "onu_event_handler": {}
+ },
+ "tasks": {
+ "onu_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_hello",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler",
+ "topic": "onu.events"
+ }
+ }
+ }
+}
diff --git a/src/workflows/loader.js b/src/workflows/loader.js
new file mode 100644
index 0000000..a7bc275
--- /dev/null
+++ b/src/workflows/loader.js
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+(function () {
+ 'use strict';
+
+ const path = require('path');
+ const fs = require('fs');
+ const _ = require('lodash');
+ const Workflow = require('../types/workflow.js');
+ const logger = require('../config/logger.js');
+
+ const loadEssence = (essenceFilename, absPath=false) => {
+ let filepath;
+ if(!absPath) {
+ filepath = path.join(__dirname, essenceFilename);
+ }
+ else {
+ filepath = essenceFilename;
+ }
+
+ try {
+ if (fs.existsSync(filepath)) {
+ logger.log('debug', `Loading an essence - ${filepath}`);
+ let rawdata = fs.readFileSync(filepath);
+ let essence = null;
+ try {
+ essence = JSON.parse(rawdata);
+ }
+ catch (objError) {
+ if (objError instanceof SyntaxError) {
+ logger.log('warn', `failed to parse a json data (syntax error) - ${rawdata}`);
+ }
+ else {
+ logger.log('warn', `failed to parse a json data - ${rawdata}`);
+ }
+ }
+ return essence;
+ }
+ else {
+ logger.log('warn', `No ${filepath} found`);
+ return null;
+ }
+ }
+ catch(e) {
+ logger.log('warn', `Cannot read ${filepath} - ${e}`);
+ return null;
+ }
+ };
+
+ const loadWorkflows = (essenceFilename) => {
+ let filepath = path.join(__dirname, essenceFilename);
+
+ try {
+ if (fs.existsSync(filepath)) {
+ logger.log('debug', `Loading an essence - ${filepath}`);
+ let rawdata = fs.readFileSync(filepath);
+ let workflows = [];
+
+ try {
+ let essence = JSON.parse(rawdata);
+
+ // an essence can have multiple workflows
+ _.forOwn(essence, (workflowEssence, workflowId) => {
+ let workflow = Workflow.Workflow.fromEssence(workflowEssence);
+ if(workflow) {
+ workflows.push(workflow);
+ logger.log('debug', `Loaded workflow: ${workflowId}`);
+ }
+ });
+ }
+ catch (objError) {
+ if (objError instanceof SyntaxError) {
+ logger.log('warn', `failed to parse a json data (syntax error) - ${rawdata}`);
+ }
+ else {
+ logger.log('warn', `failed to parse a json data - ${rawdata}`);
+ }
+ }
+
+ return workflows
+ }
+ else {
+ logger.log('warn', `No ${filepath} found`);
+ return null;
+ }
+ }
+ catch(e) {
+ logger.log('warn', `Cannot read ${filepath} - ${e}`);
+ return null;
+ }
+ };
+
+ const loadAllWorkflows = () => {
+ let dirpath = __dirname;
+
+ let allWorkflows = [];
+ let dirEntries = fs.readdirSync(dirpath);
+ dirEntries.forEach((dirEntry) => {
+ if(dirEntry.endsWith('.json')) {
+ // found workflow essence file in json format
+ let workflows = loadWorkflows(dirEntry);
+ if(workflows) {
+ for(let workflow in workflows) {
+ allWorkflows.push[workflow];
+ }
+ }
+ }
+ });
+ return allWorkflows;
+ };
+
+ module.exports = {
+ loadEssence: loadEssence,
+ loadAllWorkflows: loadAllWorkflows,
+ loadWorkflows: loadWorkflows
+ };
+})();
\ No newline at end of file