RESTful SSE
diff --git a/apps/fpcagent/BUCK b/apps/fpcagent/BUCK
index 9d19b41..2214f2c 100644
--- a/apps/fpcagent/BUCK
+++ b/apps/fpcagent/BUCK
@@ -7,10 +7,12 @@
'//lib:httpcore-osgi',
'//lib:netty',
'//lib:javax.ws.rs-api',
+ '//lib:org.apache.httpcomponents.httpasyncclient-osgi',
'//utils/rest:onlab-rest',
'//core/store/serializers:onos-core-serializers',
'//apps/restconf/utils:onos-apps-restconf-utils',
'//apps/config:onos-apps-config',
+ '//apps/restconf/api:onos-apps-restconf-api',
'//models/fpcagent:onos-models-fpcagent',
'//models/common:onos-models-common',
':zeromq',
@@ -27,6 +29,7 @@
'//apps/fpcagent:onos-apps-fpcagent',
'//lib:httpclient-osgi',
'//lib:httpcore-osgi',
+ '//lib:org.apache.httpcomponents.httpasyncclient-osgi',
]
EXCLUDED_BUNDLES = [
@@ -46,7 +49,7 @@
osgi_jar_with_tests(
deps = COMPILE_DEPS,
test_deps = TEST_DEPS,
- web_context = '/onos/fpcagent',
+ web_context = '/',
api_title = 'FPC REST API',
api_package = 'org.onosproject.fpcagent.rest',
api_version = '1.0',
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
index d7d88e6..02d8037 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
@@ -27,11 +27,15 @@
import org.onosproject.fpcagent.providers.DpnDeviceListener;
import org.onosproject.fpcagent.providers.DpnProviderService;
import org.onosproject.fpcagent.util.ConfigHelper;
+import org.onosproject.fpcagent.util.eventStream.SseManager;
+import org.onosproject.fpcagent.util.eventStream.NBEventWorkerManager;
+import org.onosproject.fpcagent.util.eventStream.ParseStream;
import org.onosproject.fpcagent.workers.HTTPNotifier;
import org.onosproject.fpcagent.workers.ZMQSBPublisherManager;
import org.onosproject.fpcagent.workers.ZMQSBSubscriberManager;
import org.onosproject.net.config.*;
import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.restconf.api.RestconfService;
import org.onosproject.yang.model.ModelConverter;
import org.onosproject.yang.model.RpcRegistry;
import org.slf4j.Logger;
@@ -83,6 +87,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private DpnProviderService dpnProviderService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private RestconfService restconfService;
+
/* Variables */
private IdGenerator notificationIds;
private FpcConfig fpcConfig;
@@ -108,7 +115,10 @@
notificationIds = coreService.getIdGenerator("fpc-notification-ids");
+ SseManager.createInstance().open();
HTTPNotifier.createInstance(modelConverter).open();
+ ParseStream.createInstance().open();
+ NBEventWorkerManager.createInstance(10, restconfService).open();
log.info("FPC Service Started");
}
@@ -125,7 +135,10 @@
started = false;
}
+ SseManager.getInstance().close();
HTTPNotifier.getInstance().close();
+ ParseStream.getInstance().close();
+ NBEventWorkerManager.getInstance().close();
log.info("FPC Service Stopped");
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
index ec79725..e082b33 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
@@ -191,7 +191,7 @@
) throws Exception {
DefaultConfigureOutput configureOutput = new DefaultConfigureOutput();
Collection<Callable<Object>> tasks = new ArrayList<>();
- // Get this clients tenant.
+ // Get this clientUriToHttpClient tenant.
FpcIdentity tenantId = clientInfo.tenantId();
DefaultCommonSuccess defaultCommonSuccess = new DefaultCommonSuccess();
// Load cache for specific tenant.
@@ -355,7 +355,7 @@
) throws Exception {
DefaultConfigureOutput configureOutput = new DefaultConfigureOutput();
Collection<Callable<Object>> tasks = new ArrayList<>();
- // Get this clients tenant.
+ // Get this clientUriToHttpClient tenant.
FpcIdentity tenantId = clientInfo.tenantId();
DefaultCommonSuccess defaultCommonSuccess = new DefaultCommonSuccess();
// Load cache for specific tenant.
@@ -925,7 +925,7 @@
}
// keep information for each client. this can be moved to the DC Store and use Cache.
clientInfo.put(input.clientId(), input);
- // keep clients for each tenant
+ // keep clientUriToHttpClient for each tenant
HashSet<ClientIdentifier> hashSet = tenantInfo.getOrDefault(input.tenantId(), Sets.newHashSet());
hashSet.add(input.clientId());
tenantInfo.put(input.tenantId(), hashSet);
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java
index 16b891e..52f7be2 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/rest/FpcWebResource.java
@@ -16,40 +16,150 @@
package org.onosproject.fpcagent.rest;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.onosproject.fpcagent.util.eventStream.SseManager;
+import org.onosproject.fpcagent.util.eventStream.ParseStream;
import org.onosproject.rest.AbstractWebResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.Consumes;
import javax.ws.rs.core.Response;
+import java.io.InputStream;
+import java.nio.CharBuffer;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
@Path("/")
public class FpcWebResource extends AbstractWebResource {
- private static final Logger log =
- LoggerFactory.getLogger(FpcWebResource.class);
+ private static final Logger log = LoggerFactory.getLogger(FpcWebResource.class);
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/response")
- public Response getResponse() {
- return Response.ok().build();
+ public Response getResponse(InputStream stream) {
+ JsonNode jsonNode = mapper().valueToTree(stream);
+ JsonNode clientUri = jsonNode.get("client-uri");
+
+ log.info("Received {}", clientUri);
+ if (!clientUri.isMissingNode()) {
+ String uri = clientUri.asText();
+ Map<String, CloseableHttpAsyncClient> clients = SseManager.getInstance().clientUriToHttpClient;
+ if (!clients.containsKey(uri)) {
+ Executors.newSingleThreadExecutor().submit(() -> {
+ try {
+ CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();
+ clients.put(uri, client);
+ HttpAsyncRequestProducer get = HttpAsyncMethods.createGet(uri);
+ client.execute(get, new MyResponseConsumer(uri), null);
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ });
+ }
+ }
+
+ return Response.ok()
+ .header("Content-Type", "text/event-stream")
+ .header("Cache-Control", "no-cache, no-store")
+ .header("Connection", "keep-alive")
+ .build();
}
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/request")
- public Response getRequest() {
- return Response.ok().build();
+ public Response getRequest(InputStream stream) {
+ JsonNode jsonNode = mapper().valueToTree(stream);
+
+ return Response.ok()
+ .header("Content-Type", "text/event-stream")
+ .header("Cache-Control", "no-cache, no-store")
+ .header("Connection", "keep-alive")
+ .build();
}
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/notification")
- public Response getNotification() {
- return Response.ok().build();
+ public Response getNotification(InputStream stream) {
+ JsonNode jsonNode = mapper().valueToTree(stream);
+ JsonNode clientId = jsonNode.get("client-id");
+
+ log.info("Received {}", clientId);
+ if (!clientId.isMissingNode()) {
+ String uri = clientId.asText();
+ Map<String, CloseableHttpAsyncClient> clients = SseManager.getInstance().clientUriToHttpClient;
+ if (!clients.containsKey(uri)) {
+ Executors.newSingleThreadExecutor().submit(() -> {
+ try {
+ CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();
+ clients.put(uri, client);
+ HttpAsyncRequestProducer get = HttpAsyncMethods.createGet(uri);
+ client.execute(get, new MyResponseConsumer(uri), null);
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ });
+ }
+ }
+
+ return Response.ok()
+ .header("Content-Type", "text/event-stream")
+ .header("Cache-Control", "no-cache, no-store")
+ .header("Connection", "keep-alive")
+ .build();
+ }
+
+ /**
+ * A character consumer to read incoming characters on the request stream
+ */
+ static class MyResponseConsumer extends AsyncCharConsumer<Boolean> {
+ private String clientUri;
+
+ /**
+ * Constructor
+ *
+ * @param clientUri - URI of the FPC Client
+ */
+ public MyResponseConsumer(String clientUri) {
+ this.clientUri = clientUri;
+ }
+
+ @Override
+ protected void onResponseReceived(final HttpResponse response) {
+ }
+
+ @Override
+ protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) {
+ char[] charArray = new char[buf.remaining()];
+ System.arraycopy(buf.array(), 0, charArray, 0, buf.remaining());
+ CharBuffer charBuffer = CharBuffer.wrap(charArray);
+ ParseStream.getInstance().send(new AbstractMap.SimpleEntry<>(clientUri, charBuffer));
+ }
+
+ @Override
+ protected void releaseResources() {
+ }
+
+ @Override
+ protected Boolean buildResult(final HttpContext context) {
+ return Boolean.TRUE;
+ }
+
}
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/ConfigHelper.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/ConfigHelper.java
index 20868af..5b1c8e6 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/ConfigHelper.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/ConfigHelper.java
@@ -17,7 +17,7 @@
"mobilityupdate-ms",
"activation-threads",
"target-read-limit",
- "http-notifier-clients",
+ "http-notifier-clientUriToHttpClient",
"zmq-nbi-server-poolsize",
"zmq-nbi-server-uri",
"zmq-nbi-inproc-uri",
@@ -51,7 +51,7 @@
private Integer activationThreads;
@JsonProperty("target-read-limit")
private Integer targetReadLimit;
- @JsonProperty("http-notifier-clients")
+ @JsonProperty("http-notifier-clientUriToHttpClient")
private Integer httpNotifierClients;
@JsonProperty("zmq-nbi-server-poolsize")
private Integer zmqNbiServerPoolsize;
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
new file mode 100644
index 0000000..467575f
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.onosproject.restconf.api.RestconfRpcOutput;
+import org.onosproject.restconf.api.RestconfService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
+
+/**
+ * Implements a Worker to process event-data pairs sent by the FPC Client
+ */
+public class NBEventWorkerManager implements AutoCloseable {
+ private static final Logger log = LoggerFactory.getLogger(NBEventWorkerManager.class);
+ private static NBEventWorkerManager _instance = null;
+
+ public static Map<Integer, String> clientIdToUri = new ConcurrentHashMap<Integer, String>();
+
+ private final BlockingQueue<Entry<String, Entry<String, String>>> blockingQueue;
+ private final int poolSize;
+ private boolean run;
+ private final RestconfService restconfService;
+
+ protected NBEventWorkerManager(int poolSize, RestconfService restconfService) {
+ this.poolSize = poolSize;
+ this.restconfService = restconfService;
+ this.blockingQueue = new LinkedBlockingQueue<>();
+ }
+
+ public static NBEventWorkerManager createInstance(int poolSize, RestconfService restconfService) {
+ if (_instance == null) {
+ _instance = new NBEventWorkerManager(poolSize, restconfService);
+ }
+ return _instance;
+ }
+
+ public static NBEventWorkerManager getInstance() {
+ return _instance;
+ }
+
+ public void send(Entry<String, Entry<String, String>> buf) {
+ try {
+ blockingQueue.put(buf);
+ } catch (InterruptedException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ public void open() {
+ Executors.newFixedThreadPool(this.poolSize).submit(() -> {
+ this.run = true;
+ Entry<String, Entry<String, String>> contents = null;
+ while (run) {
+ try {
+ contents = blockingQueue.take();
+
+ // call RESTCONF service
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode node = (ObjectNode) mapper.readTree(contents.getValue().getValue());
+
+ log.info("Handling {}", node);
+
+ if (contents.getValue().getKey().contains("ietf-dmm-fpcagent:configure")) {
+ CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+ new URI("/onos/restconf/operations/ietf-dmm-fpcagent:configure"),
+ node,
+ contents.getKey()
+ );
+ SseManager.getInstance().configureQueue.add(
+ new AbstractMap.SimpleEntry(
+ contents.getKey(),
+ "event:application/json;/restconf/operations/ietf-dmm-fpcagent:configure\ndata:" + output.get().output().toString() + "\r\n"
+ )
+ );
+ } else if (contents.getValue().getKey().contains("fpc:register-client")) {
+ CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+ new URI("/onos/restconf/operations/fpc:register-client"),
+ node,
+ contents.getKey()
+ );
+ addClientIdToUriMapping(output.get().output().toString(), contents.getKey());
+ SseManager.getInstance().registerClientQueue.add(
+ new AbstractMap.SimpleEntry(
+ contents.getKey(),
+ "event:application/json;/restconf/operations/fpc:register_client\ndata:" + output.get().output().toString() + "\r\n"
+ )
+ );
+ } else if (contents.getValue().getKey().contains("fpc:deregister-client")) {
+ CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+ new URI("/onos/restconf/operations/fpc:deregister-client"),
+ node,
+ contents.getKey()
+ );
+ removeClientIdToUriMapping(contents.getValue().getValue());
+ }
+
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+ });
+ }
+
+ /**
+ * Add a FPC client URI to FPC client id mapping
+ *
+ * @param registerClientOutput - Output of the register_cleint rpc
+ * @param uri - FPC Client URI
+ */
+ private void addClientIdToUriMapping(String registerClientOutput, String uri) {
+ try {
+ JSONObject registerJSON = new JSONObject(registerClientOutput);
+ clientIdToUri.put(Integer.parseInt(registerJSON.getJSONObject("output").getString("client-id")), uri);
+ } catch (JSONException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ /**
+ * Remove a FPC client URI to FPCClient Id mapping
+ *
+ * @param deregisterClientInput - Input String of deregister_client rpc
+ */
+ private void removeClientIdToUriMapping(String deregisterClientInput) {
+ try {
+ JSONObject deregisterJSON = new JSONObject(deregisterClientInput);
+ clientIdToUri.remove(Integer.parseInt(deregisterJSON.getJSONObject("input").getString("client-id")));
+ } catch (JSONException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ @Override
+ public void close() {
+ this.run = false;
+ }
+}
+
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
new file mode 100644
index 0000000..7e94ad4
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.CharBuffer;
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A class that parses the request stream and enqueues creates event-data pairs for processing
+ */
+public class ParseStream implements AutoCloseable {
+ private static final Logger log = LoggerFactory.getLogger(ParseStream.class);
+ private BlockingQueue<Map.Entry<String, CharBuffer>> blockingQueue;
+ private CharBuffer charBuf;
+ private String partialChunk; //Chunked data left over at the end of the buffer
+ private boolean chunkedBuffer;
+ private boolean run;
+
+ private static ParseStream _instance;
+
+ private ParseStream() {
+ blockingQueue = new LinkedBlockingQueue<>();
+ }
+
+ public static ParseStream createInstance() {
+ if (_instance == null) {
+ _instance = new ParseStream();
+ }
+ return _instance;
+ }
+
+ public static ParseStream getInstance() {
+ return _instance;
+ }
+
+ public void send(Map.Entry<String, CharBuffer> buf) {
+ try {
+ blockingQueue.put(buf);
+ } catch (InterruptedException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+
+ private String getLine() {
+ StringBuilder s = new StringBuilder();
+ if (this.chunkedBuffer) {
+ while (this.charBuf.hasRemaining()) {
+ char ch = this.charBuf.get();
+ if (ch == '\n') {
+ this.partialChunk += s.toString();
+ this.chunkedBuffer = false;
+ return this.partialChunk;
+ }
+ if (ch == '\r') {
+ continue;
+ }
+ s.append(ch);
+ }
+ }
+ while (this.charBuf.hasRemaining()) {
+ char ch = this.charBuf.get();
+ if (ch == '\n') {
+ String line = s.toString();
+ return line.length() > 0 ? line : null;
+ }
+ if (ch == '\r') {
+ continue;
+ }
+ s.append(ch);
+ if (!this.charBuf.hasRemaining()) {
+ this.partialChunk = s.toString();
+ this.chunkedBuffer = true;
+ }
+ }
+ return null;
+ }
+
+ public void open() {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ executorService.submit(() -> {
+ run = true;
+ chunkedBuffer = false;
+ String event = null, data = null;
+ while (run) {
+ try {
+ Map.Entry<String, CharBuffer> entry = blockingQueue.take();
+ this.charBuf = entry.getValue();
+ while (this.charBuf.hasRemaining()) {
+ String line = getLine();
+ if (line != null) {
+ if (line.startsWith("event")) {
+ event = line.split(":", 2)[1];
+
+ } else if (line.startsWith("data")) {
+ data = line.split(":", 2)[1];
+ if (event != null && data != null) {
+ NBEventWorkerManager.getInstance()
+ .send(
+ new AbstractMap.SimpleEntry<>(
+ entry.getKey(),
+ new AbstractMap.SimpleEntry<>(event, data)
+ )
+ );
+ }
+ event = null;
+ data = null;
+ }
+ }
+
+ }
+ } catch (InterruptedException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+ });
+ }
+
+ @Override
+ public void close() {
+ run = false;
+ }
+}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/SseManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/SseManager.java
new file mode 100644
index 0000000..aa2dc53
--- /dev/null
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/SseManager.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.onosproject.fpcagent.util.eventStream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.onosproject.restconf.utils.RestconfUtils;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
+import org.onosproject.yang.model.DefaultModelObjectData;
+import org.onosproject.yang.model.ResourceData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.onosproject.fpcagent.util.FpcUtil.modelConverter;
+import static org.onosproject.fpcagent.util.FpcUtil.module;
+
+
+/**
+ * A servlet context listener that sends response (ConfigureOutput) events to clientUriToHttpClient.
+ */
+public class SseManager implements AutoCloseable {
+ private static final Logger log = LoggerFactory.getLogger(SseManager.class);
+
+ //key = client-id, value = configure-output
+ public BlockingQueue<Map.Entry<String, DefaultYangAutoPrefixNotify>> configureQueue;
+ public BlockingQueue<Map.Entry<String, String>> registerClientQueue;
+ public BlockingQueue<Map.Entry<String, String>> notificationQueue;
+
+ public Map<String, CloseableHttpAsyncClient> clientUriToHttpClient;
+ public Map<String, CloseableHttpAsyncClient> clientIdToHttpClient;
+ private static SseManager _instance;
+
+ private boolean run;
+
+ private SseManager() {
+ configureQueue = new LinkedBlockingQueue<>();
+ registerClientQueue = new LinkedBlockingQueue<>();
+ notificationQueue = new LinkedBlockingQueue<>();
+ clientUriToHttpClient = Maps.newConcurrentMap();
+ clientIdToHttpClient = Maps.newConcurrentMap();
+ }
+
+ public static SseManager createInstance() {
+ if (_instance == null) {
+ _instance = new SseManager();
+ }
+ return _instance;
+ }
+
+ public static SseManager getInstance() {
+ return _instance;
+ }
+
+ public void open() {
+ run = true;
+ Executors.newSingleThreadExecutor().submit(() -> {
+ String clientUri = null;
+ while (run) {
+ try {
+ Map.Entry<String, DefaultYangAutoPrefixNotify> entry = configureQueue.take();
+ clientUri = entry.getKey();
+ DefaultYangAutoPrefixNotify notify = entry.getValue();
+
+ ResourceData dataNode = modelConverter.createDataNode(
+ DefaultModelObjectData.builder()
+ .addModelObject(notify)
+ .build()
+ );
+ ObjectNode jsonNodes = RestconfUtils.convertDataNodeToJson(module, dataNode.dataNodes().get(0));
+ ObjectMapper mapper = new ObjectMapper();
+ String value = mapper.writeValueAsString(jsonNodes);
+
+ if (clientUriToHttpClient.containsKey(clientUri)) {
+ CloseableHttpAsyncClient client = clientUriToHttpClient.get(clientUri);
+ HttpAsyncRequestProducer get = HttpAsyncMethods
+ .createPost(clientUri, value.getBytes(), ContentType.TEXT_PLAIN);
+
+ client.execute(get, null, null);
+ }
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ clientUriToHttpClient.remove(clientUri);
+ break;
+ }
+ }
+ });
+
+ Executors.newSingleThreadExecutor().submit(() -> {
+ String clientUri = null;
+ while (run) {
+ try {
+ Map.Entry<String, String> entry = registerClientQueue.take();
+ clientUri = entry.getKey();
+
+ if (clientUriToHttpClient.containsKey(clientUri)) {
+ CloseableHttpAsyncClient client = clientUriToHttpClient.get(clientUri);
+ HttpAsyncRequestProducer get = HttpAsyncMethods
+ .createPost(clientUri, entry.getValue().getBytes(), ContentType.TEXT_PLAIN);
+ client.execute(get, null, null);
+ }
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ clientUriToHttpClient.remove(clientUri);
+ break;
+ }
+ }
+
+ });
+
+ Executors.newSingleThreadExecutor().submit(() -> {
+ String clientId = null;
+ while (run) {
+ try {
+ Map.Entry<String, String> entry = notificationQueue.take();
+ clientId = entry.getKey();
+
+ if (clientIdToHttpClient.containsKey(clientId)) {
+ CloseableHttpAsyncClient client = clientIdToHttpClient.get(clientId);
+ HttpAsyncRequestProducer get = HttpAsyncMethods
+ .createPost(clientId, entry.getValue().getBytes(), ContentType.TEXT_PLAIN);
+ client.execute(get, null, null);
+ }
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ clientIdToHttpClient.remove(clientId);
+ break;
+ }
+ }
+
+ });
+ }
+
+ @Override
+ public void close() {
+ run = false;
+ }
+}
\ No newline at end of file