working SSE
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
index ec79725..dfa7c73 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
@@ -1062,47 +1062,49 @@
});
} else if (event.subject().manufacturer().equals("cp")) {
String clientId = event.subject().annotations().value("client-id");
- DefaultRegisterClientInput clientInput = clientInfo.get(ClientIdentifier.fromString(clientId));
+ if (clientInfo.keySet().contains(ClientIdentifier.fromString(clientId))) {
+ DefaultRegisterClientInput clientInput = clientInfo.get(ClientIdentifier.fromString(clientId));
- Optional<DefaultTenant> defaultTenant = getTenant(clientInput.tenantId());
- if (defaultTenant.isPresent()) {
- DefaultTenant tenant = defaultTenant.get();
- if (tenant.fpcTopology().dpns() != null) {
- tenant.fpcTopology().dpns().forEach(
- dpn -> {
- DefaultYangAutoPrefixNotify notify = new DefaultYangAutoPrefixNotify();
- notify.notificationId(NotificationId.of(notificationIds.getNewId()));
+ Optional<DefaultTenant> defaultTenant = getTenant(clientInput.tenantId());
+ if (defaultTenant.isPresent()) {
+ DefaultTenant tenant = defaultTenant.get();
+ if (tenant.fpcTopology().dpns() != null) {
+ tenant.fpcTopology().dpns().forEach(
+ dpn -> {
+ DefaultYangAutoPrefixNotify notify = new DefaultYangAutoPrefixNotify();
+ notify.notificationId(NotificationId.of(notificationIds.getNewId()));
- notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
- DefaultDpnAvailability availability = new DefaultDpnAvailability();
- availability.availabilityMessageType("Dpn-Availability");
- availability.dpnId(dpn.dpnId());
- availability.dpnGroups(dpn.dpnGroups());
- availability.controlProtocol(dpn.controlProtocol());
- availability.networkId(dpn.networkId());
- availability.nodeId(dpn.nodeId());
- availability.dpnName(dpn.dpnName());
+ notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
+ DefaultDpnAvailability availability = new DefaultDpnAvailability();
+ availability.availabilityMessageType("Dpn-Availability");
+ availability.dpnId(dpn.dpnId());
+ availability.dpnGroups(dpn.dpnGroups());
+ availability.controlProtocol(dpn.controlProtocol());
+ availability.networkId(dpn.networkId());
+ availability.nodeId(dpn.nodeId());
+ availability.dpnName(dpn.dpnName());
- switch (event.type()) {
- case DEVICE_UPDATED:
- case DEVICE_ADDED: {
- availability.dpnStatus(DpnStatusEnum.AVAILABLE);
- break;
+ switch (event.type()) {
+ case DEVICE_UPDATED:
+ case DEVICE_ADDED: {
+ availability.dpnStatus(DpnStatusEnum.AVAILABLE);
+ break;
+ }
+ case DEVICE_AVAILABILITY_CHANGED:
+ case DEVICE_REMOVED: {
+ availability.dpnStatus(DpnStatusEnum.UNAVAILABLE);
+ break;
+ }
+ default:
+ throw new RuntimeException("Unknown Device case.");
}
- case DEVICE_AVAILABILITY_CHANGED:
- case DEVICE_REMOVED: {
- availability.dpnStatus(DpnStatusEnum.UNAVAILABLE);
- break;
- }
- default:
- throw new RuntimeException("Unknown Device case.");
+
+ notify.value(availability);
+ // sendNotification(notify, clientInput.clientId());
}
+ );
- notify.value(availability);
- sendNotification(notify, clientInput.clientId());
- }
- );
-
+ }
}
}
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
index 8c1de9b..e711024 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
@@ -301,12 +301,14 @@
* @param client HTTP client to receive message
*/
public static void sendNotification(DefaultYangAutoPrefixNotify notify, ClientIdentifier client) {
- log.info("Sending HTTP notification {} to {}", notify, client);
- HTTPNotifier.getInstance().send(
- new AbstractMap.SimpleEntry<>(
- clientInfo.get(client).endpointUri().toString(),
- notify
- )
- );
+ if (clientInfo.keySet().contains(client)) {
+ log.info("Sending HTTP notification {} to {}", notify, client);
+ HTTPNotifier.getInstance().send(
+ new AbstractMap.SimpleEntry<>(
+ clientInfo.get(client).endpointUri().toString(),
+ notify
+ )
+ );
+ }
}
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java
index 1c90115..2332bb3 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java
@@ -7,13 +7,7 @@
*/
package org.onosproject.fpcagent.util.eventStream;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang.exception.ExceptionUtils;
-import org.onosproject.restconf.utils.RestconfUtils;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
-import org.onosproject.yang.model.DefaultModelObjectData;
-import org.onosproject.yang.model.ResourceData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,11 +18,9 @@
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import static org.onosproject.fpcagent.util.FpcUtil.modelConverter;
-import static org.onosproject.fpcagent.util.FpcUtil.module;
-
/**
* A servlet context listener that sends response (ConfigureOutput) events to clients.
@@ -37,34 +29,25 @@
private static final Logger log = LoggerFactory.getLogger(ConfigureService.class);
//key = client-id, value = configure-output
- public static BlockingQueue<Map.Entry<String, DefaultYangAutoPrefixNotify>> blockingQueue = new LinkedBlockingQueue<>();
+ public static BlockingQueue<Map.Entry<String, String>> configureQueue = new LinkedBlockingQueue<>();
public static BlockingQueue<Map.Entry<String, String>> registerClientQueue = new LinkedBlockingQueue<>();
public void contextInitialized(ServletContextEvent sce) {
Map<String, AsyncContext> responseStreams = new ConcurrentHashMap<>();
sce.getServletContext().setAttribute("responseStreams", responseStreams);
- Thread thread = new Thread(() -> {
+ Executors.newSingleThreadExecutor().submit(() -> {
String clientUri = null;
AsyncContext asyncContext = null;
while (true) {
try {
- Map.Entry<String, DefaultYangAutoPrefixNotify> entry = blockingQueue.take();
+ Map.Entry<String, String> entry = configureQueue.take();
clientUri = entry.getKey();
- DefaultYangAutoPrefixNotify notify = entry.getValue();
-
- ResourceData dataNode = modelConverter.createDataNode(
- DefaultModelObjectData.builder()
- .addModelObject(notify)
- .build()
- );
- ObjectNode jsonNodes = RestconfUtils.convertDataNodeToJson(module, dataNode.dataNodes().get(0));
- ObjectMapper mapper = new ObjectMapper();
- String value = mapper.writeValueAsString(jsonNodes);
+ String notify = entry.getValue();
asyncContext = responseStreams.get(clientUri);
if (responseStreams.get(clientUri) != null) {
ServletOutputStream out = asyncContext.getResponse().getOutputStream();
- out.write(value.getBytes());
+ out.write(notify.getBytes());
out.flush();
asyncContext.getResponse().flushBuffer();
}
@@ -76,9 +59,8 @@
}
}
});
- thread.start();
- Thread thread1 = new Thread(() -> {
+ Executors.newSingleThreadExecutor().submit(() -> {
String clientUri = null;
AsyncContext asyncContext = null;
while (true) {
@@ -100,8 +82,6 @@
}
}
});
- thread1.start();
-
}
public void contextDestroyed(ServletContextEvent sce) {
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
index 16c8caf..72f283c 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
@@ -73,6 +73,7 @@
// call RESTCONF service
ObjectMapper mapper = new ObjectMapper();
+ log.info("NB Event Contents: {}", contents.getValue().getValue());
ObjectNode node = (ObjectNode) mapper.readTree(contents.getValue().getValue());
if (contents.getValue().getKey().contains("ietf-dmm-fpcagent:configure")) {
CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
@@ -80,10 +81,11 @@
node,
contents.getKey()
);
- ConfigureService.blockingQueue.add(
+ String replace = output.get().output().toString().replace("ietf-dmm-fpcagent:output", "fpc:output");
+ ConfigureService.configureQueue.add(
new AbstractMap.SimpleEntry(
contents.getKey(),
- "event:application/json;/restconf/operations/ietf-dmm-fpcagent:configure\ndata:" + output.get().output().toString() + "\r\n"
+ "event:application/json;/onos/restconf/operations/ietf-dmm-fpcagent:configure\ndata:" + replace + "\r\n"
)
);
} else if (contents.getValue().getKey().contains("fpc:register-client")) {
@@ -96,7 +98,7 @@
ConfigureService.registerClientQueue.add(
new AbstractMap.SimpleEntry(
contents.getKey(),
- "event:application/json;/restconf/operations/fpc:register_client\ndata:" + output.get().output().toString() + "\r\n"
+ "event:application/json;/onos/restconf/operations/fpc:register-client\ndata:" + output.get().output().toString() + "\r\n"
)
);
} else if (contents.getValue().getKey().contains("fpc:deregister-client")) {
@@ -124,7 +126,8 @@
private void addClientIdToUriMapping(String registerClientOutput, String uri) {
try {
JSONObject registerJSON = new JSONObject(registerClientOutput);
- clientIdToUri.put(Integer.parseInt(registerJSON.getJSONObject("output").getString("client-id")), uri);
+// log.info("registerJSON: {}", registerJSON);
+ clientIdToUri.put(Integer.parseInt(registerJSON.getJSONObject("fpc:output").getString("client-id")), uri);
} catch (JSONException e) {
log.error(ExceptionUtils.getFullStackTrace(e));
}
@@ -138,6 +141,7 @@
private void removeClientIdToUriMapping(String deregisterClientInput) {
try {
JSONObject deregisterJSON = new JSONObject(deregisterClientInput);
+// log.info("deregisterJSON: {}", deregisterJSON);
clientIdToUri.remove(Integer.parseInt(deregisterJSON.getJSONObject("input").getString("client-id")));
} catch (JSONException e) {
log.error(ExceptionUtils.getFullStackTrace(e));
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java
index ccbd364..e448324 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java
@@ -30,11 +30,6 @@
public class NotificationServer extends DefaultServlet {
private static final Logger log = LoggerFactory.getLogger(ResponseServer.class);
- public NotificationServer() {
- super();
- log.info("Notification Server Initialized");
- }
-
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) {
try {
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
index 7e94ad4..52eeb07 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
@@ -118,9 +118,8 @@
}
event = null;
data = null;
- }
+ }
}
-
}
} catch (InterruptedException e) {
log.error(ExceptionUtils.getFullStackTrace(e));
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java
index 3594d22..6189501 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java
@@ -32,11 +32,6 @@
private static final Logger log = LoggerFactory.getLogger(ResponseServer.class);
private static ArrayList<String> clientUriList = new ArrayList<>();
- public ResponseServer() {
- super();
- log.info("Response Server Initialized");
- }
-
/**
* Method for stream initialization
*
@@ -49,7 +44,7 @@
try {
HttpSession session = request.getSession();
session.setMaxInactiveInterval(72 * 60 * 60);
- ResponseClient client = new ResponseClient();
+ ResponseService client = new ResponseService();
client.connectToClient(clientUri);
response.setHeader("Content-Type", "text/event-stream");
response.setHeader("Cache-Control", "no-cache, no-store");
@@ -84,7 +79,6 @@
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) {
- log.info("here doPost");
try {
String clientUri = null;
StringBuffer jsonStringBuilder = new StringBuffer();
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseClient.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseService.java
similarity index 86%
rename from apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseClient.java
rename to apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseService.java
index 76762d8..7736d51 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseClient.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseService.java
@@ -21,16 +21,14 @@
import java.nio.CharBuffer;
import java.util.AbstractMap;
-import java.util.ArrayList;
/**
* A HTTP client that sends a request to a FPC Client to initiate the request stream.
*/
-public class ResponseClient {
- private static final Logger log = LoggerFactory.getLogger(ResponseClient.class);
+public class ResponseService {
+ private static final Logger log = LoggerFactory.getLogger(ResponseService.class);
private static final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();
- private static ArrayList<String> clientUriList = new ArrayList<String>();
protected String clientUri;
/**
@@ -40,11 +38,10 @@
*/
public void connectToClient(String uri) {
this.clientUri = uri;
- log.info("here connectToClient");
try {
client.start();
HttpAsyncRequestProducer get = HttpAsyncMethods.createGet(this.clientUri);
- client.execute(get, new MyResponseConsumer(this.clientUri), null);
+ client.execute(get, new RequestStreamConsumer(this.clientUri), null);
} catch (Exception e) {
log.error(ExceptionUtils.getFullStackTrace(e));
}
@@ -53,7 +50,7 @@
/**
* A character consumer to read incoming characters on the request stream
*/
- static class MyResponseConsumer extends AsyncCharConsumer<Boolean> {
+ static class RequestStreamConsumer extends AsyncCharConsumer<Boolean> {
private String clientUri;
/**
@@ -61,7 +58,7 @@
*
* @param clientUri - URI of the FPC Client
*/
- public MyResponseConsumer(String clientUri) {
+ public RequestStreamConsumer(String clientUri) {
this.clientUri = clientUri;
}