blob: 548bc48beae8711ed5868462760b6f113a1a1015 [file] [log] [blame]
/*
* Copyright 2019-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
(function () {
'use strict';
const _ = require('lodash');
const logger = require('../config/logger.js');
const Client = require('../types/client.js');
const WorkflowRun = require('../types/workflowrun.js');
const ws_probe = require('./ws_probe.js');
const ws_manager = require('./ws_manager.js');
const ws_workflowrun = require('./ws_workflowrun.js');
let allClients = {}; // has publishers and subscribers
let probeClients = {}; // a subset of clients
let workflowManagerClients = {}; // a subset of clients
let workflowRunClients = {}; // a subset of clients
//let io;
// key: workflow id
// value: Workflow instance
let workflows = {};
// key: workflow run id
// value: WorkflowRun instance
let workflowRuns = {};
let serviceEvents = {
GREETING: 'cord.workflow.ctlsvc.greeting'
};
setInterval(function () {
let requests = [];
_.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
let obj = {
workflow_id: workflowRun.getWorkflowId(),
workflow_run_id: workflowRunId
};
requests.push(obj);
});
checkWorkflowRunStatusBulk(requests);
}, 5000);
// add ws_probe events
_.forOwn(ws_probe.serviceEvents, (wsServiceEvent, key) => {
serviceEvents[key] = wsServiceEvent;
});
// add ws_manager events
_.forOwn(ws_manager.serviceEvents, (wsServiceEvent, key) => {
serviceEvents[key] = wsServiceEvent;
});
// add ws_workflowrun events
_.forOwn(ws_workflowrun.serviceEvents, (wsServiceEvent, key) => {
serviceEvents[key] = wsServiceEvent;
});
//const setIO = (ioInstance) => {
// io = ioInstance;
//};
const checkObject = (obj) => {
return Object.prototype.toString.call(obj) === '[object Object]';
};
const destroy = () => {
removeClients();
clearWorkflowRuns();
clearWorkflows();
};
const listWorkflows = () => {
let workflowList = [];
_.forOwn(workflows, (_workflow, workflowId) => {
workflowList.push(workflowId);
});
return workflowList;
};
const checkWorkflow = (workflowId) => {
if(workflowId in workflows) {
return true;
}
return false;
};
const addWorkflow = (workflow) => {
if(workflow.getId() in workflows) {
logger.log('error', `there exists a workflow with the same id - ${workflow.getId()}`);
return false;
}
let workflowId = workflow.getId();
workflows[workflowId] = workflow;
return true;
};
const getWorkflow = (workflowId) => {
if(workflowId in workflows) {
logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
return null;
}
return workflows[workflowId];
};
const clearWorkflows = () => {
_.forOwn(workflows, (_workflow, workflowId) => {
delete workflows[workflowId];
});
};
const listWorkflowRuns = () => {
let workflowRunList = [];
_.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
workflowRunList.push(workflowRunId);
});
return workflowRunList;
};
const checkWorkflowRun = (workflowRunId) => {
if(workflowRunId in workflowRuns) {
return true;
}
return false;
};
const addWorkflowRun = (workflowRun) => {
let workflowId = workflowRun.getWorkflowId();
let workflowRunId = workflowRun.getId();
if(workflowRunId in workflowRuns) {
logger.log('warn', `there exists a workflow run with the same id - ${workflowRunId}`);
return false;
}
if(!(workflowId in workflows)) {
logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
return false;
}
workflowRuns[workflowRunId] = workflowRun;
return true;
};
const getWorkflowRun = (workflowRunId) => {
if(workflowRunId in workflowRuns) {
logger.log('warn', `cannot find a workflow run with id - ${workflowRunId}`);
return null;
}
return workflowRuns[workflowRunId];
};
const clearWorkflowRuns = () => {
_.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
delete workflowRuns[workflowRunId];
});
};
const setWorkflowRunKickstarted = (workflowRunId) => {
if(!(workflowRunId in workflowRuns)) {
logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
return false;
}
let workflowRun = workflowRuns[workflowRunId];
workflowRun.setKickstarted();
return true;
};
const setWorkflowRunStatus = (workflowRunId, status) => {
if(!(workflowRunId in workflowRuns)) {
logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
return false;
}
if(status in ['success', 'failed', 'end']) {
removeWorkflowRun(workflowRunId);
}
return true;
};
const kickstart = (workflowId, workflowRunId) => {
if(!(workflowId in workflows)) {
logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
return false;
}
if(!(workflowRunId in workflowRuns)) {
logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
return false;
}
ws_manager.kickstartWorkflow(workflowId, workflowRunId);
return true;
};
/*
const checkWorkflowRunStatus = (workflowId, workflowRunId) => {
if(!(workflowId in workflows)) {
logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
return false;
}
if(!(workflowRunId in workflowRuns)) {
logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
return false;
}
ws_manager.checkWorkflowRunStatus(workflowId, workflowRunId);
return true;
};
*/
const checkWorkflowRunStatusBulk = (requests) => {
if(requests) {
ws_manager.checkWorkflowRunStatusBulk(requests);
return true;
}
return false;
};
const removeWorkflow = (workflowId) => {
if(!(workflowId in workflows)) {
logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
return false;
}
// check if there are workflow runs
for(let key in workflowRuns) {
if (!workflowRuns.hasOwnProperty(key)) {
continue;
}
let workflowRun = workflowRuns[key];
if(workflowRun.getWorkflowId() === workflowId) {
logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
return false;
}
}
// we don't use below code becuase it cannot properly stop and return value with 'return'
// _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
// if(workflowRun.getWorkflowId() === workflowId) {
// logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
// return false;
// }
// });
delete workflows[workflowId];
return true;
};
const removeWorkflowRun = (workflowRunId) => {
if(!(workflowRunId in workflowRuns)) {
logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
return false;
}
let workflowRun = workflowRuns[workflowRunId];
delete workflowRuns[workflowRunId];
workflowRun.setFinished();
return true;
};
const emitEvent = (topic, message) => {
logger.log('debug', `event is raised : topic ${topic}, message ${JSON.stringify(message)}`);
// route event to running instances
_.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
let workflowId = workflowRun.getWorkflowId();
let workflow = workflows[workflowId];
if(workflow.isEventAcceptable(topic)) {
logger.log('debug', `workflow ${workflowId} accept the event : topic ${topic}`);
// event is acceped if event has
// the same key field and its value as workflow_run
if(workflowRun.isEventAcceptable(topic, message)) {
logger.log('debug', `workflow run ${workflowRunId} accept the event : \
topic ${topic}, message ${JSON.stringify(message)}`);
workflowRun.updateEventKeyFieldValueFromMessage(topic, message);
logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
workflowRun.enqueueEvent(topic, message);
}
else {
logger.log('debug', `workflow run ${workflowRunId} reject the event : \
topic ${topic}, message ${JSON.stringify(message)}`);
}
}
});
// check if the event is a kickstart event
_.forOwn(workflows, (workflow, workflowId) => {
if(workflow.isKickstartTopic(topic)) {
// we need to buffer the event until workflow run is brought up
let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
workflowRun.updateEventKeyFieldValueFromMessage(topic, message);
let workflowRunId = workflowRun.getId();
// register for management
workflowRuns[workflowRunId] = workflowRun;
// route event
logger.log('debug', `event ${topic} is routed to a new workflow run ${workflowRunId}`);
workflowRun.enqueueEvent(topic, message);
// KICKSTART!
kickstart(workflowId, workflowRunId);
}
});
return true;
};
const countQueuedEvents = (workflowRunId) => {
// this counts queued events
if(!(workflowRunId in workflowRuns)) {
logger.log('warn', `workflow run ${workflowRunId} does not exist`);
return null;
}
let workflowRun = workflowRuns[workflowRunId];
return workflowRun.lengthEventQueue();
};
const fetchEvent = (workflowRunId, taskId, topic) => {
// this returns an event or an empty obj when there is no message
if(!(workflowRunId in workflowRuns)) {
logger.log('warn', `workflow run ${workflowRunId} does not exist`);
return null;
}
let workflowRun = workflowRuns[workflowRunId];
let workflowId = workflowRun.getWorkflowId();
if(!(workflowId in workflows)) {
logger.log('warn', `workflow ${workflowId} does not exist`);
return null;
}
let workflow = workflows[workflowId];
let task = workflow.getTask(taskId);
if(!task) {
logger.log('warn', `workflow ${workflowId} does not have task ${taskId}`);
return null;
}
logger.log('debug', `workflow run ${workflowRunId}, task ${taskId} fetches an event`);
let event = workflowRun.dequeueEvent(topic);
if(event) {
return event;
}
else {
return {};
}
};
const addClient = (c) => {
let clientId = c.getId();
let socket = c.getSocket();
// check id that client is already there
if(clientId in allClients) {
logger.log('warn', `there exists a client with the same id - ${clientId}`);
return false;
}
if(c.getType() === Client.Type.PROBE) {
// probe
// probe protocol:
// REQ:
// topic: operation
// message: {
// req_id: <req_id>,
// topic: <topic>,
// message: <data>
// }
// RES:
// topic: topic sent
// message: {
// req_id: <req_id>,
// error: <true/false>,
// result: <true/false>,
// message: <error message>
// }
allClients[clientId] = c;
probeClients[clientId] = c;
// attach probe operations
let router = ws_probe.getRouter();
_.forOwn(router, (routerElem, _key) => {
socket.on(routerElem.topic, (msg) => {
logger.log('debug', `received a probe event ${routerElem.topic} - ${JSON.stringify(msg)}`);
// handle a common parameter - req_id
// when we get req_id, return the same req_id in response.
// this is to help identify a request from a response at client-side
let req_id = 101010; // default number, signiture
if(msg && checkObject(msg)) {
if('req_id' in msg) {
req_id = msg.req_id;
}
}
routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
if(err) {
logger.log('warn', `unable to handle a message - ${err}`);
socket.emit(routerElem.topic, {
req_id: req_id,
error: true,
result: result,
message: err
});
return;
}
// we return result
if(routerElem.return === undefined || routerElem.return) {
socket.emit(routerElem.topic, {
req_id: req_id,
error: false,
result: result
});
}
});
});
});
return true;
}
else if(c.getType() === Client.Type.WORKFLOW_MANAGER) {
// manager
// manager protocol:
// REQ:
// topic: operation
// message: {
// req_id: <req_id>,
// <data>...
// }
// RES:
// topic: topic sent
// message: {
// req_id: <req_id>,
// error: <true/false>,
// result: <true/false>,
// message: <error message>
// }
allClients[clientId] = c;
workflowManagerClients[clientId] = c;
// attach manager operations
let router = ws_manager.getRouter();
_.forOwn(router, (routerElem, _key) => {
socket.on(routerElem.topic, (msg) => {
logger.log('debug', `received a manager event ${routerElem.topic} - ${JSON.stringify(msg)}`);
// handle a common parameter - req_id
// when we get req_id, return the same req_id in response.
// this is to help identify a request from a response at client-side
let req_id = 101010; // default number, signiture
if(msg && checkObject(msg)) {
if('req_id' in msg) {
req_id = msg.req_id;
}
}
routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
if(err) {
logger.log('warn', `unable to handle a message - ${err}`);
socket.emit(routerElem.topic, {
req_id: req_id,
error: true,
result: result,
message: err
});
return;
}
// we return result
if(routerElem.return === undefined || routerElem.return) {
socket.emit(routerElem.topic, {
req_id: req_id,
error: false,
result: result
});
}
});
});
});
return true;
}
else if(c.getType() === Client.Type.WORKFLOW_RUN) {
// workflow run
// workflow run protocol:
// REQ:
// topic: operation
// message: {
// req_id: <req_id>,
// <data>...
// }
// RES:
// topic: topic sent
// message: {
// req_id: <req_id>,
// error: <true/false>,
// result: <true/false>,
// message: <error message>
// }
// map to WorkflowRun instance
let workflowId = c.getWorkflowId();
let workflowRunId = c.getWorkflowRunId();
let workflowRun;
if(!(workflowId in workflows)) {
logger.log('warn', `cannot find a workflow ${workflowId}`);
return false;
}
// register client to workflow run
if(!(workflowRunId in workflowRuns)) {
// workflow run not exist yet
logger.log('warn', `cannot find a workflow run ${workflowRunId}`);
return false;
}
//let workflow = workflows[workflowId];
allClients[clientId] = c;
workflowRunClients[clientId] = c;
// update
workflowRun = workflowRuns[workflowRunId];
workflowRun.addClientId(clientId);
// attach workflow run operations
let router = ws_workflowrun.getRouter();
_.forOwn(router, (routerElem, _key) => {
socket.on(routerElem.topic, (msg) => {
logger.log('debug', `received a workflow run event ${routerElem.topic} - ${JSON.stringify(msg)}`);
// handle a common parameter - req_id
// when we get req_id, return the same req_id in response.
// this is to help identify a request from a response at client-side
let req_id = 101010; // default number, signiture
if(msg && checkObject(msg)) {
if('req_id' in msg) {
req_id = msg.req_id;
}
}
routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
if(err) {
logger.log('warn', `unable to handle a message - ${err}`);
socket.emit(routerElem.topic, {
req_id: req_id,
error: true,
result: false,
message: err
});
return;
}
// we return result
if(routerElem.return === undefined || routerElem.return) {
socket.emit(routerElem.topic, {
req_id: req_id,
error: false,
result: result
});
}
});
});
});
return true;
}
return false;
};
const removeClient = (id) => {
if(id in allClients) {
let removedClient = allClients[id];
delete allClients[id];
let type = removedClient.getType();
if(type === Client.Type.PROBE) {
delete probeClients[id];
}
else if(type === Client.Type.WORKFLOW_MANAGER) {
delete workflowManagerClients[id];
}
else if(type === Client.Type.WORKFLOW_RUN) {
delete workflowRunClients[id];
let workflowRunId = removedClient.getWorkflowRunId();
let workflowRun = workflowRuns[workflowRunId];
if(workflowRun) {
workflowRun.removeClientId(id);
//TODO
// WorkflowRun can have no clients between tasks
// So we should not remove the run until the workflow run finishes
}
}
}
};
const removeClients = () => {
let probeClients = {};
_.forOwn(probeClients, (_probeClient, clientId) => {
delete probeClients[clientId];
});
_.forOwn(workflowManagerClients, (_workflowManagerClient, clientId) => {
delete workflowManagerClients[clientId];
});
_.forOwn(workflowRunClients, (_workflowRunClients, clientId) => {
delete workflowRunClients[clientId];
});
_.forOwn(allClients, (client, clientId) => {
client.getSocket().disconnect(true);
delete allClients[clientId];
});
}
module.exports = {
serviceEvents: serviceEvents,
destroy: destroy,
getClients: () => { return allClients; },
getProbeClients: () => { return probeClients; },
getWorkflowManagerClients: () => { return workflowManagerClients; },
getWorkflowRunClients: () => { return workflowRunClients; },
clientType: Client.Type,
//setIO: setIO,
emitEvent: emitEvent,
countQueuedEvents: countQueuedEvents,
fetchEvent: fetchEvent,
addClient: addClient,
removeClient: removeClient,
removeClients: removeClients,
addWorkflow: addWorkflow,
getWorkflow: getWorkflow,
listWorkflows: listWorkflows,
checkWorkflow: checkWorkflow,
removeWorkflow: removeWorkflow,
clearWorkflows: clearWorkflows,
addWorkflowRun: addWorkflowRun,
getWorkflowRun: getWorkflowRun,
listWorkflowRuns: listWorkflowRuns,
checkWorkflowRun: checkWorkflowRun,
removeWorkflowRun: removeWorkflowRun,
clearWorkflowRuns: clearWorkflowRuns,
setWorkflowRunKickstarted: setWorkflowRunKickstarted,
setWorkflowRunStatus: setWorkflowRunStatus
};
})();