event stream implementation
diff --git a/apps/fpcagent/BUCK b/apps/fpcagent/BUCK
index 9d19b41..89b33cb 100644
--- a/apps/fpcagent/BUCK
+++ b/apps/fpcagent/BUCK
@@ -4,6 +4,7 @@
     '//lib:KRYO',
     '//lib:onos-yang-model',
     '//lib:httpclient-osgi',
+    '//lib:org.apache.httpcomponents.httpasyncclient-osgi',
     '//lib:httpcore-osgi',
     '//lib:netty',
     '//lib:javax.ws.rs-api',
@@ -13,7 +14,13 @@
     '//apps/config:onos-apps-config',
     '//models/fpcagent:onos-models-fpcagent',
     '//models/common:onos-models-common',
+    '//lib:jetty-http',
+    ':jetty-servlet',
+    ':jetty-annotations',
+    ':jetty-server-v9.4.6',
+    '//lib:jetty-util',
     ':zeromq',
+    '//lib:javax.servlet-api',
     ':json',
 ]
 
@@ -32,6 +39,9 @@
 EXCLUDED_BUNDLES = [
     ':zeromq',
     ':json',
+    ':jetty-servlet',
+    ':jetty-annotations',
+    ':jetty-server-v9.4.6',
 ]
 
 APPS = [
@@ -46,7 +56,7 @@
 osgi_jar_with_tests(
     deps = COMPILE_DEPS,
     test_deps = TEST_DEPS,
-    web_context = '/onos/fpcagent',
+    web_context = '/tmp',
     api_title = 'FPC REST API',
     api_package = 'org.onosproject.fpcagent.rest',
     api_version = '1.0',
@@ -81,3 +91,30 @@
   maven_coords = 'org.zeromq:jeromq:0.3.5',
   visibility = [ 'PUBLIC' ],
 )
+
+remote_jar (
+  name = 'jetty-annotations',
+  out = 'jetty-annotations-8.1.19.v20160209.jar',
+  url = 'mvn:org.eclipse.jetty:jetty-annotations:jar:8.1.19.v20160209',
+  sha1 = '649592e66a01b01956c19b80d87c03c8052413ee',
+  maven_coords = 'org.eclipse.jetty:jetty-annotations:8.1.19.v20160209',
+  visibility = [ 'PUBLIC' ],
+)
+
+remote_jar (
+  name = 'jetty-servlet',
+  out = 'jetty-servlet-8.1.19.v20160209.jar',
+  url = 'mvn:org.eclipse.jetty:jetty-servlet:jar:8.1.19.v20160209',
+  sha1 = '6872c3fc289de8f26a43b101741b33af36590cb4',
+  maven_coords = 'org.eclipse.jetty:jetty-servlet:8.1.19.v20160209',
+  visibility = [ 'PUBLIC' ],
+)
+
+remote_jar (
+  name = 'jetty-server-v9.4.6',
+  out = 'jetty-server-9.4.6.v20170531.jar',
+  url = 'mvn:org.eclipse.jetty:jetty-server:jar:9.4.6.v20170531',
+  sha1 = 'afda653f00267fb8b501cafd1cf5cdd1615602a2',
+  maven_coords = 'org.eclipse.jetty:jetty-server:9.4.6.v20170531',
+  visibility = [ 'PUBLIC' ],
+)
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
index d7d88e6..6478827 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
@@ -27,6 +27,9 @@
 import org.onosproject.fpcagent.providers.DpnDeviceListener;
 import org.onosproject.fpcagent.providers.DpnProviderService;
 import org.onosproject.fpcagent.util.ConfigHelper;
+import org.onosproject.fpcagent.util.eventStream.JettyServer;
+import org.onosproject.fpcagent.util.eventStream.NBEventWorkerManager;
+import org.onosproject.fpcagent.util.eventStream.ParseStream;
 import org.onosproject.fpcagent.workers.HTTPNotifier;
 import org.onosproject.fpcagent.workers.ZMQSBPublisherManager;
 import org.onosproject.fpcagent.workers.ZMQSBSubscriberManager;
@@ -88,6 +91,8 @@
     private FpcConfig fpcConfig;
     private boolean started = false;
     private HashSet<DpnDeviceListener> listeners = Sets.newHashSet();
+    private Thread jettyServer;
+    private Thread parseStream;
 
     /* Config */
     private ConfigFactory<ApplicationId, FpcConfig> fpcConfigConfigFactory =
@@ -108,7 +113,14 @@
 
         notificationIds = coreService.getIdGenerator("fpc-notification-ids");
 
-        HTTPNotifier.createInstance(modelConverter).open();
+        jettyServer = new Thread(JettyServer::init);
+        parseStream = new Thread(new ParseStream());
+
+        jettyServer.start();
+        parseStream.start();
+
+        NBEventWorkerManager.createInstance(20).open();
+        HTTPNotifier.createInstance().open();
 
         log.info("FPC Service Started");
     }
@@ -125,8 +137,12 @@
             started = false;
         }
 
+        NBEventWorkerManager.getInstance().close();
         HTTPNotifier.getInstance().close();
 
+        jettyServer.stop();
+        parseStream.stop();
+
         log.info("FPC Service Stopped");
     }
 
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcService.java
index d0b1217..4a29287 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcService.java
@@ -8,13 +8,14 @@
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configuredpn.DefaultConfigureDpnOutput;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.opinput.opbody.CreateOrUpdate;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.opinput.opbody.DeleteOrQuery;
+import org.onosproject.yang.model.RpcInput;
+import org.onosproject.yang.model.RpcOutput;
 
 /**
  * Main service that handles RPC events and DC Store modifications.
  */
 @Beta
 public interface FpcRpcService {
-
     /**
      * Handles create Configure operations that are invoked through RPC.
      *
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java
index 16b891e..2e336fe 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java
@@ -16,39 +16,87 @@
 
 package org.onosproject.fpcagent.rest;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.http.HttpVersion;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
 import org.onosproject.rest.AbstractWebResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.Consumes;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.Consumes;
 import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
 
 @Path("/")
 public class FpcWebResource extends AbstractWebResource {
     private static final Logger log =
             LoggerFactory.getLogger(FpcWebResource.class);
 
+    private ResponseHandler<String> handler = new BasicResponseHandler();
+
     @POST
     @Consumes(MediaType.APPLICATION_JSON)
-    @Path("/response")
-    public Response getResponse() {
+    @Path("/request")
+    public Response getRequest(InputStream stream) {
+        try {
+            ObjectNode node = (ObjectNode) mapper().readTree(stream);
+
+            log.info("Received {}", node.toString());
+
+            CloseableHttpClient client = HttpClients.createDefault();
+            HttpGet httpGet = new HttpGet(node.get("client-uri").asText());
+
+            httpGet.addHeader("User-Agent", "ONOS FPC Agent");
+            httpGet.addHeader("Charset", "utf-8");
+            httpGet.addHeader("Content-type", "text/event-stream");
+            httpGet.setProtocolVersion(HttpVersion.HTTP_1_1);
+
+            CloseableHttpResponse httpResponse = client.execute(httpGet);
+
+            String response = handler.handleResponse(httpResponse);
+            log.info("Response {}", response);
+        } catch (IOException e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+
         return Response.ok().build();
     }
 
     @POST
     @Consumes(MediaType.APPLICATION_JSON)
-    @Path("/request")
-    public Response getRequest() {
+    @Path("/response")
+    public Response getResponse(InputStream stream) {
+        try {
+            ObjectNode node = (ObjectNode) mapper().readTree(stream);
+
+            log.info("Received {}", node.toString());
+        } catch (IOException e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
         return Response.ok().build();
     }
 
     @POST
     @Consumes(MediaType.APPLICATION_JSON)
     @Path("/notification")
-    public Response getNotification() {
+    public Response getNotification(InputStream stream) {
+        try {
+            ObjectNode node = (ObjectNode) mapper().readTree(stream);
+
+            log.info("Received {}", node.toString());
+        } catch (IOException e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
         return Response.ok().build();
     }
 }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java
new file mode 100644
index 0000000..1c90115
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.onosproject.restconf.utils.RestconfUtils;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
+import org.onosproject.yang.model.DefaultModelObjectData;
+import org.onosproject.yang.model.ResourceData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
+import javax.servlet.ServletOutputStream;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.onosproject.fpcagent.util.FpcUtil.modelConverter;
+import static org.onosproject.fpcagent.util.FpcUtil.module;
+
+
+/**
+ * A servlet context listener that sends response (ConfigureOutput) events to clients.
+ */
+public class ConfigureService implements ServletContextListener {
+
+    private static final Logger log = LoggerFactory.getLogger(ConfigureService.class);
+    //key = client-id, value = configure-output
+    public static BlockingQueue<Map.Entry<String, DefaultYangAutoPrefixNotify>> blockingQueue = new LinkedBlockingQueue<>();
+    public static BlockingQueue<Map.Entry<String, String>> registerClientQueue = new LinkedBlockingQueue<>();
+
+    public void contextInitialized(ServletContextEvent sce) {
+        Map<String, AsyncContext> responseStreams = new ConcurrentHashMap<>();
+        sce.getServletContext().setAttribute("responseStreams", responseStreams);
+        Thread thread = new Thread(() -> {
+            String clientUri = null;
+            AsyncContext asyncContext = null;
+            while (true) {
+                try {
+                    Map.Entry<String, DefaultYangAutoPrefixNotify> entry = blockingQueue.take();
+                    clientUri = entry.getKey();
+                    DefaultYangAutoPrefixNotify notify = entry.getValue();
+
+                    ResourceData dataNode = modelConverter.createDataNode(
+                            DefaultModelObjectData.builder()
+                                    .addModelObject(notify)
+                                    .build()
+                    );
+                    ObjectNode jsonNodes = RestconfUtils.convertDataNodeToJson(module, dataNode.dataNodes().get(0));
+                    ObjectMapper mapper = new ObjectMapper();
+                    String value = mapper.writeValueAsString(jsonNodes);
+
+                    asyncContext = responseStreams.get(clientUri);
+                    if (responseStreams.get(clientUri) != null) {
+                        ServletOutputStream out = asyncContext.getResponse().getOutputStream();
+                        out.write(value.getBytes());
+                        out.flush();
+                        asyncContext.getResponse().flushBuffer();
+                    }
+                } catch (Exception e) {
+                    log.error(ExceptionUtils.getFullStackTrace(e));
+                    asyncContext.complete();
+                    responseStreams.remove(clientUri);
+                    break;
+                }
+            }
+        });
+        thread.start();
+
+        Thread thread1 = new Thread(() -> {
+            String clientUri = null;
+            AsyncContext asyncContext = null;
+            while (true) {
+                try {
+                    Map.Entry<String, String> entry = registerClientQueue.take();
+                    clientUri = entry.getKey();
+                    asyncContext = responseStreams.get(clientUri);
+                    if (responseStreams.get(entry.getKey()) != null) {
+                        ServletOutputStream out = asyncContext.getResponse().getOutputStream();
+                        out.write(entry.getValue().getBytes());
+                        out.flush();
+                        asyncContext.getResponse().flushBuffer();
+                    }
+                } catch (Exception e) {
+                    log.error(ExceptionUtils.getFullStackTrace(e));
+                    asyncContext.complete();
+                    responseStreams.remove(clientUri);
+                    break;
+                }
+            }
+        });
+        thread1.start();
+
+    }
+
+    public void contextDestroyed(ServletContextEvent sce) {
+    }
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
new file mode 100644
index 0000000..ec480cb
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.CharBuffer;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+
+/**
+ * A HTTP client that sends a request to a FPC Client to initiate the request stream.
+ */
+public class EventClient {
+    private static final Logger log = LoggerFactory.getLogger(EventClient.class);
+
+    private static final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();
+    private static ArrayList<String> clientUriList = new ArrayList<String>();
+    protected String clientUri;
+
+    /**
+     * Send HttpRequest to Client
+     *
+     * @param uri - FPC Client Uri
+     */
+    public void connectToClient(String uri) {
+        this.clientUri = uri;
+        try {
+            client.start();
+            HttpAsyncRequestProducer get = HttpAsyncMethods.createGet(this.clientUri);
+            client.execute(get, new MyResponseConsumer(this.clientUri), null);
+        } catch (Exception e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
+
+    /**
+     * A character consumer to read incoming characters on the request stream
+     */
+    static class MyResponseConsumer extends AsyncCharConsumer<Boolean> {
+        private String clientUri;
+
+        /**
+         * Constructor
+         *
+         * @param clientUri - URI of the FPC Client
+         */
+        public MyResponseConsumer(String clientUri) {
+            this.clientUri = clientUri;
+        }
+
+        @Override
+        protected void onResponseReceived(final HttpResponse response) {
+        }
+
+        @Override
+        protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) {
+            try {
+                char[] charArray = new char[buf.remaining()];
+                System.arraycopy(buf.array(), 0, charArray, 0, buf.remaining());
+                CharBuffer charBuffer = CharBuffer.wrap(charArray);
+                ParseStream.blockingQueue.put(new AbstractMap.SimpleEntry<>(clientUri, charBuffer));
+            } catch (InterruptedException e) {
+                log.error(ExceptionUtils.getFullStackTrace(e));
+            }
+        }
+
+        @Override
+        protected void releaseResources() {
+        }
+
+        @Override
+        protected Boolean buildResult(final HttpContext context) {
+            return Boolean.TRUE;
+        }
+
+    }
+}
+
+
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventServer.java
new file mode 100644
index 0000000..3313188
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventServer.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A Http Servlet to server response streams
+ */
+public class EventServer extends HttpServlet {
+    private static final Logger log = LoggerFactory.getLogger(EventServer.class);
+    private static ArrayList<String> clientUriList = new ArrayList<String>();
+
+    /**
+     * Method for stream initialization
+     *
+     * @param clientUri - Client Uri
+     * @param request   - The servlet request object
+     * @param response  - The servlet Response Object
+     */
+    private void init(String clientUri, HttpServletRequest request, HttpServletResponse response) {
+        log.info("Response Stream Inititated");
+        try {
+            HttpSession session = request.getSession();
+            session.setMaxInactiveInterval(72 * 60 * 60);
+            EventClient client = new EventClient();
+            client.connectToClient(clientUri);
+            response.setHeader("Content-Type", "text/event-stream");
+            response.setHeader("Cache-Control", "no-cache, no-store");
+            response.setHeader("Connection", "keep-alive");
+            AsyncContext asyncContext = request.startAsync(request, response);
+            asyncContext.setTimeout(72 * 60 * 60 * 1000);
+            asyncContext.getResponse().setBufferSize(1200);
+            try {
+                asyncContext.getResponse().flushBuffer();
+            } catch (IOException e) {
+                log.error(ExceptionUtils.getFullStackTrace(e));
+            }
+            ServletContext servletContext = request.getServletContext();
+            Map<String, AsyncContext> responseStreams = (ConcurrentHashMap<String, AsyncContext>) servletContext.getAttribute("responseStreams");
+            responseStreams.put(clientUri, asyncContext);
+        } catch (Exception e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
+
+    protected void doPost(HttpServletRequest request, HttpServletResponse response) {
+        try {
+            String clientUri = null;
+            StringBuffer jsonStringBuilder = new StringBuffer();
+            String line = null;
+            try {
+                BufferedReader br = request.getReader();
+                while ((line = br.readLine()) != null)
+                    jsonStringBuilder.append(line);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+            try {
+                if (jsonStringBuilder.length() > 0) {
+                    JSONObject jsonObj = new JSONObject(jsonStringBuilder.toString());
+                    clientUri = jsonObj.getString("client-uri");
+                    if (!clientUriList.contains(clientUri)) {
+                        init(clientUri, request, response);
+                    }
+                    jsonStringBuilder.setLength(0);
+                }
+            } catch (JSONException e) {
+                log.error(ExceptionUtils.getFullStackTrace(e));
+            }
+        } catch (Exception e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+
+    }
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
new file mode 100644
index 0000000..deba165
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SSE Server implementation
+ */
+public class JettyServer {
+	private static final Logger log = LoggerFactory.getLogger(JettyServer.class);
+
+	/**
+	 * Method used to initialize and start the SSE server
+	 */
+	public static void init() {
+		Server server = new Server(8070);
+		ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+		context.setContextPath("/");
+
+		ServletHolder requestServletHolder = new ServletHolder(EventServer.class);
+		context.addServlet(requestServletHolder, "/response");
+
+		ServletHolder notificationServletHolder = new ServletHolder(NotificationServer.class);
+		context.addServlet(notificationServletHolder, "/notification");
+
+		server.setHandler(context);
+		context.addEventListener(new ConfigureService());
+		context.addEventListener(new NotificationService());
+
+		try {
+			server.start();
+			log.info("Server Started");
+			server.join();
+		} catch (Exception e) {
+			log.error(ExceptionUtils.getFullStackTrace(e));
+		}
+	}
+
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
new file mode 100644
index 0000000..2a7bd54
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.onosproject.restconf.api.RestconfRpcOutput;
+import org.onosproject.restconf.api.RestconfService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Implements a Worker to process event-data pairs sent by the FPC Client
+ */
+public class NBEventWorkerManager implements AutoCloseable {
+    private static final Logger log = LoggerFactory.getLogger(NBEventWorkerManager.class);
+    private static NBEventWorkerManager _instance = null;
+
+    public static Map<Integer, String> clientIdToUri = new ConcurrentHashMap<Integer, String>();
+
+    private final BlockingQueue<Entry<String, Entry<String, String>>> blockingQueue;
+    private final int poolSize;
+    private boolean run;
+    private final RestconfService restconfService;
+
+    protected NBEventWorkerManager(int poolSize, RestconfService restconfService) {
+        this.poolSize = poolSize;
+        this.restconfService = restconfService;
+        this.blockingQueue = new LinkedBlockingQueue<>();
+    }
+
+    public static NBEventWorkerManager createInstance(int poolSize, RestconfService restconfService) {
+        if (_instance == null) {
+            _instance = new NBEventWorkerManager(poolSize, restconfService);
+        }
+        return _instance;
+    }
+
+    public static NBEventWorkerManager getInstance() {
+        return _instance;
+    }
+
+    public void send(Entry<String, Entry<String, String>> buf) {
+        try {
+            blockingQueue.put(buf);
+        } catch (InterruptedException e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
+
+    public void open() {
+        this.run = true;
+        Entry<String, Entry<String, String>> contents = null;
+        while (run) {
+            try {
+                contents = blockingQueue.take();
+
+                // call RESTCONF service
+                ObjectMapper mapper = new ObjectMapper();
+                ObjectNode node = (ObjectNode) mapper.readTree(contents.getValue().getValue());
+                if (contents.getValue().getKey().contains("ietf-dmm-fpcagent:configure")) {
+                    CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+                            new URI("/onos/restconf/operations/ietf-dmm-fpcagent:configure"),
+                            node,
+                            contents.getKey()
+                    );
+                    ConfigureService.blockingQueue.add(
+                            new AbstractMap.SimpleEntry(
+                                    contents.getKey(),
+                                    "event:application/json;/restconf/operations/ietf-dmm-fpcagent:configure\ndata:" + output.get().output().toString() + "\r\n"
+                            )
+                    );
+                } else if (contents.getValue().getKey().contains("fpc:register-client")) {
+                    CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+                            new URI("/onos/restconf/operations/fpc:register-client"),
+                            node,
+                            contents.getKey()
+                    );
+                    addClientIdToUriMapping(output.get().output().toString(), contents.getKey());
+                    ConfigureService.registerClientQueue.add(
+                            new AbstractMap.SimpleEntry(
+                                    contents.getKey(),
+                                    "event:application/json;/restconf/operations/fpc:register_client\ndata:" + output.get().output().toString() + "\r\n"
+                            )
+                    );
+                } else if (contents.getValue().getKey().contains("fpc:deregister-client")) {
+                    CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+                            new URI("/onos/restconf/operations/fpc:deregister-client"),
+                            node,
+                            contents.getKey()
+                    );
+                    removeClientIdToUriMapping(contents.getValue().getValue());
+                }
+
+            } catch (Exception e) {
+                log.error(ExceptionUtils.getFullStackTrace(e));
+            }
+        }
+    }
+
+    /**
+     * Add a FPC client URI to FPC client id mapping
+     *
+     * @param registerClientOutput - Output of the register_cleint rpc
+     * @param uri                  - FPC Client URI
+     */
+    private void addClientIdToUriMapping(String registerClientOutput, String uri) {
+        try {
+            JSONObject registerJSON = new JSONObject(registerClientOutput);
+            clientIdToUri.put(Integer.parseInt(registerJSON.getJSONObject("output").getString("client-id")), uri);
+        } catch (JSONException e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
+
+    /**
+     * Remove a FPC client URI to FPCClient Id mapping
+     *
+     * @param deregisterClientInput - Input String of deregister_client rpc
+     */
+    private void removeClientIdToUriMapping(String deregisterClientInput) {
+        try {
+            JSONObject deregisterJSON = new JSONObject(deregisterClientInput);
+            clientIdToUri.remove(Integer.parseInt(deregisterJSON.getJSONObject("input").getString("client-id")));
+        } catch (JSONException e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
+
+    @Override
+    public void close() {
+        this.run = false;
+    }
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java
new file mode 100644
index 0000000..65150f8
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.opendaylight.fpc.utils.ErrorLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A HTTP servlet that handles requests for notification streams
+ */
+public class NotificationServer extends HttpServlet {
+	private static final Logger log = LoggerFactory.getLogger(EventServer.class);
+
+	protected void doPost( HttpServletRequest request, HttpServletResponse response){
+		log.info("Notification stream Inititated");
+		String clientId = null;
+		StringBuffer jsonStringBuilder = new StringBuffer();
+		String line = null;
+		try {
+			BufferedReader br = request.getReader();
+			while ((line = br.readLine()) != null)
+				jsonStringBuilder.append(line);
+		} catch (Exception e) { e.printStackTrace(); }
+
+		try {
+			if(jsonStringBuilder.length() > 0){
+				JSONObject jsonObj =  new JSONObject(jsonStringBuilder.toString());
+				clientId = jsonObj.getString("client-id");
+				jsonStringBuilder.setLength(0);
+				HttpSession session = request.getSession();
+				session.setMaxInactiveInterval(72*60*60);
+				response.setHeader("Content-Type", "text/event-stream");
+				response.setHeader("Cache-Control", "no-cache, no-store");
+				response.setHeader("Connection", "keep-alive");
+				AsyncContext asyncContext = request.startAsync(request,response);
+				asyncContext.setTimeout(72*60*60*1000);
+				asyncContext.getResponse().setBufferSize(1200);
+				try {
+					asyncContext.getResponse().flushBuffer();
+				} catch (IOException e1) {
+					ErrorLog.logError(e1.getMessage(),e1.getStackTrace());
+				}
+				ServletContext servletContext = request.getServletContext();
+				Map<String,AsyncContext> notificationStreams = (ConcurrentHashMap<String,AsyncContext>) servletContext.getAttribute("notificationStreams");
+				notificationStreams.put(clientId,asyncContext);
+				log.info("Client Id received in the notification stream request: "+clientId);
+			}
+		} catch (JSONException e) {
+			ErrorLog.logError(e.getLocalizedMessage(),e.getStackTrace());
+		}
+	}
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationService.java
new file mode 100644
index 0000000..13edc1e
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationService.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
+import javax.servlet.ServletOutputStream;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A servlet context listener that sends event-data pairs for config result notifications
+ */
+public class NotificationService implements ServletContextListener {
+    private static final Logger log = LoggerFactory.getLogger(NotificationService.class);
+
+    //key = client id, value = config-result-notification
+    public static BlockingQueue<Map.Entry<String, String>> blockingQueue = new LinkedBlockingQueue<Map.Entry<String, String>>();
+
+    public void contextInitialized(ServletContextEvent sce) {
+        Map<String, AsyncContext> notificationStreams = new ConcurrentHashMap<String, AsyncContext>();
+        sce.getServletContext().setAttribute("notificationStreams", notificationStreams);
+        Thread thread = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                String clientId = null;
+                AsyncContext asyncContext = null;
+                while (true) {
+                    try {
+                        Map.Entry<String, String> entry = blockingQueue.take();
+                        clientId = entry.getKey();
+                        asyncContext = notificationStreams.get(clientId);
+                        if (notificationStreams.get(entry.getKey()) != null) {
+                            ServletOutputStream out = asyncContext.getResponse().getOutputStream();
+                            out.write(entry.getValue().getBytes());
+                            out.flush();
+                            asyncContext.getResponse().flushBuffer();
+                        }
+                    } catch (Exception e) {
+                        log.error(ExceptionUtils.getFullStackTrace(e));
+                        asyncContext.complete();
+                        notificationStreams.remove(clientId);
+                        break;
+                    }
+                }
+            }
+
+        });
+        thread.start();
+
+    }
+
+    public void contextDestroyed(ServletContextEvent sce) {
+    }
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
new file mode 100644
index 0000000..edd2415
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.CharBuffer;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A class that parses the request stream and enqueues creates event-data pairs for processing
+ */
+public class ParseStream implements Runnable {
+    private static final Logger log = LoggerFactory.getLogger(ParseStream.class);
+    public static BlockingQueue<Map.Entry<String, CharBuffer>> blockingQueue = new LinkedBlockingQueue<Map.Entry<String, CharBuffer>>();
+    private CharBuffer charBuf;
+    private String partialChunk; //Chunked data left over at the end of the buffer
+    boolean chunkedBuffer;
+
+    private String getLine() {
+        StringBuilder s = new StringBuilder();
+        if (this.chunkedBuffer) {
+            while (this.charBuf.hasRemaining()) {
+                char ch = this.charBuf.get();
+                if (ch == '\n') {
+                    this.partialChunk += s.toString();
+                    this.chunkedBuffer = false;
+                    return this.partialChunk;
+                }
+                if (ch == '\r') {
+                    continue;
+                }
+                s.append(ch);
+            }
+        }
+        while (this.charBuf.hasRemaining()) {
+            char ch = this.charBuf.get();
+            if (ch == '\n') {
+                String line = s.toString();
+                return line.length() > 0 ? line : null;
+            }
+            if (ch == '\r') {
+                continue;
+            }
+            s.append(ch);
+            if (!this.charBuf.hasRemaining()) {
+                this.partialChunk = s.toString();
+                this.chunkedBuffer = true;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void run() {
+        chunkedBuffer = false;
+        String event = null, data = null;
+        while (true) {
+            try {
+                Map.Entry<String, CharBuffer> entry = blockingQueue.take();
+                this.charBuf = entry.getValue();
+                while (this.charBuf.hasRemaining()) {
+                    String line = getLine();
+                    if (line != null) {
+                        if (line.startsWith("event")) {
+                            event = line.split(":", 2)[1];
+
+                        } else if (line.startsWith("data")) {
+                            data = line.split(":", 2)[1];
+                            if (event != null && data != null) {
+                                NBEventWorkerManager.getInstance()
+                                        .send(
+                                                new AbstractMap.SimpleEntry<>(
+                                                        entry.getKey(),
+                                                        new AbstractMap.SimpleEntry<>(event, data)
+                                                )
+                                        );
+                            }
+                            event = null;
+                            data = null;
+                        }
+                    }
+
+                }
+            } catch (InterruptedException e) {
+                log.error(ExceptionUtils.getFullStackTrace(e));
+            }
+        }
+    }
+
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java
index 6f10664..d3755ff 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java
@@ -20,20 +20,19 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.http.HttpResponse;
+import org.apache.http.HttpVersion;
 import org.apache.http.client.ResponseHandler;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.BasicResponseHandler;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
-import org.apache.http.HttpVersion;
 import org.json.JSONObject;
 import org.onosproject.fpcagent.protocols.DpnNgicCommunicator;
 import org.onosproject.restconf.utils.RestconfUtils;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
 import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
 import org.onosproject.yang.model.DefaultModelObjectData;
-import org.onosproject.yang.model.ModelConverter;
 import org.onosproject.yang.model.ResourceData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,25 +43,24 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import static org.onosproject.fpcagent.util.FpcUtil.modelConverter;
 import static org.onosproject.fpcagent.util.FpcUtil.module;
 
 public class HTTPNotifier implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(HTTPNotifier.class);
     private static HTTPNotifier _instance = null;
     private final BlockingQueue<Map.Entry<String, DefaultYangAutoPrefixNotify>> blockingQueue;
-    public ModelConverter modelConverter = null;
     private boolean run;
     private ResponseHandler<String> handler = new BasicResponseHandler();
 
-    protected HTTPNotifier(ModelConverter modelConverter) {
+    protected HTTPNotifier() {
         this.run = true;
         this.blockingQueue = new LinkedBlockingQueue<>();
-        this.modelConverter = modelConverter;
     }
 
-    public static HTTPNotifier createInstance(ModelConverter modelConverter) {
+    public static HTTPNotifier createInstance() {
         if (_instance == null) {
-            _instance = new HTTPNotifier(modelConverter);
+            _instance = new HTTPNotifier();
         }
         return _instance;
     }