RESTful SSE
diff --git a/apps/fpcagent/BUCK b/apps/fpcagent/BUCK
index 9d19b41..2214f2c 100644
--- a/apps/fpcagent/BUCK
+++ b/apps/fpcagent/BUCK
@@ -7,10 +7,12 @@
     '//lib:httpcore-osgi',
     '//lib:netty',
     '//lib:javax.ws.rs-api',
+    '//lib:org.apache.httpcomponents.httpasyncclient-osgi',
     '//utils/rest:onlab-rest',
     '//core/store/serializers:onos-core-serializers',
     '//apps/restconf/utils:onos-apps-restconf-utils',
     '//apps/config:onos-apps-config',
+    '//apps/restconf/api:onos-apps-restconf-api',
     '//models/fpcagent:onos-models-fpcagent',
     '//models/common:onos-models-common',
     ':zeromq',
@@ -27,6 +29,7 @@
     '//apps/fpcagent:onos-apps-fpcagent',
     '//lib:httpclient-osgi',
     '//lib:httpcore-osgi',
+    '//lib:org.apache.httpcomponents.httpasyncclient-osgi',
 ]
 
 EXCLUDED_BUNDLES = [
@@ -46,7 +49,7 @@
 osgi_jar_with_tests(
     deps = COMPILE_DEPS,
     test_deps = TEST_DEPS,
-    web_context = '/onos/fpcagent',
+    web_context = '/',
     api_title = 'FPC REST API',
     api_package = 'org.onosproject.fpcagent.rest',
     api_version = '1.0',
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
index d7d88e6..02d8037 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
@@ -27,11 +27,15 @@
 import org.onosproject.fpcagent.providers.DpnDeviceListener;
 import org.onosproject.fpcagent.providers.DpnProviderService;
 import org.onosproject.fpcagent.util.ConfigHelper;
+import org.onosproject.fpcagent.util.eventStream.SseManager;
+import org.onosproject.fpcagent.util.eventStream.NBEventWorkerManager;
+import org.onosproject.fpcagent.util.eventStream.ParseStream;
 import org.onosproject.fpcagent.workers.HTTPNotifier;
 import org.onosproject.fpcagent.workers.ZMQSBPublisherManager;
 import org.onosproject.fpcagent.workers.ZMQSBSubscriberManager;
 import org.onosproject.net.config.*;
 import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.restconf.api.RestconfService;
 import org.onosproject.yang.model.ModelConverter;
 import org.onosproject.yang.model.RpcRegistry;
 import org.slf4j.Logger;
@@ -83,6 +87,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private DpnProviderService dpnProviderService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private RestconfService restconfService;
+
     /* Variables */
     private IdGenerator notificationIds;
     private FpcConfig fpcConfig;
@@ -108,7 +115,10 @@
 
         notificationIds = coreService.getIdGenerator("fpc-notification-ids");
 
+        SseManager.createInstance().open();
         HTTPNotifier.createInstance(modelConverter).open();
+        ParseStream.createInstance().open();
+        NBEventWorkerManager.createInstance(10, restconfService).open();
 
         log.info("FPC Service Started");
     }
@@ -125,7 +135,10 @@
             started = false;
         }
 
+        SseManager.getInstance().close();
         HTTPNotifier.getInstance().close();
+        ParseStream.getInstance().close();
+        NBEventWorkerManager.getInstance().close();
 
         log.info("FPC Service Stopped");
     }
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
index ec79725..e082b33 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
@@ -191,7 +191,7 @@
     ) throws Exception {
         DefaultConfigureOutput configureOutput = new DefaultConfigureOutput();
         Collection<Callable<Object>> tasks = new ArrayList<>();
-        // Get this clients tenant.
+        // Get this clientUriToHttpClient tenant.
         FpcIdentity tenantId = clientInfo.tenantId();
         DefaultCommonSuccess defaultCommonSuccess = new DefaultCommonSuccess();
         // Load cache for specific tenant.
@@ -355,7 +355,7 @@
     ) throws Exception {
         DefaultConfigureOutput configureOutput = new DefaultConfigureOutput();
         Collection<Callable<Object>> tasks = new ArrayList<>();
-        // Get this clients tenant.
+        // Get this clientUriToHttpClient tenant.
         FpcIdentity tenantId = clientInfo.tenantId();
         DefaultCommonSuccess defaultCommonSuccess = new DefaultCommonSuccess();
         // Load cache for specific tenant.
@@ -925,7 +925,7 @@
                     }
                     // keep information for each client. this can be moved to the DC Store and use Cache.
                     clientInfo.put(input.clientId(), input);
-                    // keep clients for each tenant
+                    // keep clientUriToHttpClient for each tenant
                     HashSet<ClientIdentifier> hashSet = tenantInfo.getOrDefault(input.tenantId(), Sets.newHashSet());
                     hashSet.add(input.clientId());
                     tenantInfo.put(input.tenantId(), hashSet);
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java
index 16b891e..52f7be2 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java
@@ -16,40 +16,150 @@
 
 package org.onosproject.fpcagent.rest;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.onosproject.fpcagent.util.eventStream.SseManager;
+import org.onosproject.fpcagent.util.eventStream.ParseStream;
 import org.onosproject.rest.AbstractWebResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.Consumes;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.Consumes;
 import javax.ws.rs.core.Response;
+import java.io.InputStream;
+import java.nio.CharBuffer;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
 
 @Path("/")
 public class FpcWebResource extends AbstractWebResource {
-    private static final Logger log =
-            LoggerFactory.getLogger(FpcWebResource.class);
+    private static final Logger log = LoggerFactory.getLogger(FpcWebResource.class);
 
     @POST
     @Consumes(MediaType.APPLICATION_JSON)
     @Path("/response")
-    public Response getResponse() {
-        return Response.ok().build();
+    public Response getResponse(InputStream stream) {
+        JsonNode jsonNode = mapper().valueToTree(stream);
+        JsonNode clientUri = jsonNode.get("client-uri");
+
+        log.info("Received {}", clientUri);
+        if (!clientUri.isMissingNode()) {
+            String uri = clientUri.asText();
+            Map<String, CloseableHttpAsyncClient> clients = SseManager.getInstance().clientUriToHttpClient;
+            if (!clients.containsKey(uri)) {
+                Executors.newSingleThreadExecutor().submit(() -> {
+                    try {
+                        CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();
+                        clients.put(uri, client);
+                        HttpAsyncRequestProducer get = HttpAsyncMethods.createGet(uri);
+                        client.execute(get, new MyResponseConsumer(uri), null);
+                    } catch (Exception e) {
+                        log.error(ExceptionUtils.getFullStackTrace(e));
+                    }
+                });
+            }
+        }
+
+        return Response.ok()
+                .header("Content-Type", "text/event-stream")
+                .header("Cache-Control", "no-cache, no-store")
+                .header("Connection", "keep-alive")
+                .build();
     }
 
     @POST
     @Consumes(MediaType.APPLICATION_JSON)
     @Path("/request")
-    public Response getRequest() {
-        return Response.ok().build();
+    public Response getRequest(InputStream stream) {
+        JsonNode jsonNode = mapper().valueToTree(stream);
+
+        return Response.ok()
+                .header("Content-Type", "text/event-stream")
+                .header("Cache-Control", "no-cache, no-store")
+                .header("Connection", "keep-alive")
+                .build();
     }
 
     @POST
     @Consumes(MediaType.APPLICATION_JSON)
     @Path("/notification")
-    public Response getNotification() {
-        return Response.ok().build();
+    public Response getNotification(InputStream stream) {
+        JsonNode jsonNode = mapper().valueToTree(stream);
+        JsonNode clientId = jsonNode.get("client-id");
+
+        log.info("Received {}", clientId);
+        if (!clientId.isMissingNode()) {
+            String uri = clientId.asText();
+            Map<String, CloseableHttpAsyncClient> clients = SseManager.getInstance().clientUriToHttpClient;
+            if (!clients.containsKey(uri)) {
+                Executors.newSingleThreadExecutor().submit(() -> {
+                    try {
+                        CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();
+                        clients.put(uri, client);
+                        HttpAsyncRequestProducer get = HttpAsyncMethods.createGet(uri);
+                        client.execute(get, new MyResponseConsumer(uri), null);
+                    } catch (Exception e) {
+                        log.error(ExceptionUtils.getFullStackTrace(e));
+                    }
+                });
+            }
+        }
+
+        return Response.ok()
+                .header("Content-Type", "text/event-stream")
+                .header("Cache-Control", "no-cache, no-store")
+                .header("Connection", "keep-alive")
+                .build();
+    }
+
+    /**
+     * A character consumer to read incoming characters on the request stream
+     */
+    static class MyResponseConsumer extends AsyncCharConsumer<Boolean> {
+        private String clientUri;
+
+        /**
+         * Constructor
+         *
+         * @param clientUri - URI of the FPC Client
+         */
+        public MyResponseConsumer(String clientUri) {
+            this.clientUri = clientUri;
+        }
+
+        @Override
+        protected void onResponseReceived(final HttpResponse response) {
+        }
+
+        @Override
+        protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) {
+            char[] charArray = new char[buf.remaining()];
+            System.arraycopy(buf.array(), 0, charArray, 0, buf.remaining());
+            CharBuffer charBuffer = CharBuffer.wrap(charArray);
+            ParseStream.getInstance().send(new AbstractMap.SimpleEntry<>(clientUri, charBuffer));
+        }
+
+        @Override
+        protected void releaseResources() {
+        }
+
+        @Override
+        protected Boolean buildResult(final HttpContext context) {
+            return Boolean.TRUE;
+        }
+
     }
 }
 
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/ConfigHelper.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/ConfigHelper.java
index 20868af..5b1c8e6 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/ConfigHelper.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/ConfigHelper.java
@@ -17,7 +17,7 @@
         "mobilityupdate-ms",
         "activation-threads",
         "target-read-limit",
-        "http-notifier-clients",
+        "http-notifier-clientUriToHttpClient",
         "zmq-nbi-server-poolsize",
         "zmq-nbi-server-uri",
         "zmq-nbi-inproc-uri",
@@ -51,7 +51,7 @@
     private Integer activationThreads;
     @JsonProperty("target-read-limit")
     private Integer targetReadLimit;
-    @JsonProperty("http-notifier-clients")
+    @JsonProperty("http-notifier-clientUriToHttpClient")
     private Integer httpNotifierClients;
     @JsonProperty("zmq-nbi-server-poolsize")
     private Integer zmqNbiServerPoolsize;
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
new file mode 100644
index 0000000..467575f
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.onosproject.restconf.api.RestconfRpcOutput;
+import org.onosproject.restconf.api.RestconfService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
+
+/**
+ * Implements a Worker to process event-data pairs sent by the FPC Client
+ */
+public class NBEventWorkerManager implements AutoCloseable {
+    private static final Logger log = LoggerFactory.getLogger(NBEventWorkerManager.class);
+    private static NBEventWorkerManager _instance = null;
+
+    public static Map<Integer, String> clientIdToUri = new ConcurrentHashMap<Integer, String>();
+
+    private final BlockingQueue<Entry<String, Entry<String, String>>> blockingQueue;
+    private final int poolSize;
+    private boolean run;
+    private final RestconfService restconfService;
+
+    protected NBEventWorkerManager(int poolSize, RestconfService restconfService) {
+        this.poolSize = poolSize;
+        this.restconfService = restconfService;
+        this.blockingQueue = new LinkedBlockingQueue<>();
+    }
+
+    public static NBEventWorkerManager createInstance(int poolSize, RestconfService restconfService) {
+        if (_instance == null) {
+            _instance = new NBEventWorkerManager(poolSize, restconfService);
+        }
+        return _instance;
+    }
+
+    public static NBEventWorkerManager getInstance() {
+        return _instance;
+    }
+
+    public void send(Entry<String, Entry<String, String>> buf) {
+        try {
+            blockingQueue.put(buf);
+        } catch (InterruptedException e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
+
+    public void open() {
+        Executors.newFixedThreadPool(this.poolSize).submit(() -> {
+            this.run = true;
+            Entry<String, Entry<String, String>> contents = null;
+            while (run) {
+                try {
+                    contents = blockingQueue.take();
+
+                    // call RESTCONF service
+                    ObjectMapper mapper = new ObjectMapper();
+                    ObjectNode node = (ObjectNode) mapper.readTree(contents.getValue().getValue());
+
+                    log.info("Handling {}", node);
+
+                    if (contents.getValue().getKey().contains("ietf-dmm-fpcagent:configure")) {
+                        CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+                                new URI("/onos/restconf/operations/ietf-dmm-fpcagent:configure"),
+                                node,
+                                contents.getKey()
+                        );
+                        SseManager.getInstance().configureQueue.add(
+                                new AbstractMap.SimpleEntry(
+                                        contents.getKey(),
+                                        "event:application/json;/restconf/operations/ietf-dmm-fpcagent:configure\ndata:" + output.get().output().toString() + "\r\n"
+                                )
+                        );
+                    } else if (contents.getValue().getKey().contains("fpc:register-client")) {
+                        CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+                                new URI("/onos/restconf/operations/fpc:register-client"),
+                                node,
+                                contents.getKey()
+                        );
+                        addClientIdToUriMapping(output.get().output().toString(), contents.getKey());
+                        SseManager.getInstance().registerClientQueue.add(
+                                new AbstractMap.SimpleEntry(
+                                        contents.getKey(),
+                                        "event:application/json;/restconf/operations/fpc:register_client\ndata:" + output.get().output().toString() + "\r\n"
+                                )
+                        );
+                    } else if (contents.getValue().getKey().contains("fpc:deregister-client")) {
+                        CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+                                new URI("/onos/restconf/operations/fpc:deregister-client"),
+                                node,
+                                contents.getKey()
+                        );
+                        removeClientIdToUriMapping(contents.getValue().getValue());
+                    }
+
+                } catch (Exception e) {
+                    log.error(ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        });
+    }
+
+    /**
+     * Add a FPC client URI to FPC client id mapping
+     *
+     * @param registerClientOutput - Output of the register_cleint rpc
+     * @param uri                  - FPC Client URI
+     */
+    private void addClientIdToUriMapping(String registerClientOutput, String uri) {
+        try {
+            JSONObject registerJSON = new JSONObject(registerClientOutput);
+            clientIdToUri.put(Integer.parseInt(registerJSON.getJSONObject("output").getString("client-id")), uri);
+        } catch (JSONException e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
+
+    /**
+     * Remove a FPC client URI to FPCClient Id mapping
+     *
+     * @param deregisterClientInput - Input String of deregister_client rpc
+     */
+    private void removeClientIdToUriMapping(String deregisterClientInput) {
+        try {
+            JSONObject deregisterJSON = new JSONObject(deregisterClientInput);
+            clientIdToUri.remove(Integer.parseInt(deregisterJSON.getJSONObject("input").getString("client-id")));
+        } catch (JSONException e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
+
+    @Override
+    public void close() {
+        this.run = false;
+    }
+}
+
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
new file mode 100644
index 0000000..7e94ad4
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.CharBuffer;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A class that parses the request stream and enqueues creates event-data pairs for processing
+ */
+public class ParseStream implements AutoCloseable {
+    private static final Logger log = LoggerFactory.getLogger(ParseStream.class);
+    private BlockingQueue<Map.Entry<String, CharBuffer>> blockingQueue;
+    private CharBuffer charBuf;
+    private String partialChunk; //Chunked data left over at the end of the buffer
+    private boolean chunkedBuffer;
+    private boolean run;
+
+    private static ParseStream _instance;
+
+    private ParseStream() {
+        blockingQueue = new LinkedBlockingQueue<>();
+    }
+
+    public static ParseStream createInstance() {
+        if (_instance == null) {
+            _instance = new ParseStream();
+        }
+        return _instance;
+    }
+
+    public static ParseStream getInstance() {
+        return _instance;
+    }
+
+    public void send(Map.Entry<String, CharBuffer> buf) {
+        try {
+            blockingQueue.put(buf);
+        } catch (InterruptedException e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+    }
+
+    private String getLine() {
+        StringBuilder s = new StringBuilder();
+        if (this.chunkedBuffer) {
+            while (this.charBuf.hasRemaining()) {
+                char ch = this.charBuf.get();
+                if (ch == '\n') {
+                    this.partialChunk += s.toString();
+                    this.chunkedBuffer = false;
+                    return this.partialChunk;
+                }
+                if (ch == '\r') {
+                    continue;
+                }
+                s.append(ch);
+            }
+        }
+        while (this.charBuf.hasRemaining()) {
+            char ch = this.charBuf.get();
+            if (ch == '\n') {
+                String line = s.toString();
+                return line.length() > 0 ? line : null;
+            }
+            if (ch == '\r') {
+                continue;
+            }
+            s.append(ch);
+            if (!this.charBuf.hasRemaining()) {
+                this.partialChunk = s.toString();
+                this.chunkedBuffer = true;
+            }
+        }
+        return null;
+    }
+
+    public void open() {
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        executorService.submit(() -> {
+            run = true;
+            chunkedBuffer = false;
+            String event = null, data = null;
+            while (run) {
+                try {
+                    Map.Entry<String, CharBuffer> entry = blockingQueue.take();
+                    this.charBuf = entry.getValue();
+                    while (this.charBuf.hasRemaining()) {
+                        String line = getLine();
+                        if (line != null) {
+                            if (line.startsWith("event")) {
+                                event = line.split(":", 2)[1];
+
+                            } else if (line.startsWith("data")) {
+                                data = line.split(":", 2)[1];
+                                if (event != null && data != null) {
+                                    NBEventWorkerManager.getInstance()
+                                            .send(
+                                                    new AbstractMap.SimpleEntry<>(
+                                                            entry.getKey(),
+                                                            new AbstractMap.SimpleEntry<>(event, data)
+                                                    )
+                                            );
+                                }
+                                event = null;
+                                data = null;
+                            }
+                        }
+
+                    }
+                } catch (InterruptedException e) {
+                    log.error(ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        });
+    }
+
+    @Override
+    public void close() {
+        run = false;
+    }
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/SseManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/SseManager.java
new file mode 100644
index 0000000..aa2dc53
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/SseManager.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.onosproject.restconf.utils.RestconfUtils;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
+import org.onosproject.yang.model.DefaultModelObjectData;
+import org.onosproject.yang.model.ResourceData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.onosproject.fpcagent.util.FpcUtil.modelConverter;
+import static org.onosproject.fpcagent.util.FpcUtil.module;
+
+
+/**
+ * A servlet context listener that sends response (ConfigureOutput) events to clientUriToHttpClient.
+ */
+public class SseManager implements AutoCloseable {
+    private static final Logger log = LoggerFactory.getLogger(SseManager.class);
+
+    //key = client-id, value = configure-output
+    public BlockingQueue<Map.Entry<String, DefaultYangAutoPrefixNotify>> configureQueue;
+    public BlockingQueue<Map.Entry<String, String>> registerClientQueue;
+    public BlockingQueue<Map.Entry<String, String>> notificationQueue;
+
+    public Map<String, CloseableHttpAsyncClient> clientUriToHttpClient;
+    public Map<String, CloseableHttpAsyncClient> clientIdToHttpClient;
+    private static SseManager _instance;
+
+    private boolean run;
+
+    private SseManager() {
+        configureQueue = new LinkedBlockingQueue<>();
+        registerClientQueue = new LinkedBlockingQueue<>();
+        notificationQueue = new LinkedBlockingQueue<>();
+        clientUriToHttpClient = Maps.newConcurrentMap();
+        clientIdToHttpClient = Maps.newConcurrentMap();
+    }
+
+    public static SseManager createInstance() {
+        if (_instance == null) {
+            _instance = new SseManager();
+        }
+        return _instance;
+    }
+
+    public static SseManager getInstance() {
+        return _instance;
+    }
+
+    public void open() {
+        run = true;
+        Executors.newSingleThreadExecutor().submit(() -> {
+            String clientUri = null;
+            while (run) {
+                try {
+                    Map.Entry<String, DefaultYangAutoPrefixNotify> entry = configureQueue.take();
+                    clientUri = entry.getKey();
+                    DefaultYangAutoPrefixNotify notify = entry.getValue();
+
+                    ResourceData dataNode = modelConverter.createDataNode(
+                            DefaultModelObjectData.builder()
+                                    .addModelObject(notify)
+                                    .build()
+                    );
+                    ObjectNode jsonNodes = RestconfUtils.convertDataNodeToJson(module, dataNode.dataNodes().get(0));
+                    ObjectMapper mapper = new ObjectMapper();
+                    String value = mapper.writeValueAsString(jsonNodes);
+
+                    if (clientUriToHttpClient.containsKey(clientUri)) {
+                        CloseableHttpAsyncClient client = clientUriToHttpClient.get(clientUri);
+                        HttpAsyncRequestProducer get = HttpAsyncMethods
+                                .createPost(clientUri, value.getBytes(), ContentType.TEXT_PLAIN);
+
+                        client.execute(get, null, null);
+                    }
+                } catch (Exception e) {
+                    log.error(ExceptionUtils.getFullStackTrace(e));
+                    clientUriToHttpClient.remove(clientUri);
+                    break;
+                }
+            }
+        });
+
+        Executors.newSingleThreadExecutor().submit(() -> {
+            String clientUri = null;
+            while (run) {
+                try {
+                    Map.Entry<String, String> entry = registerClientQueue.take();
+                    clientUri = entry.getKey();
+
+                    if (clientUriToHttpClient.containsKey(clientUri)) {
+                        CloseableHttpAsyncClient client = clientUriToHttpClient.get(clientUri);
+                        HttpAsyncRequestProducer get = HttpAsyncMethods
+                                .createPost(clientUri, entry.getValue().getBytes(), ContentType.TEXT_PLAIN);
+                        client.execute(get, null, null);
+                    }
+                } catch (Exception e) {
+                    log.error(ExceptionUtils.getFullStackTrace(e));
+                    clientUriToHttpClient.remove(clientUri);
+                    break;
+                }
+            }
+
+        });
+
+        Executors.newSingleThreadExecutor().submit(() -> {
+            String clientId = null;
+            while (run) {
+                try {
+                    Map.Entry<String, String> entry = notificationQueue.take();
+                    clientId = entry.getKey();
+
+                    if (clientIdToHttpClient.containsKey(clientId)) {
+                        CloseableHttpAsyncClient client = clientIdToHttpClient.get(clientId);
+                        HttpAsyncRequestProducer get = HttpAsyncMethods
+                                .createPost(clientId, entry.getValue().getBytes(), ContentType.TEXT_PLAIN);
+                        client.execute(get, null, null);
+                    }
+                } catch (Exception e) {
+                    log.error(ExceptionUtils.getFullStackTrace(e));
+                    clientIdToHttpClient.remove(clientId);
+                    break;
+                }
+            }
+
+        });
+    }
+
+    @Override
+    public void close() {
+        run = false;
+    }
+}
\ No newline at end of file