changed to executors
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
index 9e97ee9..a44e12b 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
@@ -117,12 +117,8 @@
 
         notificationIds = coreService.getIdGenerator("fpc-notification-ids");
 
-        jettyServer = new Thread(JettyServer::init);
-        parseStream = new Thread(new ParseStream());
-
-        jettyServer.start();
-        parseStream.start();
-
+        JettyServer.createInstance().open();
+        ParseStream.createInstance().open();
         NBEventWorkerManager.createInstance(20, restconfService).open();
         HTTPNotifier.createInstance().open();
 
@@ -143,9 +139,8 @@
 
         NBEventWorkerManager.getInstance().close();
         HTTPNotifier.getInstance().close();
-
-        jettyServer.stop();
-        parseStream.stop();
+        ParseStream.getInstance().close();
+        JettyServer.getInstance().close();
 
         log.info("FPC Service Stopped");
     }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
index ec480cb..664d9d9 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
@@ -70,14 +70,10 @@
 
         @Override
         protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) {
-            try {
-                char[] charArray = new char[buf.remaining()];
-                System.arraycopy(buf.array(), 0, charArray, 0, buf.remaining());
-                CharBuffer charBuffer = CharBuffer.wrap(charArray);
-                ParseStream.blockingQueue.put(new AbstractMap.SimpleEntry<>(clientUri, charBuffer));
-            } catch (InterruptedException e) {
-                log.error(ExceptionUtils.getFullStackTrace(e));
-            }
+            char[] charArray = new char[buf.remaining()];
+            System.arraycopy(buf.array(), 0, charArray, 0, buf.remaining());
+            CharBuffer charBuffer = CharBuffer.wrap(charArray);
+            ParseStream.getInstance().send(new AbstractMap.SimpleEntry<>(clientUri, charBuffer));
         }
 
         @Override
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
index deba165..7da2de9 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
@@ -14,37 +14,63 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 /**
  * SSE Server implementation
  */
-public class JettyServer {
+public class JettyServer implements AutoCloseable {
 	private static final Logger log = LoggerFactory.getLogger(JettyServer.class);
+    private static JettyServer _isntance;
+    private Server server;
 
+    public static JettyServer createInstance() {
+        if (_isntance == null) {
+            _isntance = new JettyServer();
+        }
+        return _isntance;
+    }
+
+    public static JettyServer getInstance() {
+        return _isntance;
+    }
 	/**
 	 * Method used to initialize and start the SSE server
 	 */
-	public static void init() {
-		Server server = new Server(8070);
-		ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-		context.setContextPath("/");
+	public void open() {
+		ExecutorService executorService = Executors.newSingleThreadExecutor();
+		executorService.submit(() -> {
+			server = new Server(8070);
+			ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+			context.setContextPath("/");
 
-		ServletHolder requestServletHolder = new ServletHolder(EventServer.class);
-		context.addServlet(requestServletHolder, "/response");
+			ServletHolder requestServletHolder = new ServletHolder(EventServer.class);
+			context.addServlet(requestServletHolder, "/response");
 
-		ServletHolder notificationServletHolder = new ServletHolder(NotificationServer.class);
-		context.addServlet(notificationServletHolder, "/notification");
+			ServletHolder notificationServletHolder = new ServletHolder(NotificationServer.class);
+			context.addServlet(notificationServletHolder, "/notification");
 
-		server.setHandler(context);
-		context.addEventListener(new ConfigureService());
-		context.addEventListener(new NotificationService());
+			server.setHandler(context);
+			context.addEventListener(new ConfigureService());
+			context.addEventListener(new NotificationService());
 
-		try {
-			server.start();
-			log.info("Server Started");
-			server.join();
-		} catch (Exception e) {
-			log.error(ExceptionUtils.getFullStackTrace(e));
-		}
+			try {
+				server.start();
+				log.info("Jetty-Server Started");
+				server.join();
+			} catch (Exception e) {
+				log.error(ExceptionUtils.getFullStackTrace(e));
+			}
+		});
 	}
 
+    @Override
+    public void close() {
+        try {
+            server.stop();
+        } catch (Exception e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
 }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
index 2a7bd54..3397bd6 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
@@ -21,10 +21,7 @@
 import java.util.AbstractMap;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.*;
 
 /**
  * Implements a Worker to process event-data pairs sent by the FPC Client
@@ -66,53 +63,56 @@
     }
 
     public void open() {
-        this.run = true;
-        Entry<String, Entry<String, String>> contents = null;
-        while (run) {
-            try {
-                contents = blockingQueue.take();
+        ExecutorService executorService = Executors.newFixedThreadPool(this.poolSize);
+        executorService.submit(() -> {
+            this.run = true;
+            Entry<String, Entry<String, String>> contents = null;
+            while (run) {
+                try {
+                    contents = blockingQueue.take();
 
-                // call RESTCONF service
-                ObjectMapper mapper = new ObjectMapper();
-                ObjectNode node = (ObjectNode) mapper.readTree(contents.getValue().getValue());
-                if (contents.getValue().getKey().contains("ietf-dmm-fpcagent:configure")) {
-                    CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
-                            new URI("/onos/restconf/operations/ietf-dmm-fpcagent:configure"),
-                            node,
-                            contents.getKey()
-                    );
-                    ConfigureService.blockingQueue.add(
-                            new AbstractMap.SimpleEntry(
-                                    contents.getKey(),
-                                    "event:application/json;/restconf/operations/ietf-dmm-fpcagent:configure\ndata:" + output.get().output().toString() + "\r\n"
-                            )
-                    );
-                } else if (contents.getValue().getKey().contains("fpc:register-client")) {
-                    CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
-                            new URI("/onos/restconf/operations/fpc:register-client"),
-                            node,
-                            contents.getKey()
-                    );
-                    addClientIdToUriMapping(output.get().output().toString(), contents.getKey());
-                    ConfigureService.registerClientQueue.add(
-                            new AbstractMap.SimpleEntry(
-                                    contents.getKey(),
-                                    "event:application/json;/restconf/operations/fpc:register_client\ndata:" + output.get().output().toString() + "\r\n"
-                            )
-                    );
-                } else if (contents.getValue().getKey().contains("fpc:deregister-client")) {
-                    CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
-                            new URI("/onos/restconf/operations/fpc:deregister-client"),
-                            node,
-                            contents.getKey()
-                    );
-                    removeClientIdToUriMapping(contents.getValue().getValue());
+                    // call RESTCONF service
+                    ObjectMapper mapper = new ObjectMapper();
+                    ObjectNode node = (ObjectNode) mapper.readTree(contents.getValue().getValue());
+                    if (contents.getValue().getKey().contains("ietf-dmm-fpcagent:configure")) {
+                        CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+                                new URI("/onos/restconf/operations/ietf-dmm-fpcagent:configure"),
+                                node,
+                                contents.getKey()
+                        );
+                        ConfigureService.blockingQueue.add(
+                                new AbstractMap.SimpleEntry(
+                                        contents.getKey(),
+                                        "event:application/json;/restconf/operations/ietf-dmm-fpcagent:configure\ndata:" + output.get().output().toString() + "\r\n"
+                                )
+                        );
+                    } else if (contents.getValue().getKey().contains("fpc:register-client")) {
+                        CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+                                new URI("/onos/restconf/operations/fpc:register-client"),
+                                node,
+                                contents.getKey()
+                        );
+                        addClientIdToUriMapping(output.get().output().toString(), contents.getKey());
+                        ConfigureService.registerClientQueue.add(
+                                new AbstractMap.SimpleEntry(
+                                        contents.getKey(),
+                                        "event:application/json;/restconf/operations/fpc:register_client\ndata:" + output.get().output().toString() + "\r\n"
+                                )
+                        );
+                    } else if (contents.getValue().getKey().contains("fpc:deregister-client")) {
+                        CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+                                new URI("/onos/restconf/operations/fpc:deregister-client"),
+                                node,
+                                contents.getKey()
+                        );
+                        removeClientIdToUriMapping(contents.getValue().getValue());
+                    }
+
+                } catch (Exception e) {
+                    log.error(ExceptionUtils.getFullStackTrace(e));
                 }
-
-            } catch (Exception e) {
-                log.error(ExceptionUtils.getFullStackTrace(e));
             }
-        }
+        });
     }
 
     /**
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
index edd2415..7e94ad4 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
@@ -15,17 +15,45 @@
 import java.util.AbstractMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * A class that parses the request stream and enqueues creates event-data pairs for processing
  */
-public class ParseStream implements Runnable {
+public class ParseStream implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(ParseStream.class);
-    public static BlockingQueue<Map.Entry<String, CharBuffer>> blockingQueue = new LinkedBlockingQueue<Map.Entry<String, CharBuffer>>();
+    private BlockingQueue<Map.Entry<String, CharBuffer>> blockingQueue;
     private CharBuffer charBuf;
     private String partialChunk; //Chunked data left over at the end of the buffer
-    boolean chunkedBuffer;
+    private boolean chunkedBuffer;
+    private boolean run;
+
+    private static ParseStream _instance;
+
+    private ParseStream() {
+        blockingQueue = new LinkedBlockingQueue<>();
+    }
+
+    public static ParseStream createInstance() {
+        if (_instance == null) {
+            _instance = new ParseStream();
+        }
+        return _instance;
+    }
+
+    public static ParseStream getInstance() {
+        return _instance;
+    }
+
+    public void send(Map.Entry<String, CharBuffer> buf) {
+        try {
+            blockingQueue.put(buf);
+        } catch (InterruptedException e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
 
     private String getLine() {
         StringBuilder s = new StringBuilder();
@@ -61,41 +89,48 @@
         return null;
     }
 
-    @Override
-    public void run() {
-        chunkedBuffer = false;
-        String event = null, data = null;
-        while (true) {
-            try {
-                Map.Entry<String, CharBuffer> entry = blockingQueue.take();
-                this.charBuf = entry.getValue();
-                while (this.charBuf.hasRemaining()) {
-                    String line = getLine();
-                    if (line != null) {
-                        if (line.startsWith("event")) {
-                            event = line.split(":", 2)[1];
+    public void open() {
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        executorService.submit(() -> {
+            run = true;
+            chunkedBuffer = false;
+            String event = null, data = null;
+            while (run) {
+                try {
+                    Map.Entry<String, CharBuffer> entry = blockingQueue.take();
+                    this.charBuf = entry.getValue();
+                    while (this.charBuf.hasRemaining()) {
+                        String line = getLine();
+                        if (line != null) {
+                            if (line.startsWith("event")) {
+                                event = line.split(":", 2)[1];
 
-                        } else if (line.startsWith("data")) {
-                            data = line.split(":", 2)[1];
-                            if (event != null && data != null) {
-                                NBEventWorkerManager.getInstance()
-                                        .send(
-                                                new AbstractMap.SimpleEntry<>(
-                                                        entry.getKey(),
-                                                        new AbstractMap.SimpleEntry<>(event, data)
-                                                )
-                                        );
+                            } else if (line.startsWith("data")) {
+                                data = line.split(":", 2)[1];
+                                if (event != null && data != null) {
+                                    NBEventWorkerManager.getInstance()
+                                            .send(
+                                                    new AbstractMap.SimpleEntry<>(
+                                                            entry.getKey(),
+                                                            new AbstractMap.SimpleEntry<>(event, data)
+                                                    )
+                                            );
+                                }
+                                event = null;
+                                data = null;
                             }
-                            event = null;
-                            data = null;
                         }
-                    }
 
+                    }
+                } catch (InterruptedException e) {
+                    log.error(ExceptionUtils.getFullStackTrace(e));
                 }
-            } catch (InterruptedException e) {
-                log.error(ExceptionUtils.getFullStackTrace(e));
             }
-        }
+        });
     }
 
+    @Override
+    public void close() {
+        run = false;
+    }
 }