Merge branch 'master' of ssh://gerrit.opencord.org:29418/fpcagent

Conflicts:
	apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
	apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
index 9e97ee9..a44e12b 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
@@ -117,12 +117,8 @@
 
         notificationIds = coreService.getIdGenerator("fpc-notification-ids");
 
-        jettyServer = new Thread(JettyServer::init);
-        parseStream = new Thread(new ParseStream());
-
-        jettyServer.start();
-        parseStream.start();
-
+        JettyServer.createInstance().open();
+        ParseStream.createInstance().open();
         NBEventWorkerManager.createInstance(20, restconfService).open();
         HTTPNotifier.createInstance().open();
 
@@ -143,9 +139,8 @@
 
         NBEventWorkerManager.getInstance().close();
         HTTPNotifier.getInstance().close();
-
-        jettyServer.stop();
-        parseStream.stop();
+        ParseStream.getInstance().close();
+        JettyServer.getInstance().close();
 
         log.info("FPC Service Stopped");
     }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
index 4728b79..88b0201 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
@@ -71,14 +71,13 @@
 
         @Override
         protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) {
-            try {
-                char[] charArray = new char[buf.remaining()];
-                System.arraycopy(buf.array(), 0, charArray, 0, buf.remaining());
-                CharBuffer charBuffer = CharBuffer.wrap(charArray);
-                ParseStream.blockingQueue.put(new AbstractMap.SimpleEntry<>(clientUri, charBuffer));
-            } catch (InterruptedException e) {
-                log.error(ExceptionUtils.getFullStackTrace(e));
-            }
+            // Snapshot the unread characters. Reading through a duplicate starts the copy at
+            // buf's current position (and works for buffers without an accessible backing
+            // array) while leaving buf's own position untouched.
+            char[] charArray = new char[buf.remaining()];
+            buf.duplicate().get(charArray);
+            CharBuffer charBuffer = CharBuffer.wrap(charArray);
+            ParseStream.getInstance().send(new AbstractMap.SimpleEntry<>(clientUri, charBuffer));
         }
 
         @Override
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
index df6526a..7da2de9 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
@@ -14,37 +14,83 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 /**
  * SSE Server implementation
  */
-public class JettyServer {
+public class JettyServer implements AutoCloseable {
 	private static final Logger log = LoggerFactory.getLogger(JettyServer.class);
+    // Singleton instance; created by the first call to createInstance().
+    private static JettyServer _instance;
+    // Assigned on the worker thread started by open() and read by close() on the
+    // caller's thread, hence volatile.
+    private volatile Server server;
 
+    /**
+     * Creates the singleton on first use and returns it.
+     */
+    public static JettyServer createInstance() {
+        if (_instance == null) {
+            _instance = new JettyServer();
+        }
+        return _instance;
+    }
+
+    /**
+     * Returns the singleton, or null when createInstance() has not been called yet.
+     */
+    public static JettyServer getInstance() {
+        return _instance;
+    }
 	/**
 	 * Method used to initialize and start the SSE server
 	 */
-	public static void init() {
-		Server server = new Server(8070);
-		ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-		context.setContextPath("/");
+	public void open() {
+		ExecutorService executorService = Executors.newSingleThreadExecutor();
+		executorService.submit(() -> {
+			server = new Server(8070);
+			ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+			context.setContextPath("/");
 
-		ServletHolder requestServletHolder = new ServletHolder(EventServer.class);
-		context.addServlet(requestServletHolder, "/response");
+			ServletHolder requestServletHolder = new ServletHolder(EventServer.class);
+			context.addServlet(requestServletHolder, "/response");
 
-		ServletHolder notificationServletHolder = new ServletHolder(NotificationServer.class);
-		context.addServlet(notificationServletHolder, "/notification");
+			ServletHolder notificationServletHolder = new ServletHolder(NotificationServer.class);
+			context.addServlet(notificationServletHolder, "/notification");
 
-		server.setHandler(context);
-		context.addEventListener(new ConfigureService());
-		context.addEventListener(new NotificationService());
+			server.setHandler(context);
+			context.addEventListener(new ConfigureService());
+			context.addEventListener(new NotificationService());
 
-		try {
-			server.start();
-			log.info("Jetty server started");
-			server.join();
-		} catch (Exception e) {
-			log.error(ExceptionUtils.getFullStackTrace(e));
-		}
+			try {
+				server.start();
+				log.info("Jetty-Server Started");
+				server.join();
+			} catch (Exception e) {
+				log.error(ExceptionUtils.getFullStackTrace(e));
+			}
+		});
+		// No further tasks are submitted; shutting down now lets the worker thread
+		// exit once the submitted task returns instead of lingering idle.
+		executorService.shutdown();
 	}
 
+    /**
+     * Stops the Jetty server if open() has already created one.
+     */
+    @Override
+    public void close() {
+        Server running = server;
+        if (running == null) {
+            log.warn("Jetty-Server close requested before the server was created");
+            return;
+        }
+        try {
+            running.stop();
+        } catch (Exception e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
 }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
index 0705f4c..7e94ad4 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
@@ -15,17 +15,45 @@
 import java.util.AbstractMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
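- * A class that parses the request stream and enqueues creates event-data pairs for processing
+ * A class that parses the request stream, creates event-data pairs and enqueues them for processing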
  */
-public class ParseStream implements Runnable {
+public class ParseStream implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(ParseStream.class);
-    public static BlockingQueue<Map.Entry<String, CharBuffer>> blockingQueue = new LinkedBlockingQueue<Map.Entry<String, CharBuffer>>();
+    private BlockingQueue<Map.Entry<String, CharBuffer>> blockingQueue;
     private CharBuffer charBuf;
     private String partialChunk; //Chunked data left over at the end of the buffer
-    boolean chunkedBuffer;
+    private boolean chunkedBuffer;
+    private volatile boolean run; // cleared by close() on a different thread than the worker loop that polls it
+
+    private static ParseStream _instance;
+
+    private ParseStream() {
+        blockingQueue = new LinkedBlockingQueue<>();
+    }
+
+    public static ParseStream createInstance() {
+        if (_instance == null) {
+            _instance = new ParseStream();
+        }
+        return _instance;
+    }
+
+    public static ParseStream getInstance() {
+        return _instance;
+    }
+
+    public void send(Map.Entry<String, CharBuffer> buf) {
+        try {
+            blockingQueue.put(buf);
+        } catch (InterruptedException e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
 
     private String getLine() {
         StringBuilder s = new StringBuilder();
@@ -61,42 +89,74 @@
         return null;
     }
 
-    @Override
-    public void run() {
-        chunkedBuffer = false;
-        String event = null, data = null;
-	log.info("ParseStream started");
-        while (true) {
-            try {
-                Map.Entry<String, CharBuffer> entry = blockingQueue.take();
-                this.charBuf = entry.getValue();
-                while (this.charBuf.hasRemaining()) {
-                    String line = getLine();
-                    if (line != null) {
-                        if (line.startsWith("event")) {
-                            event = line.split(":", 2)[1];
+    /**
+     * Starts a single worker thread that takes the buffers queued by send(), reads them
+     * line by line and forwards each event/data pair to NBEventWorkerManager. The worker
+     * keeps running until close() is called or the thread is interrupted.
+     */
+    public void open() {
+        // Set before submitting so a close() that races with startup is not overwritten.
+        run = true;
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        executorService.submit(() -> {
+            chunkedBuffer = false;
+            String event = null, data = null;
+            while (run) {
+                try {
+                    Map.Entry<String, CharBuffer> entry = blockingQueue.take();
+                    this.charBuf = entry.getValue();
+                    while (this.charBuf.hasRemaining()) {
+                        String line = getLine();
+                        if (line != null) {
+                            if (line.startsWith("event")) {
+                                event = fieldValue(line);
 
-                        } else if (line.startsWith("data")) {
-                            data = line.split(":", 2)[1];
-                            if (event != null && data != null) {
-                                NBEventWorkerManager.getInstance()
-                                        .send(
-                                                new AbstractMap.SimpleEntry<>(
-                                                        entry.getKey(),
-                                                        new AbstractMap.SimpleEntry<>(event, data)
-                                                )
-                                        );
+                            } else if (line.startsWith("data")) {
+                                data = fieldValue(line);
+                                if (event != null && data != null) {
+                                    NBEventWorkerManager.getInstance()
+                                            .send(
+                                                    new AbstractMap.SimpleEntry<>(
+                                                            entry.getKey(),
+                                                            new AbstractMap.SimpleEntry<>(event, data)
+                                                    )
+                                            );
+                                }
+                                event = null;
+                                data = null;
                             }
-                            event = null;
-                            data = null;
                         }
-                    }
 
+                    }
+                } catch (InterruptedException e) {
+                    log.error(ExceptionUtils.getFullStackTrace(e));
+                    // Restore the interrupt flag and stop; retrying take() would fail again at once.
+                    Thread.currentThread().interrupt();
+                    return;
                 }
-            } catch (InterruptedException e) {
-                log.error(ExceptionUtils.getFullStackTrace(e));
             }
-        }
+        });
+        // No further tasks are submitted; shutting down now lets the worker thread
+        // exit once the loop above ends instead of lingering idle.
+        executorService.shutdown();
     }
 
+    /**
+     * Stops the worker started by open().
+     */
+    @Override
+    public void close() {
+        run = false;
+        // The worker may be blocked in take(); queue an empty buffer to wake it so it
+        // re-checks the flag. An empty buffer has nothing remaining, so no line is read from it.
+        blockingQueue.offer(new AbstractMap.SimpleEntry<>("", CharBuffer.allocate(0)));
+    }
+
+    /**
+     * Returns the text after the first ':' of a stream line, or null when the line has no ':'.
+     */
+    private static String fieldValue(String line) {
+        String[] parts = line.split(":", 2);
+        return parts.length == 2 ? parts[1] : null;
+    }
 }