mend
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseClient.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseClient.java
new file mode 100644
index 0000000..76762d8
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseClient.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.CharBuffer;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+
+/**
+ * A HTTP client that sends a request to a FPC Client to initiate the request stream.
+ */
+public class ResponseClient {
+ private static final Logger log = LoggerFactory.getLogger(ResponseClient.class);
+
+ private static final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();
+ private static ArrayList<String> clientUriList = new ArrayList<String>();
+ protected String clientUri;
+
+ /**
+ * Send HttpRequest to Client
+ *
+ * @param uri - FPC Client Uri
+ */
+ public void connectToClient(String uri) {
+ this.clientUri = uri;
+ log.info("here connectToClient");
+ try {
+ client.start();
+ HttpAsyncRequestProducer get = HttpAsyncMethods.createGet(this.clientUri);
+ client.execute(get, new MyResponseConsumer(this.clientUri), null);
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ /**
+ * A character consumer to read incoming characters on the request stream
+ */
+ static class MyResponseConsumer extends AsyncCharConsumer<Boolean> {
+ private String clientUri;
+
+ /**
+ * Constructor
+ *
+ * @param clientUri - URI of the FPC Client
+ */
+ public MyResponseConsumer(String clientUri) {
+ this.clientUri = clientUri;
+ }
+
+ @Override
+ protected void onResponseReceived(final HttpResponse response) {
+ }
+
+ @Override
+ protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) {
+ char[] charArray = new char[buf.remaining()];
+ System.arraycopy(buf.array(), 0, charArray, 0, buf.remaining());
+ CharBuffer charBuffer = CharBuffer.wrap(charArray);
+ ParseStream.getInstance().send(new AbstractMap.SimpleEntry<>(clientUri, charBuffer));
+ }
+
+ @Override
+ protected void releaseResources() {
+ }
+
+ @Override
+ protected Boolean buildResult(final HttpContext context) {
+ return Boolean.TRUE;
+ }
+
+ }
+}
+
+
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java
new file mode 100644
index 0000000..3594d22
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A Http Servlet to server response streams
+ */
+public class ResponseServer extends DefaultServlet {
+ private static final Logger log = LoggerFactory.getLogger(ResponseServer.class);
+ private static ArrayList<String> clientUriList = new ArrayList<>();
+
+ public ResponseServer() {
+ super();
+ log.info("Response Server Initialized");
+ }
+
+ /**
+ * Method for stream initialization
+ *
+ * @param clientUri - Client Uri
+ * @param request - The servlet request object
+ * @param response - The servlet Response Object
+ */
+ private void init(String clientUri, HttpServletRequest request, HttpServletResponse response) {
+ log.info("Response Stream Inititated");
+ try {
+ HttpSession session = request.getSession();
+ session.setMaxInactiveInterval(72 * 60 * 60);
+ ResponseClient client = new ResponseClient();
+ client.connectToClient(clientUri);
+ response.setHeader("Content-Type", "text/event-stream");
+ response.setHeader("Cache-Control", "no-cache, no-store");
+ response.setHeader("Connection", "keep-alive");
+ AsyncContext asyncContext = request.startAsync(request, response);
+ asyncContext.setTimeout(72 * 60 * 60 * 1000);
+ asyncContext.getResponse().setBufferSize(1200);
+ try {
+ asyncContext.getResponse().flushBuffer();
+ } catch (IOException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ ServletContext servletContext = request.getServletContext();
+ Map<String, AsyncContext> responseStreams = (ConcurrentHashMap<String, AsyncContext>) servletContext.getAttribute("responseStreams");
+ responseStreams.put(clientUri, asyncContext);
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest request, HttpServletResponse response) {
+ try {
+ response.setContentType("text/html");
+ response.setStatus(HttpServletResponse.SC_OK);
+ response.getWriter().println("<h1>Hello World</h1>");
+ response.getWriter().println("session=" + request.getSession(true).getId());
+ } catch (IOException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ @Override
+ protected void doPost(HttpServletRequest request, HttpServletResponse response) {
+ log.info("here doPost");
+ try {
+ String clientUri = null;
+ StringBuffer jsonStringBuilder = new StringBuffer();
+ String line = null;
+ try {
+ BufferedReader br = request.getReader();
+ while ((line = br.readLine()) != null) {
+ jsonStringBuilder.append(line);
+ }
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+
+ try {
+ if (jsonStringBuilder.length() > 0) {
+ JSONObject jsonObj = new JSONObject(jsonStringBuilder.toString());
+ clientUri = jsonObj.getString("client-uri");
+ if (!clientUriList.contains(clientUri)) {
+ init(clientUri, request, response);
+ }
+ jsonStringBuilder.setLength(0);
+ }
+ } catch (JSONException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+
+ }
+}