changed to executors
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
index 9e97ee9..a44e12b 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
@@ -117,12 +117,8 @@
notificationIds = coreService.getIdGenerator("fpc-notification-ids");
- jettyServer = new Thread(JettyServer::init);
- parseStream = new Thread(new ParseStream());
-
- jettyServer.start();
- parseStream.start();
-
+ JettyServer.createInstance().open();
+ ParseStream.createInstance().open();
NBEventWorkerManager.createInstance(20, restconfService).open();
HTTPNotifier.createInstance().open();
@@ -143,9 +139,8 @@
NBEventWorkerManager.getInstance().close();
HTTPNotifier.getInstance().close();
-
- jettyServer.stop();
- parseStream.stop();
+ ParseStream.getInstance().close();
+ JettyServer.getInstance().close();
log.info("FPC Service Stopped");
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
index ec480cb..664d9d9 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
@@ -70,14 +70,10 @@
@Override
protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) {
- try {
- char[] charArray = new char[buf.remaining()];
- System.arraycopy(buf.array(), 0, charArray, 0, buf.remaining());
- CharBuffer charBuffer = CharBuffer.wrap(charArray);
- ParseStream.blockingQueue.put(new AbstractMap.SimpleEntry<>(clientUri, charBuffer));
- } catch (InterruptedException e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
+ char[] charArray = new char[buf.remaining()];
+ System.arraycopy(buf.array(), 0, charArray, 0, buf.remaining());
+ CharBuffer charBuffer = CharBuffer.wrap(charArray);
+ ParseStream.getInstance().send(new AbstractMap.SimpleEntry<>(clientUri, charBuffer));
}
@Override
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
index deba165..7da2de9 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
@@ -14,37 +14,63 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
/**
* SSE Server implementation
*/
-public class JettyServer {
+public class JettyServer implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(JettyServer.class);
+ private static JettyServer _isntance;
+ private Server server;
+ public static JettyServer createInstance() {
+ if (_isntance == null) {
+ _isntance = new JettyServer();
+ }
+ return _isntance;
+ }
+
+ public static JettyServer getInstance() {
+ return _isntance;
+ }
/**
* Method used to initialize and start the SSE server
*/
- public static void init() {
- Server server = new Server(8070);
- ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
- context.setContextPath("/");
+ public void open() {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ executorService.submit(() -> {
+ server = new Server(8070);
+ ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ context.setContextPath("/");
- ServletHolder requestServletHolder = new ServletHolder(EventServer.class);
- context.addServlet(requestServletHolder, "/response");
+ ServletHolder requestServletHolder = new ServletHolder(EventServer.class);
+ context.addServlet(requestServletHolder, "/response");
- ServletHolder notificationServletHolder = new ServletHolder(NotificationServer.class);
- context.addServlet(notificationServletHolder, "/notification");
+ ServletHolder notificationServletHolder = new ServletHolder(NotificationServer.class);
+ context.addServlet(notificationServletHolder, "/notification");
- server.setHandler(context);
- context.addEventListener(new ConfigureService());
- context.addEventListener(new NotificationService());
+ server.setHandler(context);
+ context.addEventListener(new ConfigureService());
+ context.addEventListener(new NotificationService());
- try {
- server.start();
- log.info("Server Started");
- server.join();
- } catch (Exception e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
+ try {
+ server.start();
+ log.info("Jetty-Server Started");
+ server.join();
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ });
}
+ @Override
+ public void close() {
+ try {
+ server.stop();
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
index 2a7bd54..3397bd6 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
@@ -21,10 +21,7 @@
import java.util.AbstractMap;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.*;
/**
* Implements a Worker to process event-data pairs sent by the FPC Client
@@ -66,53 +63,56 @@
}
public void open() {
- this.run = true;
- Entry<String, Entry<String, String>> contents = null;
- while (run) {
- try {
- contents = blockingQueue.take();
+ ExecutorService executorService = Executors.newFixedThreadPool(this.poolSize);
+ executorService.submit(() -> {
+ this.run = true;
+ Entry<String, Entry<String, String>> contents = null;
+ while (run) {
+ try {
+ contents = blockingQueue.take();
- // call RESTCONF service
- ObjectMapper mapper = new ObjectMapper();
- ObjectNode node = (ObjectNode) mapper.readTree(contents.getValue().getValue());
- if (contents.getValue().getKey().contains("ietf-dmm-fpcagent:configure")) {
- CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
- new URI("/onos/restconf/operations/ietf-dmm-fpcagent:configure"),
- node,
- contents.getKey()
- );
- ConfigureService.blockingQueue.add(
- new AbstractMap.SimpleEntry(
- contents.getKey(),
- "event:application/json;/restconf/operations/ietf-dmm-fpcagent:configure\ndata:" + output.get().output().toString() + "\r\n"
- )
- );
- } else if (contents.getValue().getKey().contains("fpc:register-client")) {
- CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
- new URI("/onos/restconf/operations/fpc:register-client"),
- node,
- contents.getKey()
- );
- addClientIdToUriMapping(output.get().output().toString(), contents.getKey());
- ConfigureService.registerClientQueue.add(
- new AbstractMap.SimpleEntry(
- contents.getKey(),
- "event:application/json;/restconf/operations/fpc:register_client\ndata:" + output.get().output().toString() + "\r\n"
- )
- );
- } else if (contents.getValue().getKey().contains("fpc:deregister-client")) {
- CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
- new URI("/onos/restconf/operations/fpc:deregister-client"),
- node,
- contents.getKey()
- );
- removeClientIdToUriMapping(contents.getValue().getValue());
+ // call RESTCONF service
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode node = (ObjectNode) mapper.readTree(contents.getValue().getValue());
+ if (contents.getValue().getKey().contains("ietf-dmm-fpcagent:configure")) {
+ CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+ new URI("/onos/restconf/operations/ietf-dmm-fpcagent:configure"),
+ node,
+ contents.getKey()
+ );
+ ConfigureService.blockingQueue.add(
+ new AbstractMap.SimpleEntry(
+ contents.getKey(),
+ "event:application/json;/restconf/operations/ietf-dmm-fpcagent:configure\ndata:" + output.get().output().toString() + "\r\n"
+ )
+ );
+ } else if (contents.getValue().getKey().contains("fpc:register-client")) {
+ CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+ new URI("/onos/restconf/operations/fpc:register-client"),
+ node,
+ contents.getKey()
+ );
+ addClientIdToUriMapping(output.get().output().toString(), contents.getKey());
+ ConfigureService.registerClientQueue.add(
+ new AbstractMap.SimpleEntry(
+ contents.getKey(),
+ "event:application/json;/restconf/operations/fpc:register_client\ndata:" + output.get().output().toString() + "\r\n"
+ )
+ );
+ } else if (contents.getValue().getKey().contains("fpc:deregister-client")) {
+ CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+ new URI("/onos/restconf/operations/fpc:deregister-client"),
+ node,
+ contents.getKey()
+ );
+ removeClientIdToUriMapping(contents.getValue().getValue());
+ }
+
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
}
-
- } catch (Exception e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
}
- }
+ });
}
/**
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
index edd2415..7e94ad4 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
@@ -15,17 +15,45 @@
import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
* A class that parses the request stream and enqueues creates event-data pairs for processing
*/
-public class ParseStream implements Runnable {
+public class ParseStream implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(ParseStream.class);
- public static BlockingQueue<Map.Entry<String, CharBuffer>> blockingQueue = new LinkedBlockingQueue<Map.Entry<String, CharBuffer>>();
+ private BlockingQueue<Map.Entry<String, CharBuffer>> blockingQueue;
private CharBuffer charBuf;
private String partialChunk; //Chunked data left over at the end of the buffer
- boolean chunkedBuffer;
+ private boolean chunkedBuffer;
+ private boolean run;
+
+ private static ParseStream _instance;
+
+ private ParseStream() {
+ blockingQueue = new LinkedBlockingQueue<>();
+ }
+
+ public static ParseStream createInstance() {
+ if (_instance == null) {
+ _instance = new ParseStream();
+ }
+ return _instance;
+ }
+
+ public static ParseStream getInstance() {
+ return _instance;
+ }
+
+ public void send(Map.Entry<String, CharBuffer> buf) {
+ try {
+ blockingQueue.put(buf);
+ } catch (InterruptedException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
private String getLine() {
StringBuilder s = new StringBuilder();
@@ -61,41 +89,48 @@
return null;
}
- @Override
- public void run() {
- chunkedBuffer = false;
- String event = null, data = null;
- while (true) {
- try {
- Map.Entry<String, CharBuffer> entry = blockingQueue.take();
- this.charBuf = entry.getValue();
- while (this.charBuf.hasRemaining()) {
- String line = getLine();
- if (line != null) {
- if (line.startsWith("event")) {
- event = line.split(":", 2)[1];
+ public void open() {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ executorService.submit(() -> {
+ run = true;
+ chunkedBuffer = false;
+ String event = null, data = null;
+ while (run) {
+ try {
+ Map.Entry<String, CharBuffer> entry = blockingQueue.take();
+ this.charBuf = entry.getValue();
+ while (this.charBuf.hasRemaining()) {
+ String line = getLine();
+ if (line != null) {
+ if (line.startsWith("event")) {
+ event = line.split(":", 2)[1];
- } else if (line.startsWith("data")) {
- data = line.split(":", 2)[1];
- if (event != null && data != null) {
- NBEventWorkerManager.getInstance()
- .send(
- new AbstractMap.SimpleEntry<>(
- entry.getKey(),
- new AbstractMap.SimpleEntry<>(event, data)
- )
- );
+ } else if (line.startsWith("data")) {
+ data = line.split(":", 2)[1];
+ if (event != null && data != null) {
+ NBEventWorkerManager.getInstance()
+ .send(
+ new AbstractMap.SimpleEntry<>(
+ entry.getKey(),
+ new AbstractMap.SimpleEntry<>(event, data)
+ )
+ );
+ }
+ event = null;
+ data = null;
}
- event = null;
- data = null;
}
- }
+ }
+ } catch (InterruptedException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
}
- } catch (InterruptedException e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
}
- }
+ });
}
+ @Override
+ public void close() {
+ run = false;
+ }
}