Make event emit API consistent to other APIs
Change-Id: I458b4a9bd3797638bf397e7dcb15b34118677af0
diff --git a/Makefile b/Makefile
index a26d3ce..9e9453d 100644
--- a/Makefile
+++ b/Makefile
@@ -32,6 +32,8 @@
DOCKER_LABEL_COMMIT_DATE ?= $(shell git diff-index --quiet HEAD -- && git show -s --format=%cd --date=iso-strict HEAD || echo "unknown" )
DOCKER_LABEL_BUILD_DATE ?= $(shell date -u "+%Y-%m-%dT%H:%M:%SZ")
+
+# Targets
all: test
docker-build:
diff --git a/package.json b/package.json
index fe064f5..d2bff8d 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "cord_workflow_controller",
- "version": "0.2.0",
+ "version": "0.3.0",
"description": "CORD Workflow Controller",
"main": "src/server.js",
"scripts": {
@@ -24,7 +24,6 @@
"async": "^3.1.0",
"node-yaml-config": "0.0.5",
"socket.io": "^2.2.0",
- "socketio-wildcard": "^2.0.0",
"double-ended-queue": "^2.1.0-0",
"test": "^0.6.0",
"winston": "^3.2.1",
diff --git a/spec/eventrouter.spec.js b/spec/eventrouter.spec.js
index 0b2a217..82261f7 100644
--- a/spec/eventrouter.spec.js
+++ b/spec/eventrouter.spec.js
@@ -168,7 +168,7 @@
// remove workflow runs
_.forOwn(workflowRunInfos, (workflowRunInfo) => {
- workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE_RUN, {
+ workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_REMOVE_RUN, {
workflow_id: workflowRunInfo.workflowId,
workflow_run_id: workflowRunInfo.workflowRunId
});
@@ -177,7 +177,7 @@
// remove workflows
_.forOwn(workflowIds, (workflowId) => {
- workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE, {
+ workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_REMOVE, {
workflow_id: workflowId
});
});
@@ -231,7 +231,10 @@
this.timeout(5000);
// kickstart the workflow
- probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ probeClient.emit(eventrouter.serviceEvents.EVENT_EMIT, {
+ topic: 'onu.events',
+ message: {serialNumber: 'testSerialXXX', other: 'test_other_field'}
+ });
setTimeout(() => {
expect(receivedKickstartMessages.length, 'num of message stores').to.equal(2);
receivedKickstartMessages.forEach((receivedKickstartMessageStore) => {
@@ -245,7 +248,10 @@
this.timeout(5000);
// kickstart the workflow
- probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ probeClient.emit(eventrouter.serviceEvents.EVENT_EMIT, {
+ topic: 'onu.events',
+ message: {serialNumber: 'testSerialXXX', other: 'test_other_field'}
+ });
setTimeout(() => {
// kickstart will take 2 seconds roughly
expect(workflowRunInfos.length, 'num of workflow runs').to.equal(1);
@@ -259,7 +265,10 @@
this.timeout(5000);
// kickstart the workflow
- probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ probeClient.emit(eventrouter.serviceEvents.EVENT_EMIT, {
+ topic: 'onu.events',
+ message: {serialNumber: 'testSerialXXX', other: 'test_other_field'}
+ });
setTimeout(() => {
// kickstart will take 2 seconds roughly
expect(workflowRunInfos.length, 'num of workflow runs').to.equal(1);
@@ -297,14 +306,14 @@
this.timeout(5000);
// kickstart the workflow
- probeClient.emit(
- 'onu.events',
- {serialNumber: 'testSerialXXX', other: 'test_other_field'}
- );
- probeClient.emit(
- 'datamodel.AttWorkflowDriverServiceInstance',
- {operation: 'update', serialNumber: 'testSerialXXX', other: 'updated_test_other_field'}
- );
+ probeClient.emit(eventrouter.serviceEvents.EVENT_EMIT, {
+ topic: 'onu.events',
+ message: {serialNumber: 'testSerialXXX', other: 'test_other_field'}
+ });
+ probeClient.emit(eventrouter.serviceEvents.EVENT_EMIT, {
+ topic: 'datamodel.AttWorkflowDriverServiceInstance',
+ message: {operation: 'update', serialNumber: 'testSerialXXX', other: 'updated_test_other_field'}
+ });
setTimeout(() => {
// kickstart will take 2 seconds roughly
expect(workflowRunInfos.length, 'num of workflow runs').to.equal(1);
diff --git a/spec/websocket_clients.spec.js b/spec/websocket_clients.spec.js
index b1f6333..b9401bc 100644
--- a/spec/websocket_clients.spec.js
+++ b/spec/websocket_clients.spec.js
@@ -146,7 +146,10 @@
},
(callback) => {
// kickstart the test workflow
- probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ probeClient.emit(eventrouter.serviceEvents.EVENT_EMIT, {
+ topic: 'onu.events',
+ message: {serialNumber: 'testSerialXXX', other: 'test_other_field'}
+ });
setTimeout(() => {
expect(workflowRunId).to.not.be.undefined;
callback(null, true);
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 4919669..cc7d222 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -22,6 +22,7 @@
const logger = require('../config/logger.js');
const Client = require('../types/client.js');
const WorkflowRun = require('../types/workflowrun.js');
+ const ws_probe = require('./ws_probe.js');
const ws_manager = require('./ws_manager.js');
const ws_workflowrun = require('./ws_workflowrun.js');
@@ -44,12 +45,17 @@
GREETING: 'cord.workflow.ctlsvc.greeting'
};
- // add ws_mgroperation events
+ // add ws_probe events
+ _.forOwn(ws_probe.serviceEvents, (wsServiceEvent, key) => {
+ serviceEvents[key] = wsServiceEvent;
+ });
+
+ // add ws_manager events
_.forOwn(ws_manager.serviceEvents, (wsServiceEvent, key) => {
serviceEvents[key] = wsServiceEvent;
});
- // add ws_runoperation events
+ // add ws_workflowrun events
_.forOwn(ws_workflowrun.serviceEvents, (wsServiceEvent, key) => {
serviceEvents[key] = wsServiceEvent;
});
@@ -250,7 +256,7 @@
return true;
};
- const sendEvent = (topic, message) => {
+ const emitEvent = (topic, message) => {
// list of workflowIds
// to check if there are workflow runs for the events
let workflowIdsRunning = [];
@@ -359,41 +365,62 @@
}
if(c.getType() === Client.Type.PROBE) {
- // probe' messages are relayed
- // relay messages based on topic
+ // probe
// probe protocol:
// REQ:
- // topic: event topic
- // message: <data>
+ // topic: operation
+ // message: {
+ // req_id: <req_id>,
+ // topic: <topic>,
+ // message: <data>
+ // }
// RES:
// topic: topic sent
- // message: {result: <true/false>, message: <error message> }
+ // message: {
+ // req_id: <req_id>,
+ // error: <true/false>,
+ // result: <true/false>,
+ // message: <error message>
+ // }
allClients[clientId] = c;
probeClients[clientId] = c;
- socket.on('*', (msg) => {
- let jsonMsg = msg.data;
- if(jsonMsg.length === 2) {
- // must have two parts
- // first part is topic
- // second part is message body
- let topic = jsonMsg[0];
- let messageBody = jsonMsg[1];
+ // attach probe operations
+ let router = ws_probe.getRouter();
+ _.forOwn(router, (routerElem, _key) => {
+ socket.on(routerElem.topic, (msg) => {
+ // handle a common parameter - req_id
+ // when we get req_id, return the same req_id in response.
+ // this is to help identify a request from a response at client-side
+ let req_id = 101010; // default number, signiture
+ if(msg && checkObject(msg)) {
+ if('req_id' in msg) {
+ req_id = msg.req_id;
+ }
+ }
- sendEvent(topic, messageBody);
+ routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
+ if(err) {
+ logger.log('warn', `unable to handle a message - ${err}`);
+ socket.emit(routerElem.topic, {
+ req_id: req_id,
+ error: true,
+ result: result,
+ message: err
+ });
+ return;
+ }
- // return true for success
- socket.emit(topic, {
- result: true
+ // we return result
+ if(routerElem.return === undefined || routerElem.return) {
+ socket.emit(routerElem.topic, {
+ req_id: req_id,
+ error: false,
+ result: result
+ });
+ }
});
- }
- else {
- logger.log('warn', `unexpected message is received - ${JSON.stringify(jsonMsg)}`);
- socket.emit(jsonMsg[0], {
- result: false,
- message: `unexpected message is received - ${JSON.stringify(jsonMsg)}`
- });
- }
+ });
});
return true;
}
@@ -601,7 +628,7 @@
getWorkflowRunClients: () => { return workflowRunClients; },
clientType: Client.Type,
//setIO: setIO,
- sendEvent: sendEvent,
+ emitEvent: emitEvent,
countQueuedEvents: countQueuedEvents,
fetchEvent: fetchEvent,
addClient: addClient,
diff --git a/src/controllers/websocket.js b/src/controllers/websocket.js
index b5eaf13..6112449 100644
--- a/src/controllers/websocket.js
+++ b/src/controllers/websocket.js
@@ -18,7 +18,6 @@
'use strict';
const socketio = require('socket.io');
- const ioWildcard = require('socketio-wildcard');
const client = require('../types/client.js');
const eventrouter = require('./eventrouter.js');
const logger = require('../config/logger.js');
@@ -30,7 +29,6 @@
pingInterval: 500,
pingTimeout: 2000,
});
- io.use(ioWildcard());
// set io to eventrouter
//eventrouter.setIO(io);
diff --git a/src/controllers/ws_probe.js b/src/controllers/ws_probe.js
new file mode 100644
index 0000000..a897086
--- /dev/null
+++ b/src/controllers/ws_probe.js
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const logger = require('../config/logger.js');
+
+ let serviceEvents = {
+ // probe -> controller -> probe
+ EVENT_EMIT: 'cord.workflow.ctlsvc.event.emit'
+ };
+
+ // WebSocket interface for emitting an event
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.event.emit',
+ // message: {
+ // req_id: <req_id>, // optional
+ // topic: <topic>,
+ // message: <message>
+ // }
+ // }
+ const emitEvent = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('topic' in message)) {
+ // error
+ errorMessage = `field 'topic' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('message' in message)) {
+ // error
+ errorMessage = `field 'message' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let result = eventrouter.emitEvent(
+ message.topic,
+ message.message
+ );
+ if(!result) {
+ errorMessage = `failed to emit event ${message.topic} - ${message.message}`;
+ cb(errorMessage, false);
+ }
+ else {
+ cb(null, true);
+ }
+ return;
+ };
+
+ const getRouter = () => {
+ return {
+ emitEvent: {
+ topic: serviceEvents.EVENT_EMIT,
+ handler: emitEvent
+ }
+ };
+ };
+
+ module.exports = {
+ serviceEvents: serviceEvents,
+ getRouter: getRouter
+ };
+})();