Implement basic functionalities for workflow control.
- Manage join/leave of clients
- All clients communicate via socket.io
- Probes emit events
- Managers register workflows (by using a workflow essence)
- Send kickstart request to Managers to launch workflows
- Route events to workflow runs
- Queue events to not lose events between workflow tasks
- Fixed some issues found while working on testcases
- Set to perform coverage and unittest and generate outputs to files
Change-Id: I678723edc20df9247d63a4bf6380785ab8b2b221
diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..5171c54
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,2 @@
+node_modules
+npm-debug.log
\ No newline at end of file
diff --git a/.eslintrc b/.eslintrc
new file mode 100644
index 0000000..09cf359
--- /dev/null
+++ b/.eslintrc
@@ -0,0 +1,37 @@
+{
+ "parserOptions": {
+ "ecmaFeatures": {
+ "blockBindings": true,
+ "forOf": true,
+ "destructuring": true,
+ "arrowFunctions": true,
+ "templateStrings": true,
+ "generators": true
+ }
+ },
+ "env": {
+ "node": true,
+ "es6": true
+ },
+ "rules": {
+ "quotes": [2, "single"],
+ "no-undef": [2],
+ "object-curly-spacing": [2, "never"],
+ "no-unused-vars": [2, {"vars": "all", "args": "after-used", "argsIgnorePattern": "^_"}],
+ "space-before-blocks": [2, {"keywords": "always", "functions":"always"}],
+ "brace-style": [2, "stroustrup", { "allowSingleLine": true }],
+ "no-param-reassign": [2, {"props": false}],
+ "max-len": [2, 120, 4],
+ "eqeqeq": [1, "smart"],
+ "new-cap": [2, {"capIsNewExceptions": ["Router"]}],
+ "func-names": 0,
+ "object-shorthand": [0, "never"],
+ "wrap-iife": [2, "any"],
+ "no-loop-func": 0,
+ "no-console": 0,
+ "padded-blocks": 0
+ },
+ "globals" :{
+ "require": true
+ }
+}
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2d67128
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,19 @@
+node_modules/
+npm-debug.log
+.nyc_output
+coverage
+.idea
+package-lock.json
+.noseids
+.vscode
+build
+dist
+.coverage
+coverage.xml
+cover
+.tox
+.DS_Store
+nose2-results.xml
+results.xml
+venv-service
+*.pyc
\ No newline at end of file
diff --git a/.gitreview b/.gitreview
new file mode 100644
index 0000000..861567c
--- /dev/null
+++ b/.gitreview
@@ -0,0 +1,5 @@
+[gerrit]
+host=gerrit.opencord.org
+port=29418
+project=cord-workflow-controller.git
+defaultremote=origin
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..d4104a9
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,44 @@
+# opencord/cord-workflow-control-service
+# To build use: docker build -t opencord/cord-workflow-control-service .
+# To run use: docker run -p 3000:3000 -d opencord/cord-workflow-control-service
+
+FROM node:10.16-alpine
+
+# Set environment variables
+ENV CODE_SOURCE .
+ENV CODE_DEST /var/www
+
+# Create app directory
+WORKDIR ${CODE_DEST}
+
+# install librdkafka
+RUN apk --no-cache add -U python make bash g++
+
+# Copy over app dependencies and source files
+COPY ${CODE_SOURCE}/package.json ${CODE_DEST}/
+COPY ${CODE_SOURCE}/src/ ${CODE_DEST}/src/
+
+# Install app dependencies and create logdir
+RUN npm install --production \
+ && mkdir ${CODE_DEST}/logs
+
+EXPOSE 3000
+
+# Label image
+ARG org_label_schema_schema_version=1.0
+ARG org_label_schema_name=cord-workflow-controller
+ARG org_label_schema_version=unknown
+ARG org_label_schema_vcs_url=unknown
+ARG org_label_schema_vcs_ref=unknown
+ARG org_label_schema_build_date=unknown
+ARG org_opencord_vcs_commit_date=unknown
+
+LABEL org.label-schema.schema-version=$org_label_schema_schema_version \
+ org.label-schema.name=$org_label_schema_name \
+ org.label-schema.version=$org_label_schema_version \
+ org.label-schema.vcs-url=$org_label_schema_vcs_url \
+ org.label-schema.vcs-ref=$org_label_schema_vcs_ref \
+ org.label-schema.build-date=$org_label_schema_build_date \
+ org.opencord.vcs-commit-date=$org_opencord_vcs_commit_date
+
+CMD [ "npm", "start" ]
\ No newline at end of file
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644
index 0000000..261eeb9
--- /dev/null
+++ b/LICENSE.txt
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..a26d3ce
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,64 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Configure shell
+SHELL = bash -e -o pipefail
+
+# Variables
+VERSION ?= $$(python -c 'import json,sys;obj=json.load(sys.stdin); print obj["version"]' < package.json)
+CONTAINER_NAME ?= $(notdir $(abspath .))
+
+## Docker related
+DOCKER_REGISTRY ?=
+DOCKER_REPOSITORY ?=
+DOCKER_BUILD_ARGS ?=
+DOCKER_TAG ?= ${VERSION}
+DOCKER_IMAGENAME := ${DOCKER_REGISTRY}${DOCKER_REPOSITORY}${CONTAINER_NAME}:${DOCKER_TAG}
+
+## Docker labels. Only set ref and commit date if committed
+DOCKER_LABEL_VCS_URL ?= $(shell git remote get-url $(shell git remote))
+DOCKER_LABEL_VCS_REF ?= $(shell git diff-index --quiet HEAD -- && git rev-parse HEAD || echo "unknown")
+DOCKER_LABEL_COMMIT_DATE ?= $(shell git diff-index --quiet HEAD -- && git show -s --format=%cd --date=iso-strict HEAD || echo "unknown" )
+DOCKER_LABEL_BUILD_DATE ?= $(shell date -u "+%Y-%m-%dT%H:%M:%SZ")
+
+all: test
+
+docker-build:
+ docker build $(DOCKER_BUILD_ARGS) \
+ -t ${DOCKER_IMAGENAME} \
+ --build-arg org_label_schema_version="${VERSION}" \
+ --build-arg org_label_schema_vcs_url="${DOCKER_LABEL_VCS_URL}" \
+ --build-arg org_label_schema_vcs_ref="${DOCKER_LABEL_VCS_REF}" \
+ --build-arg org_label_schema_build_date="${DOCKER_LABEL_BUILD_DATE}" \
+ --build-arg org_opencord_vcs_commit_date="${DOCKER_LABEL_COMMIT_DATE}" \
+ -f Dockerfile .
+
+docker-push:
+ docker push ${DOCKER_IMAGENAME}
+
+install:
+ npm install
+
+test: npm-test lint
+
+npm-test: install
+ npm test
+
+lint: install
+ npm run lint
+
+clean:
+ rm -rf .nyc_output
+ rm -rf coverage
+ rm -rf node_modules
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..70e7f78
--- /dev/null
+++ b/README.md
@@ -0,0 +1,5 @@
+CORD Workflow Controller
+========================
+
+CORD Workflow Controller works in-between CORD Workflow Probe and CORD Workflow Airflow to
+ manage workflows and relay events.
\ No newline at end of file
diff --git a/logs/.gitignore b/logs/.gitignore
new file mode 100644
index 0000000..c96a04f
--- /dev/null
+++ b/logs/.gitignore
@@ -0,0 +1,2 @@
+*
+!.gitignore
\ No newline at end of file
diff --git a/package.json b/package.json
new file mode 100644
index 0000000..614d00b
--- /dev/null
+++ b/package.json
@@ -0,0 +1,48 @@
+{
+ "name": "cord_workflow_controller",
+ "version": "0.1.0",
+ "description": "CORD Workflow Controller",
+ "main": "src/server.js",
+ "scripts": {
+ "start": "node src/server.js",
+ "test": "LOG_LEVEL=error nyc --reporter=cobertura --reporter=text mocha --exit spec/**/*.spec.js --reporter mocha-multi-reporters --reporter-options configFile=reporters.json",
+ "test:dev": "mocha --exit -w spec/**/*.spec.js",
+ "lint": "eslint spec src"
+ },
+ "author": "Open Networking Foundation",
+ "license": "Apache-2.0",
+ "repository": {
+ "type": "git",
+ "url": "git://gerrit.opencord.org/cord-workflow-controller.git"
+ },
+ "dependencies": {
+ "body-parser": "^1.19.0",
+ "cors": "^2.8.5",
+ "express": "^4.17.1",
+ "express-validator": "^6.0.1",
+ "lodash": "^4.17.11",
+ "async": "^3.1.0",
+ "node-yaml-config": "0.0.5",
+ "socket.io": "^2.2.0",
+ "socketio-wildcard": "^2.0.0",
+ "double-ended-queue": "^2.1.0-0",
+ "test": "^0.6.0",
+ "winston": "^3.2.1",
+ "yargs": "^13.2.4",
+ "dateformat": "^3.0.3"
+ },
+ "devDependencies": {
+ "chai": "^4.2.0",
+ "eslint": "5.16.0",
+ "eslint-plugin-import": "^2.17.3",
+ "mocha": "^6.1.4",
+ "mockery": "^2.1.0",
+ "nyc": "^14.1.1",
+ "sinon": "^7.3.2",
+ "sinon-chai": "^3.3.0",
+ "socket.io-client": "^2.2.0",
+ "supertest": "^4.0.2",
+ "mocha-multi-reporters": "^1.1.7",
+ "mocha-junit-reporter": "^1.23.0"
+ }
+ }
\ No newline at end of file
diff --git a/reporters.json b/reporters.json
new file mode 100644
index 0000000..03b9087
--- /dev/null
+++ b/reporters.json
@@ -0,0 +1,6 @@
+{
+ "reporterEnabled": "spec, mocha-junit-reporter",
+ "mochaJunitReporterReporterOptions": {
+ "mochaFile": "./results.xml"
+ }
+}
\ No newline at end of file
diff --git a/spec/.eslintrc b/spec/.eslintrc
new file mode 100644
index 0000000..497ba43
--- /dev/null
+++ b/spec/.eslintrc
@@ -0,0 +1,12 @@
+{
+ "globals" :{
+ "describe": true,
+ "it": true,
+ "xdescribe": true,
+ "xit": true,
+ "before": true,
+ "beforeEach": true,
+ "after": true,
+ "afterEach": true
+ }
+}
\ No newline at end of file
diff --git a/spec/clients.spec.js b/spec/clients.spec.js
new file mode 100644
index 0000000..534797d
--- /dev/null
+++ b/spec/clients.spec.js
@@ -0,0 +1,320 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const path = require('path');
+ const chai = require('chai');
+ const expect = chai.expect;
+ const sinonChai = require('sinon-chai');
+ chai.use(sinonChai);
+ const io = require('socket.io-client');
+ const async = require('async');
+ const _ = require('lodash');
+ const server = require('../src/server.js');
+ const port = 4000;
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ const essenceLoader = require('../src/workflows/loader.js');
+ const essenceFileName = path.join(__dirname, 'test_clients_workflow_essence.json');
+ const workflowIdInEssence = 'test_clients_workflow'
+
+ describe('Simple websocket client test', function() {
+
+ var probeClient;
+ var workflowManagerClient;
+ var workflowRunClient;
+ var workflowId;
+ var workflowRunId;
+
+ before(function() {
+ // Start our server
+ server.start(port);
+ });
+
+ after(function() {
+ server.stop();
+ });
+
+ beforeEach(function(done) {
+ let workflowCheckResults = [];
+ async.series([
+ (callback) => {
+ // connect a probe to the server
+ // to send events for test
+ probeClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=probe_id&type=probe' +
+ '&name=probe@xos.org'
+ });
+
+ probeClient.on('connect', () => {
+ callback(null, true);
+ });
+ return;
+ },
+ (callback) => {
+ // connect a workflow manager to the server
+ // to register a test workflow
+ workflowManagerClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_manager_id&type=workflow_manager' +
+ '&name=manager@xos.org'
+ });
+
+ workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
+ workflowRunId = message.workflow_run_id;
+
+ // call-back
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_KICKSTART, {
+ workflow_id: workflowId,
+ workflow_run_id: workflowRunId
+ })
+ });
+
+ workflowManagerClient.on('connect', () => {
+ callback(null, true);
+ });
+ return;
+ },
+ (callback) => {
+ // check existance of the workflow
+ let essence = essenceLoader.loadEssence(essenceFileName, true);
+ let workflowCnt=0;
+
+ _.forOwn(essence, (_value, essenceWorkflowId) => {
+ workflowId = essenceWorkflowId; // preseve only the last one for test
+
+ workflowCnt++;
+
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_CHECK, essenceWorkflowId);
+
+ workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_CHECK, (workflowCheckResult) => {
+ workflowCnt--;
+ workflowCheckResults.push(workflowCheckResult.result);
+
+ if(workflowCnt <= 0) {
+ callback(null, workflowCheckResults);
+ }
+ });
+ });
+ return;
+ },
+ (callback) => {
+ // register the workflow
+ let register = false;
+ workflowCheckResults.forEach((workflowCheckResult) => {
+ if(!workflowCheckResult) {
+ register = true;
+ }
+ });
+
+ if(register) {
+ let essence = essenceLoader.loadEssence(essenceFileName, true);
+
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, essence);
+
+ workflowManagerClient.on(
+ eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE,
+ (workflowRegResult) => {
+ callback(null, workflowRegResult);
+ }
+ );
+ }
+ else {
+ callback(null, true);
+ }
+ return;
+ },
+ (callback) => {
+ // kickstart the test workflow
+ probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ setTimeout(() => {
+ expect(workflowRunId).to.not.be.undefined;
+ callback(null, true);
+ }, 500);
+ return;
+ },
+ (callback) => {
+ // connect a workflow run client to the server
+ workflowRunClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_run_id&type=workflow_run' +
+ `&workflow_id=${workflowIdInEssence}&workflow_run_id=${workflowRunId}` +
+ '&name=run@xos.org'
+ });
+
+ // when is connected start testing
+ workflowRunClient.on('connect', () => {
+ callback(null, true);
+ });
+ return;
+ }
+ ],
+ function(err, results) {
+ // we do not actually check results
+ if(results.includes(false)) {
+ done.fail(err);
+ }
+ else {
+ done();
+ }
+ });
+ return;
+ });
+
+ afterEach(function(done) {
+ // remove workflow run
+ workflowManagerClient.emit(server.serviceEvents.WORKFLOW_RUN_REMOVE, {
+ workflow_id: workflowId,
+ workflow_run_id: workflowRunId
+ });
+
+ // remove workflow
+ workflowManagerClient.emit(server.serviceEvents.WORKFLOW_REMOVE, workflowId);
+
+ workflowId = null;
+ workflowRunId = null;
+
+ // disconnect clients
+ if(workflowManagerClient.connected) {
+ workflowManagerClient.disconnect();
+ }
+ workflowManagerClient = null;
+
+ if(workflowRunClient.connected) {
+ workflowRunClient.disconnect();
+ }
+ workflowRunClient = null;
+
+ if(probeClient.connected) {
+ probeClient.disconnect();
+ }
+ probeClient = null;
+
+ done();
+ });
+
+ it('should have a probe, a workflow manager and a workflow run', function(done) {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ expect(
+ Object.keys(eventrouter.getWorkflowRunClients()).length,
+ 'num of workflow run clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getWorkflowManagerClients()).length,
+ 'num of workflow manager clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getProbeClients()).length,
+ 'num of probe clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getClients()).length,
+ 'total num of clients'
+ ).to.equal(3);
+
+ expect(
+ 'probe_id' in eventrouter.getClients(),
+ 'a client called prove_id exists'
+ ).to.equal(true);
+ expect(
+ 'workflow_manager_id' in eventrouter.getClients(),
+ 'a client called workflow_manager_id exists'
+ ).to.equal(true);
+ expect(
+ 'workflow_run_id' in eventrouter.getClients(),
+ 'a client called workflow_run_id exists'
+ ).to.equal(true);
+ done();
+ });
+
+ it('should store user details for a new connection', function() {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+
+ const probe = eventrouter.getClients()['probe_id'];
+ expect(probe.getParams().name).to.equal('probe@xos.org');
+
+ const manager = eventrouter.getClients()['workflow_manager_id'];
+ expect(manager.getParams().name).to.equal('manager@xos.org');
+
+ const run = eventrouter.getClients()['workflow_run_id'];
+ expect(run.getParams().name).to.equal('run@xos.org');
+ });
+
+ it('should not store the same user twice', function(done) {
+ // This test case makes cleaning up process taking long time because it leaves
+ // a client socket. It seems there's no way to release it from server-side.
+
+ // connect a client to the server
+ const client2 = io.connect(`http://localhost:${port}`, {
+ query: 'id=probe_id&type=probe' +
+ '&name=probe@xos.org&value=different_value'
+ });
+
+ // when is connected start testing
+ client2.on('connect', () => {
+ setTimeout(() => {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ expect(
+ Object.keys(eventrouter.getWorkflowRunClients()).length,
+ 'num of workflow run clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getWorkflowManagerClients()).length,
+ 'num of workflow manager clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getProbeClients()).length,
+ 'num of probe clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getClients()).length,
+ 'total num of clients'
+ ).to.equal(3);
+
+ done();
+ }, 100);
+ });
+ });
+
+ it('should remove a user on disconnect', function(done) {
+ workflowManagerClient.disconnect();
+ workflowRunClient.disconnect();
+ probeClient.disconnect();
+
+ // we need to wait for the event to be dispatched
+ setTimeout(() => {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ expect(
+ Object.keys(eventrouter.getWorkflowRunClients()).length,
+ 'num of workflow run clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getWorkflowManagerClients()).length,
+ 'num of workflow manager clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getProbeClients()).length,
+ 'num of probe clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getClients()).length,
+ 'total num of clients'
+ ).to.equal(0);
+ done();
+ }, 100);
+ });
+ });
+})();
\ No newline at end of file
diff --git a/spec/eventrouter.spec.js b/spec/eventrouter.spec.js
new file mode 100644
index 0000000..6184f58
--- /dev/null
+++ b/spec/eventrouter.spec.js
@@ -0,0 +1,353 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const path = require('path');
+ const chai = require('chai');
+ const expect = chai.expect;
+ const sinonChai = require('sinon-chai');
+ chai.use(sinonChai);
+ const io = require('socket.io-client');
+ const async = require('async');
+ const _ = require('lodash');
+ const server = require('../src/server.js');
+ const port = 4000;
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ const essenceLoader = require('../src/workflows/loader.js');
+ const essenceFileName = path.join(__dirname, 'test_multi_workflow_essence.json');
+
+ var probeClient;
+ var workflowManagerClients = [];
+ var workflowRunClients = [];
+ var workflowIds = [];
+ var workflowRunInfos = [];
+
+ var receivedKickstartMessages = [[],[]];
+
+ describe('Workflow kickstart test', function() {
+
+ before(function() {
+ // Start our server
+ server.start(port);
+ });
+
+ after(function() {
+ server.stop();
+ });
+
+ beforeEach(function(done) {
+ async.series([
+ (callback) => {
+ // connect a probe to the server
+ // to send events for test
+ probeClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=probe_id&type=probe' +
+ '&name=probe@xos.org'
+ });
+
+ probeClient.on('connect', () => {
+ callback(null, true);
+ });
+ return;
+ },
+ (callback) => {
+ // connect first workflow manager to the server
+ // this manager will kickstart a workflow
+ let workflowManagerClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_manager_id1&type=workflow_manager' +
+ '&name=manager1@xos.org'
+ });
+
+ workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
+ // save it for check
+ receivedKickstartMessages[0].push(message);
+
+ // save workflow_id and workflow_run_id
+ workflowRunInfos.push({
+ workflowId: message.workflow_id,
+ workflowRunId: message.workflow_run_id
+ });
+
+ setTimeout(() => {
+ // call-back
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_KICKSTART, {
+ workflow_id: message.workflow_id,
+ workflow_run_id: message.workflow_run_id
+ })
+ }, 2000);
+ });
+
+ workflowManagerClient.on('connect', () => {
+ callback(null, true);
+ });
+
+ workflowManagerClients.push(workflowManagerClient);
+ return;
+ },
+ (callback) => {
+ // connect second workflow manager to the server
+ // this manager will not kickstart a workflow
+ let workflowManagerClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_manager_id2&type=workflow_manager' +
+ '&name=manager2@xos.org'
+ });
+
+ workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
+ receivedKickstartMessages[1].push(message);
+ });
+
+ workflowManagerClient.on('connect', () => {
+ callback(null, true);
+ });
+
+ workflowManagerClients.push(workflowManagerClient);
+ return;
+ },
+ (callback) => {
+ let essence = essenceLoader.loadEssence(essenceFileName, true);
+
+ // register the workflow
+ workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, essence);
+
+ _.forOwn(essence, (_value, workflowId) => {
+ // save
+ workflowIds.push(workflowId);
+ });
+
+ // handle return
+ workflowManagerClients[0].on(
+ eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE,
+ (workflowRegResult) => {
+ callback(null, workflowRegResult);
+ }
+ );
+ return;
+ }
+ ],
+ function(err, results) {
+ // we do not actually check results;
+ if(results.includes(false)) {
+ done.fail(err);
+ }
+ else {
+ done();
+ }
+ });
+ return;
+ });
+
+ afterEach(function() {
+ // remove workflow runs
+ _.forOwn(workflowRunInfos, (workflowRunInfo) => {
+ workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_RUN_REMOVE, {
+ workflow_id: workflowRunInfo.workflowId,
+ workflow_run_id: workflowRunInfo.workflowRunId
+ });
+ });
+ workflowRunInfos.length = 0;
+
+ // remove workflows
+ _.forOwn(workflowIds, (workflowId) => {
+ workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE, workflowId);
+ });
+ workflowIds.length = 0;
+
+ // remove message store
+ receivedKickstartMessages.forEach((receivedKickstartMessageStore) => {
+ receivedKickstartMessageStore.length = 0;
+ });
+
+ // disconnect clients
+ workflowManagerClients.forEach((workflowManagerClient) => {
+ if(workflowManagerClient.connected) {
+ workflowManagerClient.disconnect();
+ }
+ });
+ workflowManagerClients.length = 0;
+
+ workflowRunClients.forEach((workflowRunClient) => {
+ if(workflowRunClient.connected) {
+ workflowRunClient.disconnect();
+ }
+ });
+ workflowRunClients.length = 0;
+
+ if(probeClient.connected) {
+ probeClient.disconnect();
+ }
+ probeClient = null;
+ });
+
+ it('should have two workflows', function(done) {
+ workflowManagerClients[0].on(eventrouter.serviceEvents.WORKFLOW_LIST, (result) => {
+ let workflowsList = result.result;
+ expect(workflowsList.length).to.equal(2);
+ workflowsList.forEach((workflowIdInList) => {
+ expect(workflowIds).to.includes(workflowIdInList);
+ });
+ done();
+ });
+
+ workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_LIST, {});
+ });
+
+ it('all managers should receive kickstart messages', function(done) {
+ // kickstart the workflow
+ probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ setTimeout(() => {
+ expect(receivedKickstartMessages.length, 'num of message stores').to.equal(2);
+ receivedKickstartMessages.forEach((receivedKickstartMessageStore) => {
+ expect(receivedKickstartMessageStore.length, 'num of messages in a store').to.equal(1);
+ });
+ done();
+ }, 500);
+ });
+
+ it('should have only one workflow run', function(done) {
+ this.timeout(5000);
+
+ // kickstart the workflow
+ probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ setTimeout(() => {
+ // kickstart will take 2 seconds roughly
+ expect(workflowRunInfos.length, 'num of workflow runs').to.equal(1);
+ // the workflow must be 'should_be_called'
+ expect(workflowRunInfos[0].workflowId, 'workflow id').to.equal('should_be_called');
+ done();
+ }, 3000);
+ });
+
+ it('should be able to read an event that is used for workflow kickstart', function(done) {
+ this.timeout(5000);
+
+ // kickstart the workflow
+ probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ setTimeout(() => {
+ // kickstart will take 2 seconds roughly
+ expect(workflowRunInfos.length, 'num of workflow runs').to.equal(1);
+ // the workflow must be 'should_be_called'
+ expect(workflowRunInfos[0].workflowId, 'workflow id').to.equal('should_be_called');
+
+ // connect a workflow run client to the server
+ let workflowRunClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_run_id1&type=workflow_run' +
+ `&workflow_id=${workflowRunInfos[0].workflowId}` +
+ `&workflow_run_id=${workflowRunInfos[0].workflowRunId}` +
+ '&name=run1@xos.org'
+ });
+ workflowRunClients.push(workflowRunClient);
+
+ workflowRunClient.on('connect', () => {
+ workflowRunClient.emit(eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT, {
+ workflow_id: workflowRunInfos[0].workflowId,
+ workflow_run_id: workflowRunInfos[0].workflowRunId,
+ task_id: 'onu_event_handler',
+ topic: 'onu.events'
+ });
+ });
+
+ workflowRunClient.on(eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT, (result) => {
+ let event = result.result;
+ expect(event.topic).to.equal('onu.events');
+ expect(event.message.serialNumber).to.equal('testSerialXXX');
+ done();
+ });
+ }, 3000);
+ });
+
+ /*
+ it('should store user details for a new connection', () => {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+
+ const probe = eventrouter.getClients()['probe_id'];
+ expect(probe.getParams().name).to.equal('probe@xos.org');
+
+ const manager = eventrouter.getClients()['workflow_manager_id'];
+ expect(manager.getParams().name).to.equal('manager@xos.org');
+
+ const run = eventrouter.getClients()['workflow_run_id'];
+ expect(run.getParams().name).to.equal('run@xos.org');
+ });
+
+ it('should not store the same user twice', (done) => {
+ // This test case makes cleaning up process taking long time because it leaves
+ // a client socket. It seems there's no way to release it from server-side.
+
+ // connect a client to the server
+ const client2 = io.connect(`http://localhost:${port}`, {
+ query: 'id=probe_id&type=probe' +
+ '&name=probe@xos.org&value=different_value'
+ });
+
+ // when is connected start testing
+ client2.on('connect', () => {
+ setTimeout(() => {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ expect(
+ Object.keys(eventrouter.getWorkflowRunClients()).length,
+ 'num of workflow run clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getWorkflowManagerClients()).length,
+ 'num of workflow manager clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getProbeClients()).length,
+ 'num of probe clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getClients()).length,
+ 'total num of clients'
+ ).to.equal(3);
+
+ done();
+ }, 100);
+ });
+ });
+
+ it('should remove a user on disconnect', (done) => {
+ workflowManagerClient.disconnect();
+ workflowRunClient.disconnect();
+ probeClient.disconnect();
+
+ // we need to wait for the event to be dispatched
+ setTimeout(() => {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ expect(
+ Object.keys(eventrouter.getWorkflowRunClients()).length,
+ 'num of workflow run clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getWorkflowManagerClients()).length,
+ 'num of workflow manager clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getProbeClients()).length,
+ 'num of probe clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getClients()).length,
+ 'total num of clients'
+ ).to.equal(0);
+ done();
+ }, 100);
+ });
+ */
+ });
+})();
\ No newline at end of file
diff --git a/spec/test_clients_workflow_essence.json b/spec/test_clients_workflow_essence.json
new file mode 100644
index 0000000..bd37499
--- /dev/null
+++ b/spec/test_clients_workflow_essence.json
@@ -0,0 +1,44 @@
+{
+ "test_clients_workflow": {
+ "dag": {
+ "dag_id": "test_clients_workflow",
+ "local_variable": "dag_test_clients_workflow"
+ },
+ "dependencies": {
+ "task1": {
+ "children": [
+ "task2"
+ ]
+ },
+ "task2": {
+ "parents": [
+ "task1"
+ ]
+ }
+ },
+ "tasks": {
+ "task1": {
+ "class": "XOSEventSensor",
+ "dag": "dag_test_clients_workflow",
+ "key_field": "serialNumber",
+ "local_variable": "task1",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "task1_func",
+ "task_id": "task1",
+ "topic": "onu.events"
+ },
+ "task2": {
+ "class": "XOSModelSensor",
+ "dag": "dag_test_clients_workflow",
+ "key_field": "serialNumber",
+ "local_variable": "task2",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "task2_func",
+ "task_id": "task2"
+ }
+ }
+ }
+}
diff --git a/spec/test_multi_workflow_essence.json b/spec/test_multi_workflow_essence.json
new file mode 100644
index 0000000..572a432
--- /dev/null
+++ b/spec/test_multi_workflow_essence.json
@@ -0,0 +1,118 @@
+{
+ "must_not_be_called": {
+ "dag": {
+ "dag_id": "must_not_be_called",
+ "local_variable": "dag_must_not_be_called"
+ },
+ "dependencies": {
+ "must_not_be_called_handler": {
+ "children": [
+ "onu_event_handler"
+ ]
+ },
+ "onu_event_handler": {
+ "parents": [
+ "must_not_be_called_handler"
+ ],
+ "children": [
+ "onu_model_event_handler"
+ ]
+ },
+ "onu_model_event_handler": {
+ "parents": [
+ "onu_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "must_not_be_called_handler": {
+ "class": "UnknownSensor",
+ "dag": "dag_must_not_be_called",
+ "local_variable": "must_not_be_called_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "task_id": "must_not_be_called_handler"
+ },
+ "onu_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_must_not_be_called",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler",
+ "topic": "onu.events"
+ },
+ "onu_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_must_not_be_called",
+ "key_field": "serialNumber",
+ "local_variable": "onu_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "onu_model_event_handler"
+ }
+ }
+ },
+ "should_be_called": {
+ "dag": {
+ "dag_id": "should_be_called",
+ "local_variable": "dag_should_be_called"
+ },
+ "dependencies": {
+ "onu_event_handler": {
+ "children": [
+ "onu_model_event_handler"
+ ]
+ },
+ "onu_model_event_handler": {
+ "parents": [
+ "onu_event_handler"
+ ],
+ "children": [
+ "can_be_stuck_handler"
+ ]
+ },
+ "can_be_stuck_handler": {
+ "parents": [
+ "onu_model_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "onu_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_should_be_called",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler",
+ "topic": "onu.events"
+ },
+ "onu_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_should_be_called",
+ "key_field": "serialNumber",
+ "local_variable": "onu_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "onu_model_event_handler"
+ },
+ "can_be_stuck_handler": {
+ "class": "UnknownSensor",
+ "dag": "dag_should_be_called",
+ "local_variable": "can_be_stuck_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "task_id": "can_be_stuck_handler"
+ }
+ }
+ }
+}
diff --git a/src/config/config.js b/src/config/config.js
new file mode 100644
index 0000000..288e184
--- /dev/null
+++ b/src/config/config.js
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+(function () {
+ 'use strict';
+
+ const argv = require('yargs').argv;
+ const path = require('path');
+ const YamlConfig = require('node-yaml-config');
+ const logger = require('../config/logger.js');
+
+ // if a config file is specified in as a CLI arguments use that one
+ const cfgFile = argv.config || 'config.yml';
+
+ let config;
+ try {
+ logger.log('debug', `Loading ${path.join(__dirname, cfgFile)}`);
+ config = YamlConfig.load(path.join(__dirname, cfgFile));
+ logger.log('debug', `Parsed config: ${JSON.stringify(config)}`);
+ }
+ catch(e) {
+ logger.log('debug', `No ${cfgFile} found, using default params`);
+ }
+
+ module.exports = {
+ service: {
+ port: (config && config.service) ? config.service.port : 3000
+ }
+ };
+})();
\ No newline at end of file
diff --git a/src/config/config.yml b/src/config/config.yml
new file mode 100644
index 0000000..6fb1570
--- /dev/null
+++ b/src/config/config.yml
@@ -0,0 +1,17 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+default:
+ service:
+ port: 3030
diff --git a/src/config/logger.js b/src/config/logger.js
new file mode 100644
index 0000000..0a70904
--- /dev/null
+++ b/src/config/logger.js
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+(function () {
+ 'use strict';
+
+ const winston = require('winston');
+ const fs = require('fs');
+ const path = require('path');
+ const level = process.env.LOG_LEVEL || 'debug';
+ winston.level = level;
+
+ const logFile = path.join(__dirname, '../../logs/cord-workflow-control-service');
+
+ // clear old logs
+ ['error', 'debug'].forEach(l => {
+ try {
+ fs.statSync(`${logFile}.${l}.log`)
+ fs.unlinkSync(`${logFile}.${l}.log`);
+ }
+ catch(e) {
+ // log does not exist
+ }
+ });
+
+ // create a custom logger with colorized console and persistance to file
+ const logger = winston.createLogger({
+ transports: [
+ new (winston.transports.Console)({level: level, colorize: true}),
+ new (winston.transports.File)({name: 'error-log', level: 'error', filename: `${logFile}.error.log`}),
+ new (winston.transports.File)({name: 'debug-log', level: 'debug', filename: `${logFile}.debug.log`})
+ ]
+ });
+
+ module.exports = logger;
+})();
\ No newline at end of file
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
new file mode 100644
index 0000000..82d27db
--- /dev/null
+++ b/src/controllers/eventrouter.js
@@ -0,0 +1,512 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const _ = require('lodash');
+ const logger = require('../config/logger.js');
+ const Client = require('../types/client.js');
+ const WorkflowRun = require('../types/workflowrun.js');
+ const ws_manager = require('./ws_manager.js');
+ const ws_workflowrun = require('./ws_workflowrun.js');
+
+ let allClients = {}; // has publishers and subscribers
+ let probeClients = {}; // a subset of clients
+ let workflowManagerClients = {}; // a subset of clients
+ let workflowRunClients = {}; // a subset of clients
+
+ //let io;
+
+ // key: workflow id
+ // value: Workflow instance
+ let workflows = {};
+
+ // key: workflow run id
+ // value: WorkflowRun instance
+ let workflowRuns = {};
+
+ let serviceEvents = {
+ GREETING: 'cord.workflow.ctlsvc.greeting'
+ };
+
+ // add ws_mgroperation events
+ _.forOwn(ws_manager.serviceEvents, (wsServiceEvent, key) => {
+ serviceEvents[key] = wsServiceEvent;
+ });
+
+ // add ws_runoperation events
+ _.forOwn(ws_workflowrun.serviceEvents, (wsServiceEvent, key) => {
+ serviceEvents[key] = wsServiceEvent;
+ });
+
+ //const setIO = (ioInstance) => {
+ // io = ioInstance;
+ //};
+
+ const destroy = () => {
+ removeClients();
+ clearWorkflowRuns();
+ clearWorkflows();
+ };
+
+ const listWorkflows = () => {
+ let workflowList = [];
+ _.forOwn(workflows, (_workflow, workflowId) => {
+ workflowList.push(workflowId);
+ });
+ return workflowList;
+ };
+
+ const checkWorkflow = (workflowId) => {
+ if(workflowId in workflows) {
+ return true;
+ }
+ return false;
+ };
+
+ const addWorkflow = (workflow) => {
+ if(workflow.getId() in workflows) {
+ logger.log('error', `there exists a workflow with the same id - ${workflow.getId()}`);
+ return false;
+ }
+
+ let workflowId = workflow.getId();
+ workflows[workflowId] = workflow;
+ return true;
+ };
+
+ const clearWorkflows = () => {
+ _.forOwn(workflows, (_workflow, workflowId) => {
+ delete workflows[workflowId];
+ });
+ };
+
+ const listWorkflowRuns = () => {
+ let workflowRunList = [];
+ _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
+ workflowRunList.push(workflowRunId);
+ });
+ return workflowRunList;
+ };
+
+ const checkWorkflowRun = (workflowRunId) => {
+ if(workflowRunId in workflowRuns) {
+ return true;
+ }
+ return false;
+ };
+
+ const addWorkflowRun = (workflowRun) => {
+ let workflowId = workflowRun.getWorkflowId();
+ let workflowRunId = workflowRun.getId();
+
+ if(workflowRunId in workflowRuns) {
+ logger.log('warn', `there exists a workflow run with the same id - ${workflowRunId}`);
+ return false;
+ }
+
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
+ return false;
+ }
+
+ workflowRuns[workflowRunId] = workflowRun;
+ return true;
+ };
+
+ const clearWorkflowRuns = () => {
+ _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
+ delete workflowRuns[workflowRunId];
+ });
+ };
+
+ const updateWorkflowRunStatus = (workflowRunId, taskId, status) => {
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+ return false;
+ }
+
+ let workflowRun = workflowRuns[workflowRunId];
+ workflowRun.updateTaskStatus(taskId, status);
+ return true;
+ };
+
+ const setWorkflowRunKickstarted = (workflowRunId) => {
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+ return false;
+ }
+
+ let workflowRun = workflowRuns[workflowRunId];
+ workflowRun.setKickstarted();
+ return true;
+ };
+
+ const kickstart = (workflowId, workflowRunId) => {
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+ return false;
+ }
+
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+ return false;
+ }
+
+ ws_manager.kickstartWorkflow(workflowId, workflowRunId);
+ };
+
+ const removeWorkflow = (workflowId) => {
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+ return false;
+ }
+
+ // check if there are workflow runs
+ _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+ if(workflowRun.getWorkflowId() === workflowId) {
+ logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
+ return false;
+ }
+ });
+
+ delete workflows[workflowId];
+ return true;
+ };
+
+ const removeWorkflowRun = (workflowRunId) => {
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+ return false;
+ }
+
+ let workflowRun = workflowRuns[workflowRunId];
+ delete workflowRuns[workflowRunId];
+
+ workflowRun.setFinished();
+ return true;
+ };
+
+ const sendEvent = (topic, message) => {
+ // list of workflowIds
+ // to check if there are workflow runs for the events
+ let workflowIdsRunning = [];
+
+ // route event to running instances
+ _.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
+ let workflowId = workflowRun.getWorkflowId();
+ let workflow = workflows[workflowId];
+
+ // event will be routed to workflow runs that meet following criteria
+ // 1) the workflow is currently interested in the same topic
+ // (already finished tasks are not counted)
+ // 2) the task's key field and value
+ if(workflowRun.isEventAcceptable(workflow, topic, message)) {
+ logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
+ workflowRun.enqueueEvent(topic, message);
+
+ if(!workflowIdsRunning.includes(workflowId)) {
+ workflowIdsRunning.push(workflowId);
+ }
+ }
+ });
+
+ // check if the event is a kickstart event
+ _.forOwn(workflows, (workflow, workflowId) => {
+ if(workflow.isKickstartTopic(topic)) {
+ // check if there is a workflow run for the event
+ // kickstart a workflow if there is no workflows runs for the event
+ if(!workflowIdsRunning.includes(workflowId)) {
+ // we need to buffer the event until workflow run is brought up
+ let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
+ let workflowRunId = workflowRun.getId();
+
+ // register for management
+ workflowRuns[workflowRunId] = workflowRun;
+
+ // route event
+ logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
+ workflowRun.enqueueEvent(topic, message);
+
+ // KICKSTART!
+ kickstart(workflowId, workflowRunId);
+ }
+ }
+ });
+ };
+
+ const fetchEvent = (workflowRunId, taskId, topic) => {
+ // this returns an event or an empty obj when there is no message
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `workflow run ${workflowRunId} does not exist`);
+ return null;
+ }
+
+ let workflowRun = workflowRuns[workflowRunId];
+ let workflowId = workflowRun.getWorkflowId();
+
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `workflow ${workflowId} does not exist`);
+ return null;
+ }
+
+ let workflow = workflows[workflowId];
+
+ let task = workflow.getTask(taskId);
+ if(!task) {
+ logger.log('warn', `workflow ${workflowId} does not have task ${taskId}`);
+ return null;
+ }
+
+ logger.log('debug', `workflow run ${workflowRunId}, task ${taskId} fetches an event`);
+
+ let event = workflowRun.dequeueEvent(topic);
+ if(event) {
+ return event;
+ }
+ else {
+ return {};
+ }
+ };
+
+ const addClient = (c) => {
+ let clientId = c.getId();
+ let socket = c.getSocket();
+
+ // check id that client is already there
+ if(clientId in allClients) {
+ logger.log('warn', `there exists a client with the same id - ${clientId}`);
+ return false;
+ }
+
+ if(c.getType() === Client.Type.PROBE) {
+ // probe' messages are relayed
+ // relay messages based on topic
+ // probe protocol:
+ // REQ:
+ // topic: event topic
+ // message: <data>
+ // RES:
+ // topic: topic sent
+ // message: {result: <true/false>, message: <error message> }
+ allClients[clientId] = c;
+ probeClients[clientId] = c;
+
+ socket.on('*', (msg) => {
+ let jsonMsg = msg.data;
+ if(jsonMsg.length === 2) {
+ // must have two parts
+ // first part is topic
+ // second part is message body
+ let topic = jsonMsg[0];
+ let messageBody = jsonMsg[1];
+
+ sendEvent(topic, messageBody);
+
+ // return true for success
+ socket.emit(topic, {
+ result: true
+ });
+ }
+ else {
+ logger.log('warn', `unexpected message is received - ${JSON.stringify(jsonMsg)}`);
+ socket.emit(jsonMsg[0], {
+ result: false,
+ message: `unexpected message is received - ${JSON.stringify(jsonMsg)}`
+ });
+ }
+ });
+ return true;
+ }
+ else if(c.getType() === Client.Type.WORKFLOW_MANAGER) {
+ // manager
+ // manager protocol:
+ // REQ:
+ // topic: operation
+ // message: <data>
+ // RES:
+ // topic: topic sent
+ // message: {result: <true/false>, message: <error message> }F
+ allClients[clientId] = c;
+ workflowManagerClients[clientId] = c;
+
+ // attach manager operations
+ let router = ws_manager.getRouter();
+ _.forOwn(router, (routerElem, _key) => {
+ socket.on(routerElem.topic, (msg) => {
+ routerElem.handler(routerElem.topic, msg, (err, result) => {
+ if(err) {
+ logger.log('warn', `unable to handle a message - ${err}`);
+ socket.emit(routerElem.topic, {
+ result: false,
+ message: err
+ });
+ return;
+ }
+
+ if(routerElem.return === undefined || routerElem.return) {
+ socket.emit(routerElem.topic, {
+ result: result
+ });
+ }
+ });
+ });
+ });
+ return true;
+ }
+ else if(c.getType() === Client.Type.WORKFLOW_RUN) {
+ // workflow run
+ // workflow run protocol:
+ // REQ:
+ // topic: operation
+ // message: <data>
+ // RES:
+ // topic: topic sent
+ // message: {result: <true/false>, message: <error message> }
+
+ // map to WorkflowRun instance
+ let workflowId = c.getWorkflowId();
+ let workflowRunId = c.getWorkflowRunId();
+ let workflowRun;
+
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `cannot find a workflow ${workflowId}`);
+ return false;
+ }
+
+ // register client to workflow run
+ if(!(workflowRunId in workflowRuns)) {
+ // workflow run not exist yet
+ logger.log('warn', `cannot find a workflow run ${workflowRunId}`);
+ return false;
+ }
+
+ //let workflow = workflows[workflowId];
+
+ allClients[clientId] = c;
+ workflowRunClients[clientId] = c;
+
+ // update
+ workflowRun = workflowRuns[workflowRunId];
+ workflowRun.addClientId(clientId);
+
+ // attach workflow run operations
+ let router = ws_workflowrun.getRouter();
+ _.forOwn(router, (routerElem, _key) => {
+ socket.on(routerElem.topic, (msg) => {
+ routerElem.handler(routerElem.topic, msg, (err, result) => {
+ if(err) {
+ logger.log('warn', `unable to handle a message - ${err}`);
+ socket.emit(routerElem.topic, {
+ result: false,
+ message: err
+ });
+ return;
+ }
+
+ if(routerElem.return === undefined || routerElem.return) {
+ socket.emit(routerElem.topic, {
+ result: result
+ });
+ }
+ });
+ });
+ });
+ return true;
+ }
+ return false;
+ };
+
+ const removeClient = (id) => {
+ if(id in allClients) {
+ let removedClient = allClients[id];
+ delete allClients[id];
+
+ let type = removedClient.getType();
+ if(type === Client.Type.PROBE) {
+ delete probeClients[id];
+ }
+ else if(type === Client.Type.WORKFLOW_MANAGER) {
+ delete workflowManagerClients[id];
+ }
+ else if(type === Client.Type.WORKFLOW_RUN) {
+ delete workflowRunClients[id];
+
+ let workflowRunId = removedClient.getWorkflowRunId();
+ let workflowRun = workflowRuns[workflowRunId];
+
+ if(workflowRun) {
+ workflowRun.removeClientId(id);
+
+ //TODO
+ // WorkflowRun can have no clients between tasks
+ // So we should not remove the run until the workflow run finishes
+ }
+ }
+ }
+ };
+
+ const removeClients = () => {
+ let probeClients = {};
+
+ _.forOwn(probeClients, (_probeClient, clientId) => {
+ delete probeClients[clientId];
+ });
+
+ _.forOwn(workflowManagerClients, (_workflowManagerClient, clientId) => {
+ delete workflowManagerClients[clientId];
+ });
+
+ _.forOwn(workflowRunClients, (_workflowRunClients, clientId) => {
+ delete workflowRunClients[clientId];
+ });
+
+ _.forOwn(allClients, (client, clientId) => {
+ client.getSocket().disconnect(true);
+ delete allClients[clientId];
+ });
+ }
+
+ module.exports = {
+ serviceEvents: serviceEvents,
+ destroy: destroy,
+ getClients: () => { return allClients; },
+ getProbeClients: () => { return probeClients; },
+ getWorkflowManagerClients: () => { return workflowManagerClients; },
+ getWorkflowRunClients: () => { return workflowRunClients; },
+ clientType: Client.Type,
+ //setIO: setIO,
+ sendEvent: sendEvent,
+ fetchEvent: fetchEvent,
+ addClient: addClient,
+ removeClient: removeClient,
+ removeClients: removeClients,
+ addWorkflow: addWorkflow,
+ listWorkflows: listWorkflows,
+ checkWorkflow: checkWorkflow,
+ removeWorkflow: removeWorkflow,
+ clearWorkflows: clearWorkflows,
+ addWorkflowRun: addWorkflowRun,
+ listWorkflowRuns: listWorkflowRuns,
+ checkWorkflowRun: checkWorkflowRun,
+ removeWorkflowRun: removeWorkflowRun,
+ clearWorkflowRuns: clearWorkflowRuns,
+ updateWorkflowRunStatus: updateWorkflowRunStatus,
+ setWorkflowRunKickstarted: setWorkflowRunKickstarted,
+ };
+})();
\ No newline at end of file
diff --git a/src/controllers/rest_probe.js b/src/controllers/rest_probe.js
new file mode 100644
index 0000000..9d67386
--- /dev/null
+++ b/src/controllers/rest_probe.js
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const express = require('express');
+ const {checkSchema, validationResult} = require('express-validator');
+ const logger = require('../config/logger.js');
+ const eventrouter = require('./eventrouter.js');
+
+ // HTTP REST interface for message intake
+ // POST method
+ // Message format:
+ // {
+ // topic: 'topic here',
+ // message: 'message body here'
+ // }
+ // e.g., /intake?topic=aaa&message=bbb
+ const intakeMessageInputValidator = {
+ topic: {
+ in: ['params', 'query'],
+ errorMessage: 'Message topic is null or empty',
+ },
+ message: {
+ in: ['params', 'query'],
+ errorMessage: 'Message body is null or empty',
+ }
+ };
+
+ const intakeMessage = (req, res) => {
+ let errors = validationResult(req);
+ if(!errors.isEmpty()) {
+ res.status(400).send(
+ JSON.stringify({
+ errors: errors.array()
+ })
+ );
+ return;
+ }
+
+ let jsonMessage = req.body
+ logger.debug(`Received a message ${jsonMessage}`);
+
+ // send the message to the event distributor
+ eventrouter.sendEvent(jsonMessage.topic, jsonMessage.message);
+
+ res.status(200).send({
+ result: true
+ });
+ return;
+ };
+
+ const getRouter = () => {
+ var routerInstance = new express.Router();
+ routerInstance.use((req, res, next) => {
+ logger.info(`[REQ] ${req.method}, ${req.url}`);
+ next();
+ });
+
+ // intake apis
+ routerInstance.post('/intake', checkSchema(intakeMessageInputValidator), intakeMessage);
+ return routerInstance;
+ };
+
+ module.exports = {
+ getRouter: getRouter
+ };
+})();
\ No newline at end of file
diff --git a/src/controllers/websocket.js b/src/controllers/websocket.js
new file mode 100644
index 0000000..3f3216a
--- /dev/null
+++ b/src/controllers/websocket.js
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+(function () {
+ 'use strict';
+
+ const socketio = require('socket.io');
+ const ioWildcard = require('socketio-wildcard');
+ const client = require('../types/client.js');
+ const eventrouter = require('./eventrouter.js');
+ const logger = require('../config/logger.js');
+
+ let io;
+ const createSocketIO = (server) => {
+ // INSTANTIATE SOCKET.IO
+ io = socketio.listen(server);
+ io.use(ioWildcard());
+
+ // set io to eventrouter
+ //eventrouter.setIO(io);
+
+ // LISTEN TO "CONNECTION" EVENT (FROM SOCKET.IO)
+ io.on('connection', (socket) => {
+ let query = socket.handshake.query;
+ logger.log('debug', `connect ${JSON.stringify(query)}`);
+ let added = false;
+
+ // make a client
+ let c = client.Client.fromObj(query);
+ c.setSocket(socket);
+
+ if(!c.validate()) {
+ logger.log('warn', `client validation failed - ${JSON.stringify(query)}`);
+ return;
+ }
+
+ // register the client for management
+ if(eventrouter.addClient(c)) {
+ // Send a greeting message to the client
+ socket.emit(eventrouter.serviceEvents.GREETING, {
+ to: c.getId(),
+ message: 'Welcome to CORD Workflow Control Service'
+ });
+
+ added = true;
+ }
+ else {
+ logger.log('warn', `client could not be added - ${JSON.stringify(query)}`);
+ socket.disconnect(true);
+ }
+
+ // set a disconnect event handler
+ socket.on('disconnect', (reason) => {
+ logger.log('debug', `disconnect ${reason} ${JSON.stringify(query)}`);
+ if(added) {
+ eventrouter.removeClient(c.getId());
+ }
+ });
+ });
+ };
+
+ const destroySocketIO = () => {
+ io.close();
+ };
+
+ const getSocketIO = () => io;
+
+ module.exports = {
+ create: createSocketIO,
+ destroy: destroySocketIO,
+ get: getSocketIO
+ };
+
+ // USAGE
+ // const socketIo = require('./controllers/websocket.js');
+ // const socket = socketIo.get();
+ // socket.emit('eventName', data);
+
+})();
\ No newline at end of file
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
new file mode 100644
index 0000000..b3e8877
--- /dev/null
+++ b/src/controllers/ws_manager.js
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const _ = require('lodash');
+ const Workflow = require('../types/workflow.js');
+ const logger = require('../config/logger.js');
+
+ let serviceEvents = {
+ WORKFLOW_REG: 'cord.workflow.ctlsvc.workflow.reg',
+ WORKFLOW_REG_ESSENCE: 'cord.workflow.ctlsvc.workflow.reg_essence',
+ WORKFLOW_LIST: 'cord.workflow.ctlsvc.workflow.list',
+ WORKFLOW_RUN_LIST: 'cord.workflow.ctlsvc.workflow.run.list',
+ WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
+ WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
+ WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
+ WORKFLOW_RUN_REMOVE: 'cord.workflow.ctlsvc.workflow.run.remove'
+ };
+
+ // WebSocket interface for workflow registration
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.reg',
+ // message: <workflow>
+ // }
+ const registWorkflow = (topic, message, cb) => {
+ const distributor = require('./eventrouter.js/index.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflow = message;
+
+ logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
+
+ let result = distributor.addWorkflow(workflow);
+ if(!result) {
+ errorMessage = `failed to register a workflow ${workflow.getId()}`;
+ cb(errorMessage, false);
+ }
+ else {
+ cb(null, true);
+ }
+ return;
+ };
+
+ // WebSocket interface for workflow registration (via essence)
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.reg',
+ // message: <workflow essence>
+ // }
+ const registerWorkflowEssence = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let essence = message;
+ let result = true;
+ let errorResults = [];
+
+ logger.log('debug', `workflow essence - ${JSON.stringify(essence)}`);
+
+ let workflows = Workflow.loadWorkflowsFromEssence(essence);
+ workflows.forEach((workflow) => {
+ if(workflow) {
+ let localResult = eventrouter.addWorkflow(workflow);
+ errorResults.push(localResult);
+ result = result && localResult; // false if any of registrations fails
+ }
+ });
+
+ if(!result) {
+ errorMessage = `failed to register workflows ${errorResults}`;
+ cb(errorMessage, false);
+ }
+ else {
+ cb(null, true);
+ }
+ return;
+ };
+
+ // WebSocket interface for workflow listing
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.list',
+ // message: null
+ // }
+ const listWorkflows = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let result = eventrouter.listWorkflows();
+ cb(null, result);
+ return;
+ };
+
+ // WebSocket interface for workflow run listing
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.list',
+ // message: null
+ // }
+ const listWorkflowRuns = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let result = eventrouter.listWorkflowRuns();
+ cb(null, result);
+ return;
+ };
+
+ // WebSocket interface for workflow check
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.check',
+ // message: <workflow_id>
+ // }
+ const checkWorkflow = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowId = message;
+ let result = eventrouter.checkWorkflow(workflowId);
+ cb(null, result);
+ return;
+ };
+
+ // WebSocket interface for workflow start notification
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.kickstart',
+ // message: {
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
+ // }
+ // }
+ const notifyWorkflowStart = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowRunId = message.workflow_run_id;
+
+ // there must be a workflow matching
+ // set the workflow kickstarted
+ eventrouter.setWorkflowRunKickstarted(workflowRunId);
+ cb(null, true);
+ return;
+ }
+
+ // WebSocket interface for workflow removal
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.remove',
+ // message: <workflow_id>
+ // }
+ const removeWorkflow = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let workflowId = message;
+ let result = eventrouter.removeWorkflow(workflowId);
+ cb(null, result);
+ return;
+ }
+
+ // WebSocket interface for workflow run removal
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.run.remove',
+ // message: {
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
+ // }
+ // }
+ const removeWorkflowRun = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowRunId = message.workflow_run_id;
+
+ let result = eventrouter.removeWorkflowRun(workflowRunId);
+ cb(null, result);
+ return;
+ }
+
+ const getRouter = () => {
+ return {
+ registWorkflow: {
+ topic: serviceEvents.WORKFLOW_REG,
+ handler: registWorkflow
+ },
+ registerWorkflowEssence: {
+ topic: serviceEvents.WORKFLOW_REG_ESSENCE,
+ handler: registerWorkflowEssence
+ },
+ listWorkflows: {
+ topic: serviceEvents.WORKFLOW_LIST,
+ handler: listWorkflows
+ },
+ listWorkflowRuns: {
+ topic: serviceEvents.WORKFLOW_RUN_LIST,
+ handler: listWorkflowRuns
+ },
+ checkWorkflow: {
+ topic: serviceEvents.WORKFLOW_CHECK,
+ handler: checkWorkflow
+ },
+ notifyWorkflowStart: {
+ topic: serviceEvents.WORKFLOW_KICKSTART,
+ handler: notifyWorkflowStart,
+ return: false
+ },
+ removeWorkflow: {
+ topic: serviceEvents.WORKFLOW_REMOVE,
+ handler: removeWorkflow
+ },
+ removeWorkflowRun: {
+ topic: serviceEvents.WORKFLOW_RUN_REMOVE,
+ handler: removeWorkflowRun
+ }
+ };
+ };
+
+ // out-going commands
+ const kickstartWorkflow = (workflowId, workflowRunId) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let clients = eventrouter.getWorkflowManagerClients();
+ _.forOwn(clients, (client, _clientId) => {
+ let socket = client.getSocket();
+ if(socket) {
+ socket.emit(serviceEvents.WORKFLOW_KICKSTART, {
+ workflow_id: workflowId,
+ workflow_run_id: workflowRunId
+ });
+ }
+ });
+ return;
+ };
+
+ module.exports = {
+ serviceEvents: serviceEvents,
+ getRouter: getRouter,
+ kickstartWorkflow: kickstartWorkflow
+ };
+})();
\ No newline at end of file
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
new file mode 100644
index 0000000..f7ee474
--- /dev/null
+++ b/src/controllers/ws_workflowrun.js
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const logger = require('../config/logger.js');
+
+ let serviceEvents = {
+ WORKFLOW_RUN_UPDATE_STATUS: 'cord.workflow.ctlsvc.workflow.run.status',
+ WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch'
+ };
+
+ // WebSocket interface for workflow status update
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflowrun.status',
+ // message: {
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>,
+ // task_id: <task_id>,
+ // status: 'begin' or 'end'
+ // }
+ // }
+ const updateWorkflowRunStatus = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `workflow_id field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in message)) {
+ // error
+ errorMessage = `workflow_run_id field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('task_id' in message)) {
+ // error
+ errorMessage = `task_id field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('status' in message)) {
+ // error
+ errorMessage = `status field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let result = eventrouter.updateWorkflowRunStatus(
+ message.workflow_id,
+ message.workflow_run_id,
+ message.task_id,
+ message.status.toLowerCase()
+ );
+ cb(null, result);
+ return;
+ };
+
+ // WebSocket interface for workflow status update
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflowrun.fetch',
+ // message: {
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>,
+ // task_id: <task_id>,
+ // topic: <expected topic>
+ // }
+ // }
+ const fetchEvent = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `workflow_id field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in message)) {
+ // error
+ errorMessage = `workflow_run_id field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('task_id' in message)) {
+ // error
+ errorMessage = `task_id field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('topic' in message)) {
+ // error
+ errorMessage = `topic field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let result = eventrouter.fetchEvent(
+ message.workflow_run_id,
+ message.task_id,
+ message.topic
+ );
+ if(result) {
+ // empty object {} when no message
+ cb(null, result);
+ }
+ else {
+ cb(
+ `could not fetch event ${message.topic} from workflow run ${message.workflow_run_id}`,
+ null
+ );
+ }
+ return;
+ };
+
+ const getRouter = () => {
+ return {
+ updateWorkflowRunStatus: {
+ topic: serviceEvents.WORKFLOW_RUN_UPDATE_STATUS,
+ handler: updateWorkflowRunStatus
+ },
+ fetchEvent: {
+ topic: serviceEvents.WORKFLOW_RUN_FETCH_EVENT,
+ handler: fetchEvent
+ }
+ };
+ };
+
+ module.exports = {
+ serviceEvents: serviceEvents,
+ getRouter: getRouter
+ };
+})();
\ No newline at end of file
diff --git a/src/server.js b/src/server.js
new file mode 100644
index 0000000..6f70d3a
--- /dev/null
+++ b/src/server.js
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const express = require('express');
+ const config = require('./config/config.js').service;
+ const bodyParser = require('body-parser');
+ const cors = require('cors');
+ const socketIo = require('./controllers/websocket.js');
+ const eventrouter = require('./controllers/eventrouter.js');
+ const workflowLoader = require('./workflows/loader.js');
+ const logger = require('./config/logger.js');
+ const rest_probe = require('./controllers/rest_probe.js');
+
+ const app = express();
+
+ // Apply middlewares
+ app.use(cors());
+ app.use(bodyParser.json());
+
+ // Set a router for intake interface
+ app.use('/', rest_probe.getRouter());
+
+ const startServer = (port) => {
+ // if is running just return it
+ if(app.server) {
+ return app.server;
+ }
+
+ const server = app.listen(port || config.port, () => {
+ logger.info(`Express is listening to http://localhost:${port || config.port}`);
+
+ // once server is ready setup WebSocket
+ socketIo.create(server);
+
+ // load built-in workflows
+ let workflows = workflowLoader.loadAllWorkflows();
+ for(let workflow in workflows) {
+ eventrouter.addWorkflow(workflow);
+ }
+ });
+ app.server = server;
+ return server;
+ };
+
+ const stopServer = () => {
+ if(app.server) {
+ socketIo.destroy();
+ app.server.close();
+ app.server = undefined;
+ eventrouter.destroy();
+ }
+ }
+
+ if(!module.parent) {
+ startServer();
+ }
+
+ module.exports = {
+ serviceEvents: eventrouter.serviceEvents,
+ app: app,
+ start: startServer,
+ stop: stopServer
+ };
+})();
\ No newline at end of file
diff --git a/src/types/client.js b/src/types/client.js
new file mode 100644
index 0000000..02e8eb6
--- /dev/null
+++ b/src/types/client.js
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const _ = require('lodash');
+ const logger = require('../config/logger.js');
+
+ const ClientType = {
+ PROBE: 'probe',
+ WORKFLOW_MANAGER: 'workflow_manager',
+ WORKFLOW_RUN: 'workflow_run',
+ UNKNOWN: 'unknown'
+ };
+
+ class Client {
+ constructor(id) {
+ this.id = id.toLowerCase();
+ // a field value can be one of followings
+ // - probe : message publisher
+ // - workflow_manager : workflow manager
+ // - workflow_run : workflow run
+ this.type = ClientType.UNKNOWN;
+ // used by workflow_run
+ this.workflowId = null;
+ this.workflowRunId = null;
+ this.socket = null;
+ // optional info.
+ this.params = {};
+ }
+
+ static parseClientType(strClientType) {
+ if(!strClientType) {
+ return ClientType.UNKNOWN;
+ }
+ else if(['probe', 'prb'].includes(strClientType.toLowerCase())) {
+ return ClientType.PROBE;
+ }
+ else if(['workflow_manager', 'manager'].includes(strClientType.toLowerCase())) {
+ return ClientType.WORKFLOW_MANAGER;
+ }
+ else if(['workflow_run', 'run'].includes(strClientType.toLowerCase())) {
+ return ClientType.WORKFLOW_RUN;
+ }
+ else {
+ return ClientType.UNKNOWN;
+ }
+ }
+
+ static fromObj(obj) {
+ if(obj) {
+ let client;
+ if('id' in obj) {
+ client = new Client(obj['id']);
+ }
+ else {
+ logger.log('error', 'id is not given');
+ return null;
+ }
+
+ if('type' in obj) {
+ client.setType(obj.type);
+ }
+
+ if('workflow_id' in obj) {
+ client.setWorkflowId(obj.workflow_id);
+ }
+
+ if('workflow_run_id' in obj) {
+ client.setWorkflowRunId(obj.workflow_run_id);
+ }
+
+ if('socket' in obj) {
+ client.setSocket(obj.socket);
+ }
+
+ // all others are sent to params
+ client.params = {};
+ _.forOwn(obj, (val, key) => {
+ client.params[key] = val;
+ });
+ return client;
+ }
+ else {
+ return null;
+ }
+ }
+
+ setId(id) {
+ this.id = id.toLowerCase();
+ }
+
+ getId() {
+ return this.id;
+ }
+
+ setType(type) {
+ let clientType = Client.parseClientType(type);
+ this.type = clientType;
+ }
+
+ getType() {
+ return this.type;
+ }
+
+ setWorkflowId(id) {
+ this.workflowId = id;
+ }
+
+ getWorkflowId() {
+ return this.workflowId;
+ }
+
+ setWorkflowRunId(id) {
+ this.workflowRunId = id;
+ }
+
+ getWorkflowRunId() {
+ return this.workflowRunId;
+ }
+
+ setParams(params={}) {
+ this.params = params;
+ }
+
+ getParams() {
+ return this.params;
+ }
+
+ setSocket(socket) {
+ this.socket = socket;
+ }
+
+ getSocket() {
+ return this.socket;
+ }
+
+ validate() {
+ // id field is required for all types of clients
+ if(!this.id) {
+ logger.log('error', 'id is not given');
+ return false;
+ }
+
+ if(this.type === ClientType.UNKNOWN) {
+ logger.log('error', 'type is not given properly');
+ return false;
+ }
+
+ if(this.type === ClientType.WORKFLOW_RUN) {
+ if(!this.workflowId) {
+ logger.log('error', 'workflowId is not given');
+ return false;
+ }
+
+ if(!this.workflowRunId) {
+ logger.log('error', 'workflowRunId is not given');
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ module.exports = {
+ Type: ClientType,
+ Client: Client
+ };
+})();
\ No newline at end of file
diff --git a/src/types/workflow.js b/src/types/workflow.js
new file mode 100644
index 0000000..11ace2f
--- /dev/null
+++ b/src/types/workflow.js
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const _ = require('lodash');
+ const WorkflowTask = require('./workflowtask.js');
+ const logger = require('../config/logger.js');
+
+ const loadWorkflowsFromEssence = (essence) => {
+ // an essence can have multiple workflows
+ let workflows = [];
+ _.forOwn(essence, (workflowEssence, _workflowId) => {
+ let workflow = Workflow.fromEssence(workflowEssence);
+ if(workflow) {
+ workflows.push(workflow);
+ }
+ });
+ return workflows;
+ };
+
+ class Workflow {
+ constructor(id) {
+ // dag_id
+ this.id = id;
+
+ // key: topic
+ // value: an array of WorkflowTask objects
+ this.topics = {};
+
+ // key: task id
+ // value: WorkflowTask object
+ this.tasks = {};
+
+ // preserve raw essense
+ this.essence = {};
+ }
+
+ static fromEssence(essence) {
+ if(essence) {
+ let workflow;
+ if('dag' in essence) {
+ let dag = essence.dag;
+ if('dag_id' in dag) {
+ workflow = new Workflow(dag.dag_id);
+ }
+ else {
+ logger.log('error', 'dag is not given');
+ return null;
+ }
+ }
+ else {
+ logger.log('error', 'dag is not given');
+ return null;
+ }
+
+ // read this to detect kickstart events
+ // use map for fast look up
+ let headTasks = {};
+ if('dependencies' in essence) {
+ let dependencies = essence.dependencies;
+ _.forOwn(dependencies, (dependency, taskId) => {
+ // if the task does not have parents, it means the head task.
+ if(!('parents' in dependency)) {
+ // kickstart task
+ headTasks[taskId] = true;
+ }
+ else {
+ if(!dependency['parents'] || dependency['parents'].length === 0) {
+ // empty array
+ // kickstart task
+ headTasks[taskId] = true;
+ }
+ }
+ });
+ }
+
+ if('tasks' in essence) {
+ let tasks = essence.tasks;
+ _.forOwn(tasks, (taskEssence, _taskId) => {
+ let task = WorkflowTask.WorkflowTask.fromEssence(taskEssence);
+
+ // if its in head tasks, it has a kickstart event.
+ if(task.getId() in headTasks) {
+ task.setKickstart(true);
+ }
+
+ workflow.addTask(task);
+ });
+ }
+
+ workflow.essence = essence;
+ return workflow;
+ }
+ return undefined;
+ }
+
+ setId(id) {
+ this.id = id;
+ }
+
+ getId() {
+ return this.id;
+ }
+
+ getTopics() {
+ let allTopics = [];
+ _.forOwn(this.topics, (_tasks, topic) => {
+ // value is an array
+ if(!allTopics.includes(topic)) {
+ allTopics.push(topic);
+ }
+ });
+ return allTopics;
+ }
+
+ getTasksForTopic(topic) {
+ if(topic in this.topics) {
+ let workflowTasks = this.topics[topic];
+ return workflowTasks;
+ }
+ return undefined;
+ }
+
+ hasTasksForTopic(topic) {
+ if(topic in this.topics) {
+ return true;
+ }
+ return false;
+ }
+
+ getTasks() {
+ return this.tasks;
+ }
+
+ getTask(id) {
+ if(id in this.tasks) {
+ let workflowTask = this.tasks[id];
+ return workflowTask;
+ }
+ return undefined;
+ }
+
+ getKickstartTopics() {
+ let kickstartTopics = [];
+ _.forOwn(this.tasks, (task, _taskId) => {
+ if(task.isKickstart()) {
+ let topic = task.getTopic();
+ if(!kickstartTopics.includes(topic)) {
+ kickstartTopics.push(topic);
+ }
+ }
+ });
+ return kickstartTopics;
+ }
+
+ isKickstartTopic(topic) {
+ let kickstartTopics = this.getKickstartTopics();
+ if(kickstartTopics.includes(topic)) {
+ return true;
+ }
+ return false;
+ }
+
+ addTask(task) {
+ let taskId = task.getId();
+ if(taskId in this.tasks) {
+ logger.log('warn', `there exists a task with the same id - ${JSON.stringify(task)}`);
+ return false;
+ }
+
+ this.tasks[taskId] = task;
+
+ let taskTopic = task.getTopic();
+ if(taskTopic in this.topics) {
+ this.topics[taskTopic].push(task);
+ }
+ else {
+ this.topics[taskTopic] = [task];
+ }
+ return true;
+ }
+
+ setEssence(essence) {
+ this.essence = essence;
+ }
+
+ getEssence() {
+ return this.essence;
+ }
+
+ validate() {
+ if(!this.id) {
+ logger.log('error', 'id is not given');
+ return false;
+ }
+
+ if(!this.tasks || Object.keys(this.tasks).length > 0) {
+ logger.log('error', 'task is not given');
+ return false;
+ }
+
+ let countKickstartEvent = 0;
+ _.forOwn(this.tasks, (task, _taskId) => {
+ if(task.isKickstart()) {
+ countKickstartEvent++;
+ }
+ });
+
+ if(countKickstartEvent <= 0) {
+ logger.log('error', 'kickstart event is not given');
+ return false;
+ }
+ return true;
+ }
+ }
+
+ module.exports = {
+ Workflow: Workflow,
+ loadWorkflowsFromEssence: loadWorkflowsFromEssence
+ };
+})();
\ No newline at end of file
diff --git a/src/types/workflowrun.js b/src/types/workflowrun.js
new file mode 100644
index 0000000..65027ff
--- /dev/null
+++ b/src/types/workflowrun.js
@@ -0,0 +1,416 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const _ = require('lodash');
+ const dateformat = require('dateformat');
+ const WorkflowRunTask = require('./workflowruntask.js');
+ const logger = require('../config/logger.js');
+
+ class WorkflowRun {
+ constructor(workflowId, workflowRunId) {
+ // workflow run id (dag_run_id)
+ this.id = workflowRunId;
+ // workflow id
+ this.workflowId = workflowId;
+
+ // workflow run tasks - for storing status
+ // id: task id
+ // value : workflow run task obj
+ this.runTasks = {};
+
+ // storing key-field, key-value pairs for <event, workflow run> mapping
+ // key: topic
+ // value: [{
+ // field:
+ // value:
+ // }, ...]
+ this.eventKeyFieldValues = {};
+
+ // client ids
+ this.clientIds = [];
+
+ // event queue
+ // {
+ // topic: topic,
+ // message: message
+ // }
+ this.eventQueue = [];
+ // trash bins
+ // dequeued events are sent to this queue
+ // for debugging
+ this.trashEventQueue = [];
+
+ this.kickstarted = false;
+ this.finished = false;
+ }
+
+ static makeWorkflowRunId(workflowId) {
+ let now = new Date();
+ let datetimestr = dateformat(now, 'yyyymmdd_HHMMssl');
+ return `${workflowId}_${datetimestr}`;
+ }
+
+ static makeNewRun(workflow) {
+ let workflowId = workflow.getId();
+ let workflowRunId = WorkflowRun.makeWorkflowRunId(workflowId);
+ let workflowRun = new WorkflowRun(workflowId, workflowRunId);
+
+ let tasks = workflow.getTasks();
+ _.forOwn(tasks, (task, taskId) => {
+ // set run tasks
+ let runTask = new WorkflowRunTask.WorkflowRunTask(taskId);
+ workflowRun.addRunTask(runTask);
+
+ // set key_field / value
+ workflowRun.setEventKeyFieldValue(task.getTopic(), task.getKeyField(), null); // init
+ });
+ return workflowRun;
+ }
+
+ setId(id) {
+ this.id = id;
+ }
+
+ getId() {
+ return this.id;
+ }
+
+ setWorkflowId(workflowId) {
+ this.workflowId = workflowId;
+ }
+
+ getWorkflowId() {
+ return this.workflowId;
+ }
+
+ addRunTask(runTask) {
+ this.runTasks[runTask.getTaskId()] = runTask;
+ }
+
+ getRunTask(taskId) {
+ if(taskId in this.runTasks) {
+ return this.runTasks[taskId];
+ }
+ return undefined;
+ }
+
+ getTaskStatus(taskId) {
+ return this.runTasks[taskId].getStatus();
+ }
+
+ updateTaskStatus(taskId, status) {
+ let runTask = this.runTasks[taskId].getStatus();
+ runTask.setStatus(status);
+ }
+
+ setEventKeyFieldValue(topic, field, value=null) {
+ let keyFieldValues;
+ if(!(topic in this.eventKeyFieldValues)) {
+ keyFieldValues = [];
+ // put a new empty array
+ this.eventKeyFieldValues[topic] = keyFieldValues;
+ }
+ else {
+ keyFieldValues = this.eventKeyFieldValues[topic];
+ }
+
+ let index = _.findIndex(keyFieldValues, (keyFieldValue) => {
+ return keyFieldValue.field === field;
+ });
+
+ if(index >= 0) {
+ // update
+ keyFieldValues[index] = {
+ field: field,
+ value: value
+ };
+ }
+ else {
+ // push a new
+ keyFieldValues.push({
+ field: field,
+ value: value
+ });
+ }
+ return true;
+ }
+
+ isEventKeyFieldValueAcceptable(topic, field, value) {
+ if(!(topic in this.eventKeyFieldValues)) {
+ // topic does not exist
+ return false;
+ }
+
+ let keyFieldValues = this.eventKeyFieldValues[topic];
+ let index = _.findIndex(keyFieldValues, (keyFieldValue) => {
+ return (keyFieldValue.field === field) &&
+ ((!keyFieldValue.value) || (keyFieldValue.value === value));
+ });
+
+ if(index >= 0) {
+ return true;
+ }
+ return false;
+ }
+
+ isEventAcceptableByKeyFieldValue(topic, message) {
+ if(!(topic in this.eventKeyFieldValues)) {
+ // topic does not exist
+ return false;
+ }
+
+ let keyFieldValues = this.eventKeyFieldValues[topic];
+ keyFieldValues.forEach((keyFieldValue) => {
+ if(keyFieldValue.field in message) {
+ // has same field in the message
+ // check value
+ if(keyFieldValue.value === message[keyFieldValue.field]) {
+ // has the same value
+ return true;
+ }
+ }
+ });
+ return false;
+ }
+
+ getFilteredRunTasks(includes, excludes) {
+ // returns tasks with filters
+ let includeStatuses=[];
+ let excludeStatuses=[];
+ let includeAll = false;
+
+ if(includes) {
+ if(Array.isArray(includes)) {
+ // array
+ includes.forEach((include) => {
+ if(!includeStatuses.includes(include)) {
+ includeStatuses.push(include);
+ }
+ });
+ }
+ else {
+ includeStatuses.push(includes);
+ }
+ }
+ else {
+ // undefined or null
+ // include all
+ includeAll = true;
+ }
+
+ if(excludes) {
+ if(Array.isArray(excludes)) {
+ // array
+ excludes.forEach((exclude) => {
+ if(!excludeStatuses.includes(exclude)) {
+ excludeStatuses.push(exclude);
+ }
+ });
+ }
+ else {
+ excludeStatuses.push(excludes);
+ }
+ }
+ else {
+ // in this case, nothing will be excluded
+ // leave the array empty
+ }
+
+ let filteredRunTasks = [];
+ _.forOwn(this.runTasks, (runTask, _runTaskId) => {
+ // 'excludes' has a higher priority than 'includes'
+ if(!excludes.includes(runTask.getStatus())) {
+ if(includeAll || includes.includes(runTask.getStatus())) {
+ // screen tasks that are not finished
+ filteredRunTasks.push(runTask);
+ }
+ }
+ });
+ return filteredRunTasks;
+ }
+
+ getFilteredTopics(workflow, includes, excludes) {
+ // returns topics with filters
+ let filteredRunTasks = this.getFilteredRunTasks(includes, excludes);
+ let filteredTopics = [];
+
+ filteredRunTasks.forEach((runTask) => {
+ let taskId = runTask.getTaskId();
+ let task = workflow.getTask(taskId);
+ let topic = task.getTopic();
+ if(!filteredTopics.includes(topic)) {
+ filteredTopics.push(topic);
+ }
+ });
+ return filteredTopics;
+ }
+
+ getAllTopics(workflow) {
+ return this.getFilteredTopics(workflow, null, null);
+ }
+
+ getAcceptableTopics(workflow) {
+ // return topics for tasks that are running or to be run in the future
+ // include all tasks that are not ended
+ return this.getFilteredTopics(workflow, null, [WorkflowRunTask.TaskStatus.END]);
+ }
+
+ isTopicAcceptable(workflow, topic) {
+ // get topics of tasks that are not completed yet
+ let filteredTopics = this.getFilteredTopics(
+ workflow,
+ null,
+ [WorkflowRunTask.TaskStatus.END]
+ );
+
+ if(filteredTopics.includes(topic)) {
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+
+ isEventAcceptable(workflow, topic, message) {
+ // event is acceptable if it meets following criteria
+ // 1) the workflow is currently interested in the same topic
+ // (finished tasks are not counted)
+ // 2) the task's key field and value
+ if(this.isTopicAcceptable(workflow, topic) &&
+ this.isEventAcceptableByKeyFieldValue(topic, message)) {
+ return true;
+ }
+ return false;
+ }
+
+ addClientId(clientId) {
+ if(!this.clientIds.includes(clientId)) {
+ this.clientIds.push(clientId);
+ }
+ }
+
+ removeClientId(clientId) {
+ _.pull(this.clientIds, clientId);
+ }
+
+ getClientIds() {
+ return this.clientIds;
+ }
+
+ enqueueEvent(topic, message) {
+ this.eventQueue.push({
+ topic: topic,
+ message: message
+ });
+ }
+
+ peekEvent() {
+ // if the queue is empty, this returns undefined
+ if(this.eventQueue.length > 0) {
+ return this.eventQueue[0];
+ }
+ return undefined;
+ }
+
+ dequeueEvent() {
+ // if the queue is empty, this returns undefined
+ if(this.eventQueue.length > 0) {
+ let events = _.pullAt(this.eventQueue, [0]);
+
+ // move to trash
+ this.trashEventQueue.push(events[0]);
+ return events[0];
+ }
+ return undefined;
+ }
+
+ peekEventByTopic(topic) {
+ // if the queue is empty, this returns undefined
+ let index = _.findIndex(this.eventQueue, (event) => {
+ return event.topic === topic;
+ });
+
+ if(index >= 0) {
+ return this.eventQueue[index];
+ }
+ return undefined;
+ }
+
+ dequeueEventByTopic(topic) {
+ // find event by topic.
+ // returns only first item in the queue
+ // if the queue is empty, this returns undefined
+ let index = _.findIndex(this.eventQueue, (event) => {
+ return event.topic === topic;
+ });
+
+ if(index >= 0) {
+ let events = _.pullAt(this.eventQueue, [index]);
+
+ // move to trash
+ this.trashEventQueue.push(events[0]);
+ return events[0];
+ }
+ return undefined;
+ }
+
+ getTrashEvents() {
+ return this.trashEventQueue;
+ }
+
+ lengthEventQueue() {
+ return this.eventQueue.length;
+ }
+
+ setKickstarted() {
+ this.kickstarted = true;
+ }
+
+ isKickstarted() {
+ return this.kickstarted;
+ }
+
+ setFinished() {
+ this.finished = true;
+ }
+
+ isFinished() {
+ return this.finished;
+ }
+
+ validate() {
+ if(!this.id) {
+ logger.log('error', 'id is not given');
+ return false;
+ }
+
+ if(!this.workflowId) {
+ logger.log('error', 'workflowId is not given');
+ return false;
+ }
+
+ return true;
+ }
+ }
+
+ module.exports = {
+ WorkflowRun: WorkflowRun
+ };
+})();
\ No newline at end of file
diff --git a/src/types/workflowruntask.js b/src/types/workflowruntask.js
new file mode 100644
index 0000000..fbaa604
--- /dev/null
+++ b/src/types/workflowruntask.js
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const logger = require('../config/logger.js');
+
+ const TaskStatus = {
+ INIT: 'init',
+ BEGIN: 'begin',
+ END: 'end',
+ UNKNOWN: 'unknown'
+ };
+
+ class WorkflowRunTask {
+ constructor(taskId) {
+ this.taskId = taskId;
+ this.status = TaskStatus.UNKNOWN;
+ }
+
+ static parseStatus(strTaskStatus) {
+ if(!strTaskStatus) {
+ return TaskStatus.UNKNOWN;
+ }
+ else if(['i', 'init'].includes(strTaskStatus.toLowerCase())) {
+ return TaskStatus.END;
+ }
+ else if(['b', 'begin', 'start'].includes(strTaskStatus.toLowerCase())) {
+ return TaskStatus.BEGIN;
+ }
+ else if(['e', 'end', 'finish'].includes(strTaskStatus.toLowerCase())) {
+ return TaskStatus.END;
+ }
+ else {
+ return TaskStatus.UNKNOWN;
+ }
+ }
+
+ setTaskId(id) {
+ this.taskId = id;
+ }
+
+ getTaskId() {
+ return this.taskId;
+ }
+
+ setStatus(status) {
+ let taskStatus = WorkflowRunTask.parseStatus(status);
+ this.status = taskStatus;
+ }
+
+ getStatus() {
+ return this.status;
+ }
+
+ validate() {
+ if(!this.taskId) {
+ logger.log('error', 'id is not given');
+ return false;
+ }
+
+ if(!this.status) {
+ logger.log('error', 'status is not given');
+ return false;
+ }
+
+ return true;
+ }
+ }
+
+ module.exports = {
+ TaskStatus: TaskStatus,
+ WorkflowRunTask: WorkflowRunTask
+ };
+})();
\ No newline at end of file
diff --git a/src/types/workflowtask.js b/src/types/workflowtask.js
new file mode 100644
index 0000000..efde42b
--- /dev/null
+++ b/src/types/workflowtask.js
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const logger = require('../config/logger.js');
+
+ class WorkflowTask {
+ constructor(id, kickstart=false) {
+ this.id = id;
+ this.topic = null;
+ this.kickstart = kickstart;
+ this.keyField = null;
+ this.essence = {};
+ }
+
+ static fromEssence(essence) {
+ if(essence) {
+ let workflowTask;
+ if('task_id' in essence) {
+ workflowTask = new WorkflowTask(essence.task_id);
+ }
+ else {
+ logger.log('error', 'task_id is not given');
+ return null;
+ }
+
+ if('topic' in essence) {
+ workflowTask.setTopic(essence.topic);
+ }
+
+ if('model_name' in essence) {
+ workflowTask.setTopic('datamodel.' + essence.model_name + '.create');
+ workflowTask.setTopic('datamodel.' + essence.model_name + '.update');
+ workflowTask.setTopic('datamodel.' + essence.model_name + '.delete');
+ }
+
+ if('key_field' in essence) {
+ workflowTask.setKeyField(essence.key_field);
+ }
+
+ workflowTask.setEssence(essence);
+ return workflowTask;
+ }
+ else {
+ return null;
+ }
+ }
+
+ setId(id) {
+ this.id = id;
+ }
+
+ getId() {
+ return this.id;
+ }
+
+ setTopic(topic) {
+ this.topic = topic;
+ }
+
+ getTopic() {
+ return this.topic;
+ }
+
+ setKickstart(kickstart=false) {
+ this.kickstart = kickstart;
+ }
+
+ isKickstart() {
+ return this.kickstart;
+ }
+
+ setKeyField(keyField) {
+ this.keyField = keyField;
+ }
+
+ getKeyField() {
+ return this.keyField;
+ }
+
+ setEssence(essence) {
+ this.essence = essence;
+ }
+
+ getEssence() {
+ return this.essence;
+ }
+
+ validate() {
+ if(!this.id) {
+ logger.log('error', 'id is not given');
+ return false;
+ }
+
+ // general Airflow operators other than XOS operators don't have these fields.
+ //
+ // if(!this.topic) {
+ // logger.log('error', 'topic is not given');
+ // return false;
+ // }
+
+ // if(!this.keyField) {
+ // logger.log('error', 'keyField is not given');
+ // return false;
+ // }
+
+ return true;
+ }
+ }
+
+ module.exports = {
+ WorkflowTask: WorkflowTask
+ };
+})();
\ No newline at end of file
diff --git a/src/workflows/hello_workflow.json b/src/workflows/hello_workflow.json
new file mode 100644
index 0000000..ebf94af
--- /dev/null
+++ b/src/workflows/hello_workflow.json
@@ -0,0 +1,24 @@
+{
+ "hello_workflow": {
+ "dag": {
+ "dag_id": "hello_workflow",
+ "local_variable": "dag_hello"
+ },
+ "dependencies": {
+ "onu_event_handler": {}
+ },
+ "tasks": {
+ "onu_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_hello",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler",
+ "topic": "onu.events"
+ }
+ }
+ }
+}
diff --git a/src/workflows/loader.js b/src/workflows/loader.js
new file mode 100644
index 0000000..a7bc275
--- /dev/null
+++ b/src/workflows/loader.js
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+(function () {
+ 'use strict';
+
+ const path = require('path');
+ const fs = require('fs');
+ const _ = require('lodash');
+ const Workflow = require('../types/workflow.js');
+ const logger = require('../config/logger.js');
+
+ const loadEssence = (essenceFilename, absPath=false) => {
+ let filepath;
+ if(!absPath) {
+ filepath = path.join(__dirname, essenceFilename);
+ }
+ else {
+ filepath = essenceFilename;
+ }
+
+ try {
+ if (fs.existsSync(filepath)) {
+ logger.log('debug', `Loading an essence - ${filepath}`);
+ let rawdata = fs.readFileSync(filepath);
+ let essence = null;
+ try {
+ essence = JSON.parse(rawdata);
+ }
+ catch (objError) {
+ if (objError instanceof SyntaxError) {
+ logger.log('warn', `failed to parse a json data (syntax error) - ${rawdata}`);
+ }
+ else {
+ logger.log('warn', `failed to parse a json data - ${rawdata}`);
+ }
+ }
+ return essence;
+ }
+ else {
+ logger.log('warn', `No ${filepath} found`);
+ return null;
+ }
+ }
+ catch(e) {
+ logger.log('warn', `Cannot read ${filepath} - ${e}`);
+ return null;
+ }
+ };
+
+ const loadWorkflows = (essenceFilename) => {
+ let filepath = path.join(__dirname, essenceFilename);
+
+ try {
+ if (fs.existsSync(filepath)) {
+ logger.log('debug', `Loading an essence - ${filepath}`);
+ let rawdata = fs.readFileSync(filepath);
+ let workflows = [];
+
+ try {
+ let essence = JSON.parse(rawdata);
+
+ // an essence can have multiple workflows
+ _.forOwn(essence, (workflowEssence, workflowId) => {
+ let workflow = Workflow.Workflow.fromEssence(workflowEssence);
+ if(workflow) {
+ workflows.push(workflow);
+ logger.log('debug', `Loaded workflow: ${workflowId}`);
+ }
+ });
+ }
+ catch (objError) {
+ if (objError instanceof SyntaxError) {
+ logger.log('warn', `failed to parse a json data (syntax error) - ${rawdata}`);
+ }
+ else {
+ logger.log('warn', `failed to parse a json data - ${rawdata}`);
+ }
+ }
+
+ return workflows
+ }
+ else {
+ logger.log('warn', `No ${filepath} found`);
+ return null;
+ }
+ }
+ catch(e) {
+ logger.log('warn', `Cannot read ${filepath} - ${e}`);
+ return null;
+ }
+ };
+
+ const loadAllWorkflows = () => {
+ let dirpath = __dirname;
+
+ let allWorkflows = [];
+ let dirEntries = fs.readdirSync(dirpath);
+ dirEntries.forEach((dirEntry) => {
+ if(dirEntry.endsWith('.json')) {
+ // found workflow essence file in json format
+ let workflows = loadWorkflows(dirEntry);
+ if(workflows) {
+ for(let workflow in workflows) {
+ allWorkflows.push[workflow];
+ }
+ }
+ }
+ });
+ return allWorkflows;
+ };
+
+ module.exports = {
+ loadEssence: loadEssence,
+ loadAllWorkflows: loadAllWorkflows,
+ loadWorkflows: loadWorkflows
+ };
+})();
\ No newline at end of file