event stream implementation
diff --git a/apps/fpcagent/BUCK b/apps/fpcagent/BUCK
index 9d19b41..89b33cb 100644
--- a/apps/fpcagent/BUCK
+++ b/apps/fpcagent/BUCK
@@ -4,6 +4,7 @@
'//lib:KRYO',
'//lib:onos-yang-model',
'//lib:httpclient-osgi',
+ '//lib:org.apache.httpcomponents.httpasyncclient-osgi',
'//lib:httpcore-osgi',
'//lib:netty',
'//lib:javax.ws.rs-api',
@@ -13,7 +14,13 @@
'//apps/config:onos-apps-config',
'//models/fpcagent:onos-models-fpcagent',
'//models/common:onos-models-common',
+ '//lib:jetty-http',
+ ':jetty-servlet',
+ ':jetty-annotations',
+ ':jetty-server-v9.4.6',
+ '//lib:jetty-util',
':zeromq',
+ '//lib:javax.servlet-api',
':json',
]
@@ -32,6 +39,9 @@
EXCLUDED_BUNDLES = [
':zeromq',
':json',
+ ':jetty-servlet',
+ ':jetty-annotations',
+ ':jetty-server-v9.4.6',
]
APPS = [
@@ -46,7 +56,7 @@
osgi_jar_with_tests(
deps = COMPILE_DEPS,
test_deps = TEST_DEPS,
- web_context = '/onos/fpcagent',
+ web_context = '/tmp',
api_title = 'FPC REST API',
api_package = 'org.onosproject.fpcagent.rest',
api_version = '1.0',
@@ -81,3 +91,30 @@
maven_coords = 'org.zeromq:jeromq:0.3.5',
visibility = [ 'PUBLIC' ],
)
+
+remote_jar (
+ name = 'jetty-annotations',
+ out = 'jetty-annotations-8.1.19.v20160209.jar',
+ url = 'mvn:org.eclipse.jetty:jetty-annotations:jar:8.1.19.v20160209',
+ sha1 = '649592e66a01b01956c19b80d87c03c8052413ee',
+ maven_coords = 'org.eclipse.jetty:jetty-annotations:8.1.19.v20160209',
+ visibility = [ 'PUBLIC' ],
+)
+
+remote_jar (
+ name = 'jetty-servlet',
+ out = 'jetty-servlet-8.1.19.v20160209.jar',
+ url = 'mvn:org.eclipse.jetty:jetty-servlet:jar:8.1.19.v20160209',
+ sha1 = '6872c3fc289de8f26a43b101741b33af36590cb4',
+ maven_coords = 'org.eclipse.jetty:jetty-servlet:8.1.19.v20160209',
+ visibility = [ 'PUBLIC' ],
+)
+
+remote_jar (
+ name = 'jetty-server-v9.4.6',
+ out = 'jetty-server-9.4.6.v20170531.jar',
+ url = 'mvn:org.eclipse.jetty:jetty-server:jar:9.4.6.v20170531',
+ sha1 = 'afda653f00267fb8b501cafd1cf5cdd1615602a2',
+ maven_coords = 'org.eclipse.jetty:jetty-server:9.4.6.v20170531',
+ visibility = [ 'PUBLIC' ],
+)
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
index d7d88e6..6478827 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
@@ -27,6 +27,9 @@
import org.onosproject.fpcagent.providers.DpnDeviceListener;
import org.onosproject.fpcagent.providers.DpnProviderService;
import org.onosproject.fpcagent.util.ConfigHelper;
+import org.onosproject.fpcagent.util.eventStream.JettyServer;
+import org.onosproject.fpcagent.util.eventStream.NBEventWorkerManager;
+import org.onosproject.fpcagent.util.eventStream.ParseStream;
import org.onosproject.fpcagent.workers.HTTPNotifier;
import org.onosproject.fpcagent.workers.ZMQSBPublisherManager;
import org.onosproject.fpcagent.workers.ZMQSBSubscriberManager;
@@ -88,6 +91,8 @@
private FpcConfig fpcConfig;
private boolean started = false;
private HashSet<DpnDeviceListener> listeners = Sets.newHashSet();
+ private Thread jettyServer;
+ private Thread parseStream;
/* Config */
private ConfigFactory<ApplicationId, FpcConfig> fpcConfigConfigFactory =
@@ -108,7 +113,14 @@
notificationIds = coreService.getIdGenerator("fpc-notification-ids");
- HTTPNotifier.createInstance(modelConverter).open();
+ jettyServer = new Thread(JettyServer::init);
+ parseStream = new Thread(new ParseStream());
+
+ jettyServer.start();
+ parseStream.start();
+
+ NBEventWorkerManager.createInstance(20).open();
+ HTTPNotifier.createInstance().open();
log.info("FPC Service Started");
}
@@ -125,8 +137,12 @@
started = false;
}
+ NBEventWorkerManager.getInstance().close();
HTTPNotifier.getInstance().close();
+ jettyServer.stop();
+ parseStream.stop();
+
log.info("FPC Service Stopped");
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcService.java
index d0b1217..4a29287 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcService.java
@@ -8,13 +8,14 @@
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configuredpn.DefaultConfigureDpnOutput;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.opinput.opbody.CreateOrUpdate;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.opinput.opbody.DeleteOrQuery;
+import org.onosproject.yang.model.RpcInput;
+import org.onosproject.yang.model.RpcOutput;
/**
* Main service that handles RPC events and DC Store modifications.
*/
@Beta
public interface FpcRpcService {
-
/**
* Handles create Configure operations that are invoked through RPC.
*
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java
index 16b891e..2e336fe 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java
@@ -16,39 +16,87 @@
package org.onosproject.fpcagent.rest;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.http.HttpVersion;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
import org.onosproject.rest.AbstractWebResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.Consumes;
import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
@Path("/")
public class FpcWebResource extends AbstractWebResource {
private static final Logger log =
LoggerFactory.getLogger(FpcWebResource.class);
+ private ResponseHandler<String> handler = new BasicResponseHandler();
+
@POST
@Consumes(MediaType.APPLICATION_JSON)
- @Path("/response")
- public Response getResponse() {
+ @Path("/request")
+ public Response getRequest(InputStream stream) {
+ try {
+ ObjectNode node = (ObjectNode) mapper().readTree(stream);
+
+ log.info("Received {}", node.toString());
+
+ CloseableHttpClient client = HttpClients.createDefault();
+ HttpGet httpGet = new HttpGet(node.get("client-uri").asText());
+
+ httpGet.addHeader("User-Agent", "ONOS FPC Agent");
+ httpGet.addHeader("Charset", "utf-8");
+ httpGet.addHeader("Content-type", "text/event-stream");
+ httpGet.setProtocolVersion(HttpVersion.HTTP_1_1);
+
+ CloseableHttpResponse httpResponse = client.execute(httpGet);
+
+ String response = handler.handleResponse(httpResponse);
+ log.info("Response {}", response);
+ } catch (IOException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+
return Response.ok().build();
}
@POST
@Consumes(MediaType.APPLICATION_JSON)
- @Path("/request")
- public Response getRequest() {
+ @Path("/response")
+ public Response getResponse(InputStream stream) {
+ try {
+ ObjectNode node = (ObjectNode) mapper().readTree(stream);
+
+ log.info("Received {}", node.toString());
+ } catch (IOException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
return Response.ok().build();
}
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/notification")
- public Response getNotification() {
+ public Response getNotification(InputStream stream) {
+ try {
+ ObjectNode node = (ObjectNode) mapper().readTree(stream);
+
+ log.info("Received {}", node.toString());
+ } catch (IOException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
return Response.ok().build();
}
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java
new file mode 100644
index 0000000..1c90115
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.onosproject.restconf.utils.RestconfUtils;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
+import org.onosproject.yang.model.DefaultModelObjectData;
+import org.onosproject.yang.model.ResourceData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
+import javax.servlet.ServletOutputStream;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.onosproject.fpcagent.util.FpcUtil.modelConverter;
+import static org.onosproject.fpcagent.util.FpcUtil.module;
+
+
+/**
+ * A servlet context listener that sends response (ConfigureOutput) events to clients.
+ */
+public class ConfigureService implements ServletContextListener {
+
+ private static final Logger log = LoggerFactory.getLogger(ConfigureService.class);
+ //key = client-id, value = configure-output
+ public static BlockingQueue<Map.Entry<String, DefaultYangAutoPrefixNotify>> blockingQueue = new LinkedBlockingQueue<>();
+ public static BlockingQueue<Map.Entry<String, String>> registerClientQueue = new LinkedBlockingQueue<>();
+
+ public void contextInitialized(ServletContextEvent sce) {
+ Map<String, AsyncContext> responseStreams = new ConcurrentHashMap<>();
+ sce.getServletContext().setAttribute("responseStreams", responseStreams);
+ Thread thread = new Thread(() -> {
+ String clientUri = null;
+ AsyncContext asyncContext = null;
+ while (true) {
+ try {
+ Map.Entry<String, DefaultYangAutoPrefixNotify> entry = blockingQueue.take();
+ clientUri = entry.getKey();
+ DefaultYangAutoPrefixNotify notify = entry.getValue();
+
+ ResourceData dataNode = modelConverter.createDataNode(
+ DefaultModelObjectData.builder()
+ .addModelObject(notify)
+ .build()
+ );
+ ObjectNode jsonNodes = RestconfUtils.convertDataNodeToJson(module, dataNode.dataNodes().get(0));
+ ObjectMapper mapper = new ObjectMapper();
+ String value = mapper.writeValueAsString(jsonNodes);
+
+ asyncContext = responseStreams.get(clientUri);
+ if (responseStreams.get(clientUri) != null) {
+ ServletOutputStream out = asyncContext.getResponse().getOutputStream();
+ out.write(value.getBytes());
+ out.flush();
+ asyncContext.getResponse().flushBuffer();
+ }
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ asyncContext.complete();
+ responseStreams.remove(clientUri);
+ break;
+ }
+ }
+ });
+ thread.start();
+
+ Thread thread1 = new Thread(() -> {
+ String clientUri = null;
+ AsyncContext asyncContext = null;
+ while (true) {
+ try {
+ Map.Entry<String, String> entry = registerClientQueue.take();
+ clientUri = entry.getKey();
+ asyncContext = responseStreams.get(clientUri);
+ if (responseStreams.get(entry.getKey()) != null) {
+ ServletOutputStream out = asyncContext.getResponse().getOutputStream();
+ out.write(entry.getValue().getBytes());
+ out.flush();
+ asyncContext.getResponse().flushBuffer();
+ }
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ asyncContext.complete();
+ responseStreams.remove(clientUri);
+ break;
+ }
+ }
+ });
+ thread1.start();
+
+ }
+
+ public void contextDestroyed(ServletContextEvent sce) {
+ }
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
new file mode 100644
index 0000000..ec480cb
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.CharBuffer;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+
+/**
+ * A HTTP client that sends a request to a FPC Client to initiate the request stream.
+ */
+public class EventClient {
+ private static final Logger log = LoggerFactory.getLogger(EventClient.class);
+
+ private static final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();
+ private static ArrayList<String> clientUriList = new ArrayList<String>();
+ protected String clientUri;
+
+ /**
+ * Send HttpRequest to Client
+ *
+ * @param uri - FPC Client Uri
+ */
+ public void connectToClient(String uri) {
+ this.clientUri = uri;
+ try {
+ client.start();
+ HttpAsyncRequestProducer get = HttpAsyncMethods.createGet(this.clientUri);
+ client.execute(get, new MyResponseConsumer(this.clientUri), null);
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ /**
+ * A character consumer to read incoming characters on the request stream
+ */
+ static class MyResponseConsumer extends AsyncCharConsumer<Boolean> {
+ private String clientUri;
+
+ /**
+ * Constructor
+ *
+ * @param clientUri - URI of the FPC Client
+ */
+ public MyResponseConsumer(String clientUri) {
+ this.clientUri = clientUri;
+ }
+
+ @Override
+ protected void onResponseReceived(final HttpResponse response) {
+ }
+
+ @Override
+ protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) {
+ try {
+ char[] charArray = new char[buf.remaining()];
+ System.arraycopy(buf.array(), 0, charArray, 0, buf.remaining());
+ CharBuffer charBuffer = CharBuffer.wrap(charArray);
+ ParseStream.blockingQueue.put(new AbstractMap.SimpleEntry<>(clientUri, charBuffer));
+ } catch (InterruptedException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ @Override
+ protected void releaseResources() {
+ }
+
+ @Override
+ protected Boolean buildResult(final HttpContext context) {
+ return Boolean.TRUE;
+ }
+
+ }
+}
+
+
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventServer.java
new file mode 100644
index 0000000..3313188
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventServer.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A Http Servlet to server response streams
+ */
+public class EventServer extends HttpServlet {
+ private static final Logger log = LoggerFactory.getLogger(EventServer.class);
+ private static ArrayList<String> clientUriList = new ArrayList<String>();
+
+ /**
+ * Method for stream initialization
+ *
+ * @param clientUri - Client Uri
+ * @param request - The servlet request object
+ * @param response - The servlet Response Object
+ */
+ private void init(String clientUri, HttpServletRequest request, HttpServletResponse response) {
+ log.info("Response Stream Inititated");
+ try {
+ HttpSession session = request.getSession();
+ session.setMaxInactiveInterval(72 * 60 * 60);
+ EventClient client = new EventClient();
+ client.connectToClient(clientUri);
+ response.setHeader("Content-Type", "text/event-stream");
+ response.setHeader("Cache-Control", "no-cache, no-store");
+ response.setHeader("Connection", "keep-alive");
+ AsyncContext asyncContext = request.startAsync(request, response);
+ asyncContext.setTimeout(72 * 60 * 60 * 1000);
+ asyncContext.getResponse().setBufferSize(1200);
+ try {
+ asyncContext.getResponse().flushBuffer();
+ } catch (IOException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ ServletContext servletContext = request.getServletContext();
+ Map<String, AsyncContext> responseStreams = (ConcurrentHashMap<String, AsyncContext>) servletContext.getAttribute("responseStreams");
+ responseStreams.put(clientUri, asyncContext);
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ protected void doPost(HttpServletRequest request, HttpServletResponse response) {
+ try {
+ String clientUri = null;
+ StringBuffer jsonStringBuilder = new StringBuffer();
+ String line = null;
+ try {
+ BufferedReader br = request.getReader();
+ while ((line = br.readLine()) != null)
+ jsonStringBuilder.append(line);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ try {
+ if (jsonStringBuilder.length() > 0) {
+ JSONObject jsonObj = new JSONObject(jsonStringBuilder.toString());
+ clientUri = jsonObj.getString("client-uri");
+ if (!clientUriList.contains(clientUri)) {
+ init(clientUri, request, response);
+ }
+ jsonStringBuilder.setLength(0);
+ }
+ } catch (JSONException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+
+ }
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
new file mode 100644
index 0000000..deba165
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SSE Server implementation
+ */
+public class JettyServer {
+ private static final Logger log = LoggerFactory.getLogger(JettyServer.class);
+
+ /**
+ * Method used to initialize and start the SSE server
+ */
+ public static void init() {
+ Server server = new Server(8070);
+ ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ context.setContextPath("/");
+
+ ServletHolder requestServletHolder = new ServletHolder(EventServer.class);
+ context.addServlet(requestServletHolder, "/response");
+
+ ServletHolder notificationServletHolder = new ServletHolder(NotificationServer.class);
+ context.addServlet(notificationServletHolder, "/notification");
+
+ server.setHandler(context);
+ context.addEventListener(new ConfigureService());
+ context.addEventListener(new NotificationService());
+
+ try {
+ server.start();
+ log.info("Server Started");
+ server.join();
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
new file mode 100644
index 0000000..2a7bd54
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.onosproject.restconf.api.RestconfRpcOutput;
+import org.onosproject.restconf.api.RestconfService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Implements a Worker to process event-data pairs sent by the FPC Client
+ */
+public class NBEventWorkerManager implements AutoCloseable {
+ private static final Logger log = LoggerFactory.getLogger(NBEventWorkerManager.class);
+ private static NBEventWorkerManager _instance = null;
+
+ public static Map<Integer, String> clientIdToUri = new ConcurrentHashMap<Integer, String>();
+
+ private final BlockingQueue<Entry<String, Entry<String, String>>> blockingQueue;
+ private final int poolSize;
+ private boolean run;
+ private final RestconfService restconfService;
+
+ protected NBEventWorkerManager(int poolSize, RestconfService restconfService) {
+ this.poolSize = poolSize;
+ this.restconfService = restconfService;
+ this.blockingQueue = new LinkedBlockingQueue<>();
+ }
+
+ public static NBEventWorkerManager createInstance(int poolSize, RestconfService restconfService) {
+ if (_instance == null) {
+ _instance = new NBEventWorkerManager(poolSize, restconfService);
+ }
+ return _instance;
+ }
+
+ public static NBEventWorkerManager getInstance() {
+ return _instance;
+ }
+
+ public void send(Entry<String, Entry<String, String>> buf) {
+ try {
+ blockingQueue.put(buf);
+ } catch (InterruptedException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ public void open() {
+ this.run = true;
+ Entry<String, Entry<String, String>> contents = null;
+ while (run) {
+ try {
+ contents = blockingQueue.take();
+
+ // call RESTCONF service
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode node = (ObjectNode) mapper.readTree(contents.getValue().getValue());
+ if (contents.getValue().getKey().contains("ietf-dmm-fpcagent:configure")) {
+ CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+ new URI("/onos/restconf/operations/ietf-dmm-fpcagent:configure"),
+ node,
+ contents.getKey()
+ );
+ ConfigureService.blockingQueue.add(
+ new AbstractMap.SimpleEntry(
+ contents.getKey(),
+ "event:application/json;/restconf/operations/ietf-dmm-fpcagent:configure\ndata:" + output.get().output().toString() + "\r\n"
+ )
+ );
+ } else if (contents.getValue().getKey().contains("fpc:register-client")) {
+ CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+ new URI("/onos/restconf/operations/fpc:register-client"),
+ node,
+ contents.getKey()
+ );
+ addClientIdToUriMapping(output.get().output().toString(), contents.getKey());
+ ConfigureService.registerClientQueue.add(
+ new AbstractMap.SimpleEntry(
+ contents.getKey(),
+ "event:application/json;/restconf/operations/fpc:register_client\ndata:" + output.get().output().toString() + "\r\n"
+ )
+ );
+ } else if (contents.getValue().getKey().contains("fpc:deregister-client")) {
+ CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+ new URI("/onos/restconf/operations/fpc:deregister-client"),
+ node,
+ contents.getKey()
+ );
+ removeClientIdToUriMapping(contents.getValue().getValue());
+ }
+
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+ }
+
+ /**
+ * Add a FPC client URI to FPC client id mapping
+ *
+ * @param registerClientOutput - Output of the register_cleint rpc
+ * @param uri - FPC Client URI
+ */
+ private void addClientIdToUriMapping(String registerClientOutput, String uri) {
+ try {
+ JSONObject registerJSON = new JSONObject(registerClientOutput);
+ clientIdToUri.put(Integer.parseInt(registerJSON.getJSONObject("output").getString("client-id")), uri);
+ } catch (JSONException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ /**
+ * Remove a FPC client URI to FPCClient Id mapping
+ *
+ * @param deregisterClientInput - Input String of deregister_client rpc
+ */
+ private void removeClientIdToUriMapping(String deregisterClientInput) {
+ try {
+ JSONObject deregisterJSON = new JSONObject(deregisterClientInput);
+ clientIdToUri.remove(Integer.parseInt(deregisterJSON.getJSONObject("input").getString("client-id")));
+ } catch (JSONException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ @Override
+ public void close() {
+ this.run = false;
+ }
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java
new file mode 100644
index 0000000..65150f8
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.servlet.http.HttpSession;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.opendaylight.fpc.utils.ErrorLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A HTTP servlet that handles requests for notification streams
+ */
+public class NotificationServer extends HttpServlet {
+ private static final Logger log = LoggerFactory.getLogger(EventServer.class);
+
+ protected void doPost( HttpServletRequest request, HttpServletResponse response){
+ log.info("Notification stream Inititated");
+ String clientId = null;
+ StringBuffer jsonStringBuilder = new StringBuffer();
+ String line = null;
+ try {
+ BufferedReader br = request.getReader();
+ while ((line = br.readLine()) != null)
+ jsonStringBuilder.append(line);
+ } catch (Exception e) { e.printStackTrace(); }
+
+ try {
+ if(jsonStringBuilder.length() > 0){
+ JSONObject jsonObj = new JSONObject(jsonStringBuilder.toString());
+ clientId = jsonObj.getString("client-id");
+ jsonStringBuilder.setLength(0);
+ HttpSession session = request.getSession();
+ session.setMaxInactiveInterval(72*60*60);
+ response.setHeader("Content-Type", "text/event-stream");
+ response.setHeader("Cache-Control", "no-cache, no-store");
+ response.setHeader("Connection", "keep-alive");
+ AsyncContext asyncContext = request.startAsync(request,response);
+ asyncContext.setTimeout(72*60*60*1000);
+ asyncContext.getResponse().setBufferSize(1200);
+ try {
+ asyncContext.getResponse().flushBuffer();
+ } catch (IOException e1) {
+ ErrorLog.logError(e1.getMessage(),e1.getStackTrace());
+ }
+ ServletContext servletContext = request.getServletContext();
+ Map<String,AsyncContext> notificationStreams = (ConcurrentHashMap<String,AsyncContext>) servletContext.getAttribute("notificationStreams");
+ notificationStreams.put(clientId,asyncContext);
+ log.info("Client Id received in the notification stream request: "+clientId);
+ }
+ } catch (JSONException e) {
+ ErrorLog.logError(e.getLocalizedMessage(),e.getStackTrace());
+ }
+ }
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationService.java
new file mode 100644
index 0000000..13edc1e
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationService.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
+import javax.servlet.ServletOutputStream;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A servlet context listener that sends event-data pairs for config result notifications
+ */
+public class NotificationService implements ServletContextListener {
+ private static final Logger log = LoggerFactory.getLogger(NotificationService.class);
+
+ //key = client id, value = config-result-notification
+ public static BlockingQueue<Map.Entry<String, String>> blockingQueue = new LinkedBlockingQueue<Map.Entry<String, String>>();
+
+ public void contextInitialized(ServletContextEvent sce) {
+ Map<String, AsyncContext> notificationStreams = new ConcurrentHashMap<String, AsyncContext>();
+ sce.getServletContext().setAttribute("notificationStreams", notificationStreams);
+ Thread thread = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ String clientId = null;
+ AsyncContext asyncContext = null;
+ while (true) {
+ try {
+ Map.Entry<String, String> entry = blockingQueue.take();
+ clientId = entry.getKey();
+ asyncContext = notificationStreams.get(clientId);
+ if (notificationStreams.get(entry.getKey()) != null) {
+ ServletOutputStream out = asyncContext.getResponse().getOutputStream();
+ out.write(entry.getValue().getBytes());
+ out.flush();
+ asyncContext.getResponse().flushBuffer();
+ }
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ asyncContext.complete();
+ notificationStreams.remove(clientId);
+ break;
+ }
+ }
+ }
+
+ });
+ thread.start();
+
+ }
+
+ public void contextDestroyed(ServletContextEvent sce) {
+ }
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
new file mode 100644
index 0000000..edd2415
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.CharBuffer;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A class that parses the request stream and enqueues creates event-data pairs for processing
+ */
+public class ParseStream implements Runnable {
+ private static final Logger log = LoggerFactory.getLogger(ParseStream.class);
+ public static BlockingQueue<Map.Entry<String, CharBuffer>> blockingQueue = new LinkedBlockingQueue<Map.Entry<String, CharBuffer>>();
+ private CharBuffer charBuf;
+ private String partialChunk; //Chunked data left over at the end of the buffer
+ boolean chunkedBuffer;
+
+ private String getLine() {
+ StringBuilder s = new StringBuilder();
+ if (this.chunkedBuffer) {
+ while (this.charBuf.hasRemaining()) {
+ char ch = this.charBuf.get();
+ if (ch == '\n') {
+ this.partialChunk += s.toString();
+ this.chunkedBuffer = false;
+ return this.partialChunk;
+ }
+ if (ch == '\r') {
+ continue;
+ }
+ s.append(ch);
+ }
+ }
+ while (this.charBuf.hasRemaining()) {
+ char ch = this.charBuf.get();
+ if (ch == '\n') {
+ String line = s.toString();
+ return line.length() > 0 ? line : null;
+ }
+ if (ch == '\r') {
+ continue;
+ }
+ s.append(ch);
+ if (!this.charBuf.hasRemaining()) {
+ this.partialChunk = s.toString();
+ this.chunkedBuffer = true;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void run() {
+ chunkedBuffer = false;
+ String event = null, data = null;
+ while (true) {
+ try {
+ Map.Entry<String, CharBuffer> entry = blockingQueue.take();
+ this.charBuf = entry.getValue();
+ while (this.charBuf.hasRemaining()) {
+ String line = getLine();
+ if (line != null) {
+ if (line.startsWith("event")) {
+ event = line.split(":", 2)[1];
+
+ } else if (line.startsWith("data")) {
+ data = line.split(":", 2)[1];
+ if (event != null && data != null) {
+ NBEventWorkerManager.getInstance()
+ .send(
+ new AbstractMap.SimpleEntry<>(
+ entry.getKey(),
+ new AbstractMap.SimpleEntry<>(event, data)
+ )
+ );
+ }
+ event = null;
+ data = null;
+ }
+ }
+
+ }
+ } catch (InterruptedException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+ }
+
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java
index 6f10664..d3755ff 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java
@@ -20,20 +20,19 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.http.HttpResponse;
+import org.apache.http.HttpVersion;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
-import org.apache.http.HttpVersion;
import org.json.JSONObject;
import org.onosproject.fpcagent.protocols.DpnNgicCommunicator;
import org.onosproject.restconf.utils.RestconfUtils;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
import org.onosproject.yang.model.DefaultModelObjectData;
-import org.onosproject.yang.model.ModelConverter;
import org.onosproject.yang.model.ResourceData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,25 +43,24 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import static org.onosproject.fpcagent.util.FpcUtil.modelConverter;
import static org.onosproject.fpcagent.util.FpcUtil.module;
public class HTTPNotifier implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(HTTPNotifier.class);
private static HTTPNotifier _instance = null;
private final BlockingQueue<Map.Entry<String, DefaultYangAutoPrefixNotify>> blockingQueue;
- public ModelConverter modelConverter = null;
private boolean run;
private ResponseHandler<String> handler = new BasicResponseHandler();
- protected HTTPNotifier(ModelConverter modelConverter) {
+ protected HTTPNotifier() {
this.run = true;
this.blockingQueue = new LinkedBlockingQueue<>();
- this.modelConverter = modelConverter;
}
- public static HTTPNotifier createInstance(ModelConverter modelConverter) {
+ public static HTTPNotifier createInstance() {
if (_instance == null) {
- _instance = new HTTPNotifier(modelConverter);
+ _instance = new HTTPNotifier();
}
return _instance;
}