Merge branch 'master' of ssh://gerrit.opencord.org:29418/fpcagent
Conflicts:
apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
index 9e97ee9..a44e12b 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
@@ -117,12 +117,8 @@
notificationIds = coreService.getIdGenerator("fpc-notification-ids");
- jettyServer = new Thread(JettyServer::init);
- parseStream = new Thread(new ParseStream());
-
- jettyServer.start();
- parseStream.start();
-
+ JettyServer.createInstance().open();
+ ParseStream.createInstance().open();
NBEventWorkerManager.createInstance(20, restconfService).open();
HTTPNotifier.createInstance().open();
@@ -143,9 +139,8 @@
NBEventWorkerManager.getInstance().close();
HTTPNotifier.getInstance().close();
-
- jettyServer.stop();
- parseStream.stop();
+ ParseStream.getInstance().close();
+ JettyServer.getInstance().close();
log.info("FPC Service Stopped");
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
index 4728b79..88b0201 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
@@ -71,14 +71,10 @@
@Override
protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) {
- try {
- char[] charArray = new char[buf.remaining()];
- System.arraycopy(buf.array(), 0, charArray, 0, buf.remaining());
- CharBuffer charBuffer = CharBuffer.wrap(charArray);
- ParseStream.blockingQueue.put(new AbstractMap.SimpleEntry<>(clientUri, charBuffer));
- } catch (InterruptedException e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
+ char[] charArray = new char[buf.remaining()];
+ System.arraycopy(buf.array(), 0, charArray, 0, buf.remaining());
+ CharBuffer charBuffer = CharBuffer.wrap(charArray);
+ ParseStream.getInstance().send(new AbstractMap.SimpleEntry<>(clientUri, charBuffer));
}
@Override
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
index df6526a..7da2de9 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
@@ -14,37 +14,63 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
/**
* SSE Server implementation
*/
-public class JettyServer {
+public class JettyServer implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(JettyServer.class);
+ private static JettyServer _isntance;
+ private Server server;
+ public static JettyServer createInstance() {
+ if (_isntance == null) {
+ _isntance = new JettyServer();
+ }
+ return _isntance;
+ }
+
+ public static JettyServer getInstance() {
+ return _isntance;
+ }
/**
* Method used to initialize and start the SSE server
*/
- public static void init() {
- Server server = new Server(8070);
- ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
- context.setContextPath("/");
+ public void open() {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ executorService.submit(() -> {
+ server = new Server(8070);
+ ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ context.setContextPath("/");
- ServletHolder requestServletHolder = new ServletHolder(EventServer.class);
- context.addServlet(requestServletHolder, "/response");
+ ServletHolder requestServletHolder = new ServletHolder(EventServer.class);
+ context.addServlet(requestServletHolder, "/response");
- ServletHolder notificationServletHolder = new ServletHolder(NotificationServer.class);
- context.addServlet(notificationServletHolder, "/notification");
+ ServletHolder notificationServletHolder = new ServletHolder(NotificationServer.class);
+ context.addServlet(notificationServletHolder, "/notification");
- server.setHandler(context);
- context.addEventListener(new ConfigureService());
- context.addEventListener(new NotificationService());
+ server.setHandler(context);
+ context.addEventListener(new ConfigureService());
+ context.addEventListener(new NotificationService());
- try {
- server.start();
- log.info("Jetty server started");
- server.join();
- } catch (Exception e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
+ try {
+ server.start();
+ log.info("Jetty-Server Started");
+ server.join();
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ });
}
+ @Override
+ public void close() {
+ try {
+ server.stop();
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
index 0705f4c..7e94ad4 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
@@ -15,17 +15,45 @@
import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
* A class that parses the request stream and enqueues creates event-data pairs for processing
*/
-public class ParseStream implements Runnable {
+public class ParseStream implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(ParseStream.class);
- public static BlockingQueue<Map.Entry<String, CharBuffer>> blockingQueue = new LinkedBlockingQueue<Map.Entry<String, CharBuffer>>();
+ private BlockingQueue<Map.Entry<String, CharBuffer>> blockingQueue;
private CharBuffer charBuf;
private String partialChunk; //Chunked data left over at the end of the buffer
- boolean chunkedBuffer;
+ private boolean chunkedBuffer;
+ private boolean run;
+
+ private static ParseStream _instance;
+
+ private ParseStream() {
+ blockingQueue = new LinkedBlockingQueue<>();
+ }
+
+ public static ParseStream createInstance() {
+ if (_instance == null) {
+ _instance = new ParseStream();
+ }
+ return _instance;
+ }
+
+ public static ParseStream getInstance() {
+ return _instance;
+ }
+
+ public void send(Map.Entry<String, CharBuffer> buf) {
+ try {
+ blockingQueue.put(buf);
+ } catch (InterruptedException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
private String getLine() {
StringBuilder s = new StringBuilder();
@@ -61,42 +89,48 @@
return null;
}
- @Override
- public void run() {
- chunkedBuffer = false;
- String event = null, data = null;
- log.info("ParseStream started");
- while (true) {
- try {
- Map.Entry<String, CharBuffer> entry = blockingQueue.take();
- this.charBuf = entry.getValue();
- while (this.charBuf.hasRemaining()) {
- String line = getLine();
- if (line != null) {
- if (line.startsWith("event")) {
- event = line.split(":", 2)[1];
+ public void open() {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ executorService.submit(() -> {
+ run = true;
+ chunkedBuffer = false;
+ String event = null, data = null;
+ while (run) {
+ try {
+ Map.Entry<String, CharBuffer> entry = blockingQueue.take();
+ this.charBuf = entry.getValue();
+ while (this.charBuf.hasRemaining()) {
+ String line = getLine();
+ if (line != null) {
+ if (line.startsWith("event")) {
+ event = line.split(":", 2)[1];
- } else if (line.startsWith("data")) {
- data = line.split(":", 2)[1];
- if (event != null && data != null) {
- NBEventWorkerManager.getInstance()
- .send(
- new AbstractMap.SimpleEntry<>(
- entry.getKey(),
- new AbstractMap.SimpleEntry<>(event, data)
- )
- );
+ } else if (line.startsWith("data")) {
+ data = line.split(":", 2)[1];
+ if (event != null && data != null) {
+ NBEventWorkerManager.getInstance()
+ .send(
+ new AbstractMap.SimpleEntry<>(
+ entry.getKey(),
+ new AbstractMap.SimpleEntry<>(event, data)
+ )
+ );
+ }
+ event = null;
+ data = null;
}
- event = null;
- data = null;
}
- }
+ }
+ } catch (InterruptedException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
}
- } catch (InterruptedException e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
}
- }
+ });
}
+ @Override
+ public void close() {
+ run = false;
+ }
}