Implement basic functionalities for workflow control.
- Manage join/leave of clients
- All clients communicate via socket.io
- Probes emit events
- Managers register workflows (by using a workflow essence)
- Send kickstart request to Managers to launch workflows
- Route events to workflow runs
- Queue events so that none are lost between workflow tasks
- Fix issues found while working on test cases
- Set up coverage and unit-test runs that write their outputs to files
Change-Id: I678723edc20df9247d63a4bf6380785ab8b2b221
diff --git a/spec/.eslintrc b/spec/.eslintrc
new file mode 100644
index 0000000..497ba43
--- /dev/null
+++ b/spec/.eslintrc
@@ -0,0 +1,12 @@
+{
+ "globals" :{
+ "describe": true,
+ "it": true,
+ "xdescribe": true,
+ "xit": true,
+ "before": true,
+ "beforeEach": true,
+ "after": true,
+ "afterEach": true
+ }
+}
\ No newline at end of file
diff --git a/spec/clients.spec.js b/spec/clients.spec.js
new file mode 100644
index 0000000..534797d
--- /dev/null
+++ b/spec/clients.spec.js
@@ -0,0 +1,320 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const path = require('path');
+ const chai = require('chai');
+ const expect = chai.expect;
+ const sinonChai = require('sinon-chai');
+ chai.use(sinonChai);
+ const io = require('socket.io-client');
+ const async = require('async');
+ const _ = require('lodash');
+ const server = require('../src/server.js');
+ const port = 4000;
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ const essenceLoader = require('../src/workflows/loader.js');
+ const essenceFileName = path.join(__dirname, 'test_clients_workflow_essence.json');
+ const workflowIdInEssence = 'test_clients_workflow'
+
+ describe('Simple websocket client test', function() {
+
+ var probeClient;
+ var workflowManagerClient;
+ var workflowRunClient;
+ var workflowId;
+ var workflowRunId;
+
+ before(function() {
+ // Start our server
+ server.start(port);
+ });
+
+ after(function() {
+ server.stop();
+ });
+
+ beforeEach(function(done) {
+ let workflowCheckResults = [];
+ async.series([
+ (callback) => {
+ // connect a probe to the server
+ // to send events for test
+ probeClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=probe_id&type=probe' +
+ '&name=probe@xos.org'
+ });
+
+ probeClient.on('connect', () => {
+ callback(null, true);
+ });
+ return;
+ },
+ (callback) => {
+ // connect a workflow manager to the server
+ // to register a test workflow
+ workflowManagerClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_manager_id&type=workflow_manager' +
+ '&name=manager@xos.org'
+ });
+
+ workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
+ workflowRunId = message.workflow_run_id;
+
+ // call-back
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_KICKSTART, {
+ workflow_id: workflowId,
+ workflow_run_id: workflowRunId
+ })
+ });
+
+ workflowManagerClient.on('connect', () => {
+ callback(null, true);
+ });
+ return;
+ },
+ (callback) => {
+ // check existence of the workflow
+ let essence = essenceLoader.loadEssence(essenceFileName, true);
+ let workflowCnt=0;
+
+ _.forOwn(essence, (_value, essenceWorkflowId) => {
+ workflowId = essenceWorkflowId; // preserve only the last one for test
+
+ workflowCnt++;
+
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_CHECK, essenceWorkflowId);
+
+ workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_CHECK, (workflowCheckResult) => {
+ workflowCnt--;
+ workflowCheckResults.push(workflowCheckResult.result);
+
+ if(workflowCnt <= 0) {
+ callback(null, workflowCheckResults);
+ }
+ });
+ });
+ return;
+ },
+ (callback) => {
+ // register the workflow
+ let register = false;
+ workflowCheckResults.forEach((workflowCheckResult) => {
+ if(!workflowCheckResult) {
+ register = true;
+ }
+ });
+
+ if(register) {
+ let essence = essenceLoader.loadEssence(essenceFileName, true);
+
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, essence);
+
+ workflowManagerClient.on(
+ eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE,
+ (workflowRegResult) => {
+ callback(null, workflowRegResult);
+ }
+ );
+ }
+ else {
+ callback(null, true);
+ }
+ return;
+ },
+ (callback) => {
+ // kickstart the test workflow
+ probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ setTimeout(() => {
+ expect(workflowRunId).to.not.be.undefined;
+ callback(null, true);
+ }, 500);
+ return;
+ },
+ (callback) => {
+ // connect a workflow run client to the server
+ workflowRunClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_run_id&type=workflow_run' +
+ `&workflow_id=${workflowIdInEssence}&workflow_run_id=${workflowRunId}` +
+ '&name=run@xos.org'
+ });
+
+ // once connected, start testing
+ workflowRunClient.on('connect', () => {
+ callback(null, true);
+ });
+ return;
+ }
+ ],
+ function(err, results) {
+ // mocha's done() has no .fail(); a setup failure is reported by passing an Error
+ if(err || results.includes(false)) {
+ done(err || new Error('a setup step returned false'));
+ }
+ else {
+ done();
+ }
+ });
+ return;
+ });
+
+ afterEach(function(done) {
+ // remove workflow run
+ workflowManagerClient.emit(server.serviceEvents.WORKFLOW_RUN_REMOVE, {
+ workflow_id: workflowId,
+ workflow_run_id: workflowRunId
+ });
+
+ // remove workflow
+ workflowManagerClient.emit(server.serviceEvents.WORKFLOW_REMOVE, workflowId);
+
+ workflowId = null;
+ workflowRunId = null;
+
+ // disconnect clients
+ if(workflowManagerClient.connected) {
+ workflowManagerClient.disconnect();
+ }
+ workflowManagerClient = null;
+
+ if(workflowRunClient.connected) {
+ workflowRunClient.disconnect();
+ }
+ workflowRunClient = null;
+
+ if(probeClient.connected) {
+ probeClient.disconnect();
+ }
+ probeClient = null;
+
+ done();
+ });
+
+ it('should have a probe, a workflow manager and a workflow run', function(done) {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ expect(
+ Object.keys(eventrouter.getWorkflowRunClients()).length,
+ 'num of workflow run clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getWorkflowManagerClients()).length,
+ 'num of workflow manager clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getProbeClients()).length,
+ 'num of probe clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getClients()).length,
+ 'total num of clients'
+ ).to.equal(3);
+ // each client must be present under the id it sent in its connection query
+ expect(
+ 'probe_id' in eventrouter.getClients(),
+ 'a client called probe_id exists'
+ ).to.equal(true);
+ expect(
+ 'workflow_manager_id' in eventrouter.getClients(),
+ 'a client called workflow_manager_id exists'
+ ).to.equal(true);
+ expect(
+ 'workflow_run_id' in eventrouter.getClients(),
+ 'a client called workflow_run_id exists'
+ ).to.equal(true);
+ done();
+ });
+
+ it('should store user details for a new connection', function() {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+
+ const probe = eventrouter.getClients()['probe_id'];
+ expect(probe.getParams().name).to.equal('probe@xos.org');
+
+ const manager = eventrouter.getClients()['workflow_manager_id'];
+ expect(manager.getParams().name).to.equal('manager@xos.org');
+
+ const run = eventrouter.getClients()['workflow_run_id'];
+ expect(run.getParams().name).to.equal('run@xos.org');
+ });
+
+ it('should not store the same user twice', function(done) {
+ // This test case makes the clean-up process take a long time because it leaves
+ // a client socket open. There seems to be no way to release it from the server side.
+
+ // connect a client to the server
+ const client2 = io.connect(`http://localhost:${port}`, {
+ query: 'id=probe_id&type=probe' +
+ '&name=probe@xos.org&value=different_value'
+ });
+
+ // once connected, start testing
+ client2.on('connect', () => {
+ setTimeout(() => {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ expect(
+ Object.keys(eventrouter.getWorkflowRunClients()).length,
+ 'num of workflow run clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getWorkflowManagerClients()).length,
+ 'num of workflow manager clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getProbeClients()).length,
+ 'num of probe clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getClients()).length,
+ 'total num of clients'
+ ).to.equal(3);
+
+ done();
+ }, 100);
+ });
+ });
+
+ it('should remove a user on disconnect', function(done) {
+ workflowManagerClient.disconnect();
+ workflowRunClient.disconnect();
+ probeClient.disconnect();
+
+ // we need to wait for the event to be dispatched
+ setTimeout(() => {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ expect(
+ Object.keys(eventrouter.getWorkflowRunClients()).length,
+ 'num of workflow run clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getWorkflowManagerClients()).length,
+ 'num of workflow manager clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getProbeClients()).length,
+ 'num of probe clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getClients()).length,
+ 'total num of clients'
+ ).to.equal(0);
+ done();
+ }, 100);
+ });
+ });
+})();
\ No newline at end of file
diff --git a/spec/eventrouter.spec.js b/spec/eventrouter.spec.js
new file mode 100644
index 0000000..6184f58
--- /dev/null
+++ b/spec/eventrouter.spec.js
@@ -0,0 +1,353 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const path = require('path');
+ const chai = require('chai');
+ const expect = chai.expect;
+ const sinonChai = require('sinon-chai');
+ chai.use(sinonChai);
+ const io = require('socket.io-client');
+ const async = require('async');
+ const _ = require('lodash');
+ const server = require('../src/server.js');
+ const port = 4000;
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ const essenceLoader = require('../src/workflows/loader.js');
+ const essenceFileName = path.join(__dirname, 'test_multi_workflow_essence.json');
+
+ var probeClient;
+ var workflowManagerClients = [];
+ var workflowRunClients = [];
+ var workflowIds = [];
+ var workflowRunInfos = [];
+
+ var receivedKickstartMessages = [[],[]];
+
+ describe('Workflow kickstart test', function() {
+
+ before(function() {
+ // Start our server
+ server.start(port);
+ });
+
+ after(function() {
+ server.stop();
+ });
+
+ beforeEach(function(done) {
+ async.series([
+ (callback) => {
+ // connect a probe to the server
+ // to send events for test
+ probeClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=probe_id&type=probe' +
+ '&name=probe@xos.org'
+ });
+
+ probeClient.on('connect', () => {
+ callback(null, true);
+ });
+ return;
+ },
+ (callback) => {
+ // connect first workflow manager to the server
+ // this manager will kickstart a workflow
+ let workflowManagerClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_manager_id1&type=workflow_manager' +
+ '&name=manager1@xos.org'
+ });
+
+ workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
+ // save it for check
+ receivedKickstartMessages[0].push(message);
+
+ // save workflow_id and workflow_run_id
+ workflowRunInfos.push({
+ workflowId: message.workflow_id,
+ workflowRunId: message.workflow_run_id
+ });
+
+ setTimeout(() => {
+ // call-back
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_KICKSTART, {
+ workflow_id: message.workflow_id,
+ workflow_run_id: message.workflow_run_id
+ })
+ }, 2000);
+ });
+
+ workflowManagerClient.on('connect', () => {
+ callback(null, true);
+ });
+
+ workflowManagerClients.push(workflowManagerClient);
+ return;
+ },
+ (callback) => {
+ // connect second workflow manager to the server
+ // this manager will not kickstart a workflow
+ let workflowManagerClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_manager_id2&type=workflow_manager' +
+ '&name=manager2@xos.org'
+ });
+
+ workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
+ receivedKickstartMessages[1].push(message);
+ });
+
+ workflowManagerClient.on('connect', () => {
+ callback(null, true);
+ });
+
+ workflowManagerClients.push(workflowManagerClient);
+ return;
+ },
+ (callback) => {
+ let essence = essenceLoader.loadEssence(essenceFileName, true);
+
+ // register the workflow
+ workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, essence);
+
+ _.forOwn(essence, (_value, workflowId) => {
+ // save
+ workflowIds.push(workflowId);
+ });
+
+ // handle return
+ workflowManagerClients[0].on(
+ eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE,
+ (workflowRegResult) => {
+ callback(null, workflowRegResult);
+ }
+ );
+ return;
+ }
+ ],
+ function(err, results) {
+ // mocha's done() has no .fail(); a setup failure is reported by passing an Error
+ if(err || results.includes(false)) {
+ done(err || new Error('a setup step returned false'));
+ }
+ else {
+ done();
+ }
+ });
+ return;
+ });
+
+ afterEach(function() {
+ // remove workflow runs
+ _.forOwn(workflowRunInfos, (workflowRunInfo) => {
+ workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_RUN_REMOVE, {
+ workflow_id: workflowRunInfo.workflowId,
+ workflow_run_id: workflowRunInfo.workflowRunId
+ });
+ });
+ workflowRunInfos.length = 0;
+
+ // remove workflows
+ _.forOwn(workflowIds, (workflowId) => {
+ workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE, workflowId);
+ });
+ workflowIds.length = 0;
+
+ // remove message store
+ receivedKickstartMessages.forEach((receivedKickstartMessageStore) => {
+ receivedKickstartMessageStore.length = 0;
+ });
+
+ // disconnect clients
+ workflowManagerClients.forEach((workflowManagerClient) => {
+ if(workflowManagerClient.connected) {
+ workflowManagerClient.disconnect();
+ }
+ });
+ workflowManagerClients.length = 0;
+
+ workflowRunClients.forEach((workflowRunClient) => {
+ if(workflowRunClient.connected) {
+ workflowRunClient.disconnect();
+ }
+ });
+ workflowRunClients.length = 0;
+
+ if(probeClient.connected) {
+ probeClient.disconnect();
+ }
+ probeClient = null;
+ });
+
+ it('should have two workflows', function(done) {
+ workflowManagerClients[0].on(eventrouter.serviceEvents.WORKFLOW_LIST, (result) => {
+ let workflowsList = result.result;
+ expect(workflowsList.length).to.equal(2);
+ workflowsList.forEach((workflowIdInList) => {
+ expect(workflowIds).to.includes(workflowIdInList);
+ });
+ done();
+ });
+
+ workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_LIST, {});
+ });
+
+ it('all managers should receive kickstart messages', function(done) {
+ // kickstart the workflow
+ probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ setTimeout(() => {
+ expect(receivedKickstartMessages.length, 'num of message stores').to.equal(2);
+ receivedKickstartMessages.forEach((receivedKickstartMessageStore) => {
+ expect(receivedKickstartMessageStore.length, 'num of messages in a store').to.equal(1);
+ });
+ done();
+ }, 500);
+ });
+
+ it('should have only one workflow run', function(done) {
+ this.timeout(5000);
+
+ // kickstart the workflow
+ probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ setTimeout(() => {
+ // kickstart will take 2 seconds roughly
+ expect(workflowRunInfos.length, 'num of workflow runs').to.equal(1);
+ // the workflow must be 'should_be_called'
+ expect(workflowRunInfos[0].workflowId, 'workflow id').to.equal('should_be_called');
+ done();
+ }, 3000);
+ });
+
+ it('should be able to read an event that is used for workflow kickstart', function(done) {
+ this.timeout(5000);
+
+ // kickstart the workflow
+ probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ setTimeout(() => {
+ // kickstart will take 2 seconds roughly
+ expect(workflowRunInfos.length, 'num of workflow runs').to.equal(1);
+ // the workflow must be 'should_be_called'
+ expect(workflowRunInfos[0].workflowId, 'workflow id').to.equal('should_be_called');
+
+ // connect a workflow run client to the server
+ let workflowRunClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_run_id1&type=workflow_run' +
+ `&workflow_id=${workflowRunInfos[0].workflowId}` +
+ `&workflow_run_id=${workflowRunInfos[0].workflowRunId}` +
+ '&name=run1@xos.org'
+ });
+ workflowRunClients.push(workflowRunClient);
+
+ workflowRunClient.on('connect', () => {
+ workflowRunClient.emit(eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT, {
+ workflow_id: workflowRunInfos[0].workflowId,
+ workflow_run_id: workflowRunInfos[0].workflowRunId,
+ task_id: 'onu_event_handler',
+ topic: 'onu.events'
+ });
+ });
+
+ workflowRunClient.on(eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT, (result) => {
+ let event = result.result;
+ expect(event.topic).to.equal('onu.events');
+ expect(event.message.serialNumber).to.equal('testSerialXXX');
+ done();
+ });
+ }, 3000);
+ });
+
+ /*
+ it('should store user details for a new connection', () => {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+
+ const probe = eventrouter.getClients()['probe_id'];
+ expect(probe.getParams().name).to.equal('probe@xos.org');
+
+ const manager = eventrouter.getClients()['workflow_manager_id'];
+ expect(manager.getParams().name).to.equal('manager@xos.org');
+
+ const run = eventrouter.getClients()['workflow_run_id'];
+ expect(run.getParams().name).to.equal('run@xos.org');
+ });
+
+ it('should not store the same user twice', (done) => {
+ // This test case makes cleaning up process taking long time because it leaves
+ // a client socket. It seems there's no way to release it from server-side.
+
+ // connect a client to the server
+ const client2 = io.connect(`http://localhost:${port}`, {
+ query: 'id=probe_id&type=probe' +
+ '&name=probe@xos.org&value=different_value'
+ });
+
+ // when is connected start testing
+ client2.on('connect', () => {
+ setTimeout(() => {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ expect(
+ Object.keys(eventrouter.getWorkflowRunClients()).length,
+ 'num of workflow run clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getWorkflowManagerClients()).length,
+ 'num of workflow manager clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getProbeClients()).length,
+ 'num of probe clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getClients()).length,
+ 'total num of clients'
+ ).to.equal(3);
+
+ done();
+ }, 100);
+ });
+ });
+
+ it('should remove a user on disconnect', (done) => {
+ workflowManagerClient.disconnect();
+ workflowRunClient.disconnect();
+ probeClient.disconnect();
+
+ // we need to wait for the event to be dispatched
+ setTimeout(() => {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ expect(
+ Object.keys(eventrouter.getWorkflowRunClients()).length,
+ 'num of workflow run clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getWorkflowManagerClients()).length,
+ 'num of workflow manager clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getProbeClients()).length,
+ 'num of probe clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getClients()).length,
+ 'total num of clients'
+ ).to.equal(0);
+ done();
+ }, 100);
+ });
+ */
+ });
+})();
\ No newline at end of file
diff --git a/spec/test_clients_workflow_essence.json b/spec/test_clients_workflow_essence.json
new file mode 100644
index 0000000..bd37499
--- /dev/null
+++ b/spec/test_clients_workflow_essence.json
@@ -0,0 +1,44 @@
+{
+ "test_clients_workflow": {
+ "dag": {
+ "dag_id": "test_clients_workflow",
+ "local_variable": "dag_test_clients_workflow"
+ },
+ "dependencies": {
+ "task1": {
+ "children": [
+ "task2"
+ ]
+ },
+ "task2": {
+ "parents": [
+ "task1"
+ ]
+ }
+ },
+ "tasks": {
+ "task1": {
+ "class": "XOSEventSensor",
+ "dag": "dag_test_clients_workflow",
+ "key_field": "serialNumber",
+ "local_variable": "task1",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "task1_func",
+ "task_id": "task1",
+ "topic": "onu.events"
+ },
+ "task2": {
+ "class": "XOSModelSensor",
+ "dag": "dag_test_clients_workflow",
+ "key_field": "serialNumber",
+ "local_variable": "task2",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "task2_func",
+ "task_id": "task2"
+ }
+ }
+ }
+}
diff --git a/spec/test_multi_workflow_essence.json b/spec/test_multi_workflow_essence.json
new file mode 100644
index 0000000..572a432
--- /dev/null
+++ b/spec/test_multi_workflow_essence.json
@@ -0,0 +1,118 @@
+{
+ "must_not_be_called": {
+ "dag": {
+ "dag_id": "must_not_be_called",
+ "local_variable": "dag_must_not_be_called"
+ },
+ "dependencies": {
+ "must_not_be_called_handler": {
+ "children": [
+ "onu_event_handler"
+ ]
+ },
+ "onu_event_handler": {
+ "parents": [
+ "must_not_be_called_handler"
+ ],
+ "children": [
+ "onu_model_event_handler"
+ ]
+ },
+ "onu_model_event_handler": {
+ "parents": [
+ "onu_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "must_not_be_called_handler": {
+ "class": "UnknownSensor",
+ "dag": "dag_must_not_be_called",
+ "local_variable": "must_not_be_called_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "task_id": "must_not_be_called_handler"
+ },
+ "onu_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_must_not_be_called",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler",
+ "topic": "onu.events"
+ },
+ "onu_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_must_not_be_called",
+ "key_field": "serialNumber",
+ "local_variable": "onu_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "onu_model_event_handler"
+ }
+ }
+ },
+ "should_be_called": {
+ "dag": {
+ "dag_id": "should_be_called",
+ "local_variable": "dag_should_be_called"
+ },
+ "dependencies": {
+ "onu_event_handler": {
+ "children": [
+ "onu_model_event_handler"
+ ]
+ },
+ "onu_model_event_handler": {
+ "parents": [
+ "onu_event_handler"
+ ],
+ "children": [
+ "can_be_stuck_handler"
+ ]
+ },
+ "can_be_stuck_handler": {
+ "parents": [
+ "onu_model_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "onu_event_handler": {
+ "class": "XOSEventSensor",
+ "dag": "dag_should_be_called",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler",
+ "topic": "onu.events"
+ },
+ "onu_model_event_handler": {
+ "class": "XOSModelSensor",
+ "dag": "dag_should_be_called",
+ "key_field": "serialNumber",
+ "local_variable": "onu_model_event_handler",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "provide_context": true,
+ "python_callable": "DriverService_event",
+ "task_id": "onu_model_event_handler"
+ },
+ "can_be_stuck_handler": {
+ "class": "UnknownSensor",
+ "dag": "dag_should_be_called",
+ "local_variable": "can_be_stuck_handler",
+ "poke_interval": 5,
+ "provide_context": true,
+ "task_id": "can_be_stuck_handler"
+ }
+ }
+ }
+}