fixed NBEvent worker
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
index ec480cb..4728b79 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventClient.java
@@ -40,6 +40,7 @@
*/
public void connectToClient(String uri) {
this.clientUri = uri;
+ log.info("here connectToClient");
try {
client.start();
HttpAsyncRequestProducer get = HttpAsyncMethods.createGet(this.clientUri);
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventServer.java
index 3313188..ddb7426 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/EventServer.java
@@ -66,6 +66,7 @@
}
protected void doPost(HttpServletRequest request, HttpServletResponse response) {
+ log.info("here doPost");
try {
String clientUri = null;
StringBuffer jsonStringBuilder = new StringBuffer();
@@ -75,7 +76,7 @@
while ((line = br.readLine()) != null)
jsonStringBuilder.append(line);
} catch (Exception e) {
- e.printStackTrace();
+ log.error(ExceptionUtils.getFullStackTrace(e));
}
try {
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
index deba165..df6526a 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
@@ -40,7 +40,7 @@
try {
server.start();
- log.info("Server Started");
+ log.info("Jetty server started");
server.join();
} catch (Exception e) {
log.error(ExceptionUtils.getFullStackTrace(e));
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
index 2a7bd54..16c8caf 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
@@ -21,10 +21,7 @@
import java.util.AbstractMap;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.*;
/**
* Implements a Worker to process event-data pairs sent by the FPC Client
@@ -66,53 +63,56 @@
}
public void open() {
- this.run = true;
- Entry<String, Entry<String, String>> contents = null;
- while (run) {
- try {
- contents = blockingQueue.take();
+ ExecutorService executorService = Executors.newFixedThreadPool(this.poolSize);
+ executorService.submit(() -> {
+ this.run = true;
+ Entry<String, Entry<String, String>> contents = null;
+ while (run) {
+ try {
+ contents = blockingQueue.take();
- // call RESTCONF service
- ObjectMapper mapper = new ObjectMapper();
- ObjectNode node = (ObjectNode) mapper.readTree(contents.getValue().getValue());
- if (contents.getValue().getKey().contains("ietf-dmm-fpcagent:configure")) {
- CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
- new URI("/onos/restconf/operations/ietf-dmm-fpcagent:configure"),
- node,
- contents.getKey()
- );
- ConfigureService.blockingQueue.add(
- new AbstractMap.SimpleEntry(
- contents.getKey(),
- "event:application/json;/restconf/operations/ietf-dmm-fpcagent:configure\ndata:" + output.get().output().toString() + "\r\n"
- )
- );
- } else if (contents.getValue().getKey().contains("fpc:register-client")) {
- CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
- new URI("/onos/restconf/operations/fpc:register-client"),
- node,
- contents.getKey()
- );
- addClientIdToUriMapping(output.get().output().toString(), contents.getKey());
- ConfigureService.registerClientQueue.add(
- new AbstractMap.SimpleEntry(
- contents.getKey(),
- "event:application/json;/restconf/operations/fpc:register_client\ndata:" + output.get().output().toString() + "\r\n"
- )
- );
- } else if (contents.getValue().getKey().contains("fpc:deregister-client")) {
- CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
- new URI("/onos/restconf/operations/fpc:deregister-client"),
- node,
- contents.getKey()
- );
- removeClientIdToUriMapping(contents.getValue().getValue());
+ // call RESTCONF service
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode node = (ObjectNode) mapper.readTree(contents.getValue().getValue());
+ if (contents.getValue().getKey().contains("ietf-dmm-fpcagent:configure")) {
+ CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+ new URI("/onos/restconf/operations/ietf-dmm-fpcagent:configure"),
+ node,
+ contents.getKey()
+ );
+ ConfigureService.blockingQueue.add(
+ new AbstractMap.SimpleEntry(
+ contents.getKey(),
+ "event:application/json;/restconf/operations/ietf-dmm-fpcagent:configure\ndata:" + output.get().output().toString() + "\r\n"
+ )
+ );
+ } else if (contents.getValue().getKey().contains("fpc:register-client")) {
+ CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+ new URI("/onos/restconf/operations/fpc:register-client"),
+ node,
+ contents.getKey()
+ );
+ addClientIdToUriMapping(output.get().output().toString(), contents.getKey());
+ ConfigureService.registerClientQueue.add(
+ new AbstractMap.SimpleEntry(
+ contents.getKey(),
+ "event:application/json;/restconf/operations/fpc:register_client\ndata:" + output.get().output().toString() + "\r\n"
+ )
+ );
+ } else if (contents.getValue().getKey().contains("fpc:deregister-client")) {
+ CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
+ new URI("/onos/restconf/operations/fpc:deregister-client"),
+ node,
+ contents.getKey()
+ );
+ removeClientIdToUriMapping(contents.getValue().getValue());
+ }
+
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
}
-
- } catch (Exception e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
}
- }
+ });
}
/**
@@ -149,3 +149,4 @@
this.run = false;
}
}
+
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
index edd2415..0705f4c 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ParseStream.java
@@ -65,6 +65,7 @@
public void run() {
chunkedBuffer = false;
String event = null, data = null;
+ log.info("ParseStream started");
while (true) {
try {
Map.Entry<String, CharBuffer> entry = blockingQueue.take();