Merge branch 'master' of ssh://gerrit.opencord.org:29418/fpcagent
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
index a44e12b..a5a456f 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcManager.java
@@ -30,7 +30,6 @@
import org.onosproject.fpcagent.util.eventStream.JettyServer;
import org.onosproject.fpcagent.util.eventStream.NBEventWorkerManager;
import org.onosproject.fpcagent.util.eventStream.ParseStream;
-import org.onosproject.fpcagent.workers.HTTPNotifier;
import org.onosproject.fpcagent.workers.ZMQSBPublisherManager;
import org.onosproject.fpcagent.workers.ZMQSBSubscriberManager;
import org.onosproject.net.config.*;
@@ -95,8 +94,6 @@
private FpcConfig fpcConfig;
private boolean started = false;
private HashSet<DpnDeviceListener> listeners = Sets.newHashSet();
- private Thread jettyServer;
- private Thread parseStream;
/* Config */
private ConfigFactory<ApplicationId, FpcConfig> fpcConfigConfigFactory =
@@ -120,7 +117,6 @@
JettyServer.createInstance().open();
ParseStream.createInstance().open();
NBEventWorkerManager.createInstance(20, restconfService).open();
- HTTPNotifier.createInstance().open();
log.info("FPC Service Started");
}
@@ -138,7 +134,6 @@
}
NBEventWorkerManager.getInstance().close();
- HTTPNotifier.getInstance().close();
ParseStream.getInstance().close();
JettyServer.getInstance().close();
@@ -156,7 +151,8 @@
helper.dpnSubscriberUri(),
helper.nodeId(),
helper.networkId(),
- dpnProviderService.getListener()
+ dpnProviderService.getListener(),
+ notificationIds
);
ZMQSBPublisherManager.createInstance(
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
index dfa7c73..cef7592 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
@@ -260,7 +260,7 @@
Instr3GppMob instr3GppMob = (Instr3GppMob) instructions.instrType();
String commands = Bits.toString(instr3GppMob.instr3GppMob().bits());
- log.info("handling configure create event {} for {}", commands, dpn.dpnId());
+ log.debug("handling configure create event {} for {}", commands, dpn.dpnId());
Ip4Address s1u_enodeb_ipv4 = Ip4Address.valueOf(context.ul().tunnelLocalAddress().toString()),
s1u_sgw_ipv4 = Ip4Address.valueOf(context.ul().tunnelRemoteAddress().toString());
@@ -419,7 +419,7 @@
Instr3GppMob instr3GppMob = (Instr3GppMob) instructions.instrType();
String commands = Bits.toString(instr3GppMob.instr3GppMob().bits());
- log.info("handling configure update event {} for {}", commands, dpn.dpnId());
+ log.debug("handling configure update event {} for {}", commands, dpn.dpnId());
Ip4Address s1u_enodeb_ipv4 = Ip4Address.valueOf(context.ul().tunnelLocalAddress().toString()),
s1u_sgw_ipv4 = Ip4Address.valueOf(context.ul().tunnelRemoteAddress().toString());
@@ -527,7 +527,7 @@
throw new RuntimeException("Context doesn't exist. Please issue create operation..");
}
- log.info("handling configure-delete {}", targetStr);
+ log.debug("handling configure-delete {}", targetStr);
DefaultContexts context = defaultContexts.get();
for (Dpns dpn : context.dpns()) {
@@ -764,7 +764,6 @@
try {
for (ModelObject modelObject : getModelObjects(rpcInput.data(), configure)) {
DefaultConfigureInput input = (DefaultConfigureInput) modelObject;
- log.info("configure event {}", input.clientId());
if (!clientInfo.containsKey(input.clientId())) {
throw new RuntimeException("Client Identifier is not registered.");
}
@@ -824,7 +823,6 @@
try {
for (ModelObject modelObject : getModelObjects(rpcInput.data(), configureBundles)) {
DefaultConfigureBundlesInput input = (DefaultConfigureBundlesInput) modelObject;
- log.info("configure-bundles event {}", input.clientId());
if (!clientInfo.containsKey(input.clientId())) {
throw new RuntimeException("Client Identifier is not registered.");
}
@@ -919,7 +917,6 @@
try {
for (ModelObject modelObject : getModelObjects(rpcInput.data(), registerClient)) {
DefaultRegisterClientInput input = (DefaultRegisterClientInput) modelObject;
- log.info("register-client event {}", input.clientId());
if (clientInfo.containsKey(input.clientId())) {
throw new RuntimeException("Client already registered.");
}
@@ -975,7 +972,6 @@
try {
for (ModelObject modelObject : getModelObjects(rpcInput.data(), registerClient)) {
DefaultRegisterClientInput input = (DefaultRegisterClientInput) modelObject;
- log.info("deregister-client event {}", input.clientId());
if (!clientInfo.containsKey(input.clientId())) {
throw new RuntimeException("Client does not exist.");
}
@@ -1053,61 +1049,62 @@
}
notify.value(availability);
- clients.forEach(client -> sendNotification(notify, client));
+ clients.forEach(client -> sendNotification(client.toString(), notify));
}
);
} catch (ExecutionException e) {
log.error(ExceptionUtils.getFullStackTrace(e));
}
});
- } else if (event.subject().manufacturer().equals("cp")) {
- String clientId = event.subject().annotations().value("client-id");
- if (clientInfo.keySet().contains(ClientIdentifier.fromString(clientId))) {
- DefaultRegisterClientInput clientInput = clientInfo.get(ClientIdentifier.fromString(clientId));
-
- Optional<DefaultTenant> defaultTenant = getTenant(clientInput.tenantId());
- if (defaultTenant.isPresent()) {
- DefaultTenant tenant = defaultTenant.get();
- if (tenant.fpcTopology().dpns() != null) {
- tenant.fpcTopology().dpns().forEach(
- dpn -> {
- DefaultYangAutoPrefixNotify notify = new DefaultYangAutoPrefixNotify();
- notify.notificationId(NotificationId.of(notificationIds.getNewId()));
-
- notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
- DefaultDpnAvailability availability = new DefaultDpnAvailability();
- availability.availabilityMessageType("Dpn-Availability");
- availability.dpnId(dpn.dpnId());
- availability.dpnGroups(dpn.dpnGroups());
- availability.controlProtocol(dpn.controlProtocol());
- availability.networkId(dpn.networkId());
- availability.nodeId(dpn.nodeId());
- availability.dpnName(dpn.dpnName());
-
- switch (event.type()) {
- case DEVICE_UPDATED:
- case DEVICE_ADDED: {
- availability.dpnStatus(DpnStatusEnum.AVAILABLE);
- break;
- }
- case DEVICE_AVAILABILITY_CHANGED:
- case DEVICE_REMOVED: {
- availability.dpnStatus(DpnStatusEnum.UNAVAILABLE);
- break;
- }
- default:
- throw new RuntimeException("Unknown Device case.");
- }
-
- notify.value(availability);
- // sendNotification(notify, clientInput.clientId());
- }
- );
-
- }
- }
- }
}
+// else if (event.subject().manufacturer().equals("cp")) {
+// String clientId = event.subject().annotations().value("client-id");
+// if (clientInfo.keySet().contains(ClientIdentifier.fromString(clientId))) {
+// DefaultRegisterClientInput clientInput = clientInfo.get(ClientIdentifier.fromString(clientId));
+//
+// Optional<DefaultTenant> defaultTenant = getTenant(clientInput.tenantId());
+// if (defaultTenant.isPresent()) {
+// DefaultTenant tenant = defaultTenant.get();
+// if (tenant.fpcTopology().dpns() != null) {
+// tenant.fpcTopology().dpns().forEach(
+// dpn -> {
+// DefaultYangAutoPrefixNotify notify = new DefaultYangAutoPrefixNotify();
+// notify.notificationId(NotificationId.of(notificationIds.getNewId()));
+//
+// notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
+// DefaultDpnAvailability availability = new DefaultDpnAvailability();
+// availability.availabilityMessageType("Dpn-Availability");
+// availability.dpnId(dpn.dpnId());
+// availability.dpnGroups(dpn.dpnGroups());
+// availability.controlProtocol(dpn.controlProtocol());
+// availability.networkId(dpn.networkId());
+// availability.nodeId(dpn.nodeId());
+// availability.dpnName(dpn.dpnName());
+//
+// switch (event.type()) {
+// case DEVICE_UPDATED:
+// case DEVICE_ADDED: {
+// availability.dpnStatus(DpnStatusEnum.AVAILABLE);
+// break;
+// }
+// case DEVICE_AVAILABILITY_CHANGED:
+// case DEVICE_REMOVED: {
+// availability.dpnStatus(DpnStatusEnum.UNAVAILABLE);
+// break;
+// }
+// default:
+// throw new RuntimeException("Unknown Device case.");
+// }
+//
+// notify.value(availability);
+// sendNotification(clientInput.clientId().toString(), notify);
+// }
+// );
+//
+// }
+// }
+// }
+// }
}
}
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
index e711024..18a38c9 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
@@ -16,17 +16,20 @@
package org.onosproject.fpcagent.util;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Maps;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.onosproject.config.DynamicConfigService;
import org.onosproject.config.Filter;
import org.onosproject.core.IdGenerator;
-import org.onosproject.fpcagent.workers.HTTPNotifier;
+import org.onosproject.fpcagent.util.eventStream.ConfigureService;
import org.onosproject.net.Device;
import org.onosproject.net.device.DeviceStore;
+import org.onosproject.restconf.utils.RestconfUtils;
import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.registerclient.DefaultRegisterClientInput;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultTenants;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.DefaultTenant;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.TenantKeys;
import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
@@ -294,21 +297,26 @@
);
}
- /**
- * Sends a HTTP Post request to specified client.
- *
- * @param notify message
- * @param client HTTP client to receive message
- */
- public static void sendNotification(DefaultYangAutoPrefixNotify notify, ClientIdentifier client) {
- if (clientInfo.keySet().contains(client)) {
- log.info("Sending HTTP notification {} to {}", notify, client);
- HTTPNotifier.getInstance().send(
- new AbstractMap.SimpleEntry<>(
- clientInfo.get(client).endpointUri().toString(),
- notify
+ public static void sendNotification(String clientId, ModelObject notify) {
+ try {
+ ResourceData dataNode = modelConverter.createDataNode(
+ DefaultModelObjectData.builder()
+ .addModelObject(notify)
+ .build()
+ );
+ ObjectNode jsonNodes = RestconfUtils.convertDataNodeToJson(module, dataNode.dataNodes().get(0));
+ ObjectMapper mapper = new ObjectMapper();
+
+ String value = mapper.writeValueAsString(jsonNodes);
+
+ ConfigureService.notificationQueue.add(
+ new AbstractMap.SimpleEntry(
+ clientId,
+ "event:application/json;/notification\ndata:" + value + "\r\n"
)
);
+ } catch (Exception e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
}
}
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java
index 2332bb3..b9aa3a5 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ConfigureService.java
@@ -29,23 +29,28 @@
private static final Logger log = LoggerFactory.getLogger(ConfigureService.class);
//key = client-id, value = configure-output
- public static BlockingQueue<Map.Entry<String, String>> configureQueue = new LinkedBlockingQueue<>();
- public static BlockingQueue<Map.Entry<String, String>> registerClientQueue = new LinkedBlockingQueue<>();
+ public static BlockingQueue<Map.Entry<String, String>> responseQueue = new LinkedBlockingQueue<>();
+ //key = client id, value = config-result-notification
+ public static BlockingQueue<Map.Entry<String, String>> notificationQueue = new LinkedBlockingQueue<>();
public void contextInitialized(ServletContextEvent sce) {
Map<String, AsyncContext> responseStreams = new ConcurrentHashMap<>();
+ Map<String, AsyncContext> notificationStreams = new ConcurrentHashMap<>();
+
+ sce.getServletContext().setAttribute("notificationStreams", notificationStreams);
sce.getServletContext().setAttribute("responseStreams", responseStreams);
+
Executors.newSingleThreadExecutor().submit(() -> {
String clientUri = null;
AsyncContext asyncContext = null;
while (true) {
try {
- Map.Entry<String, String> entry = configureQueue.take();
+ Map.Entry<String, String> entry = responseQueue.take();
clientUri = entry.getKey();
String notify = entry.getValue();
-
asyncContext = responseStreams.get(clientUri);
if (responseStreams.get(clientUri) != null) {
+ log.info("Sending on /response {}", notify);
ServletOutputStream out = asyncContext.getResponse().getOutputStream();
out.write(notify.getBytes());
out.flush();
@@ -61,23 +66,26 @@
});
Executors.newSingleThreadExecutor().submit(() -> {
- String clientUri = null;
+ String clientId = null;
AsyncContext asyncContext = null;
while (true) {
try {
- Map.Entry<String, String> entry = registerClientQueue.take();
- clientUri = entry.getKey();
- asyncContext = responseStreams.get(clientUri);
- if (responseStreams.get(entry.getKey()) != null) {
+ Map.Entry<String, String> entry = notificationQueue.take();
+ clientId = entry.getKey();
+ String notify = entry.getValue();
+ asyncContext = notificationStreams.get(clientId);
+ log.info("clientId {}, streams {}", clientId, notificationStreams.keySet());
+ if (notificationStreams.get(clientId) != null) {
+ log.info("Sending on /notification {}", notify);
ServletOutputStream out = asyncContext.getResponse().getOutputStream();
- out.write(entry.getValue().getBytes());
+ out.write(notify.getBytes());
out.flush();
asyncContext.getResponse().flushBuffer();
}
} catch (Exception e) {
log.error(ExceptionUtils.getFullStackTrace(e));
asyncContext.complete();
- responseStreams.remove(clientUri);
+ notificationStreams.remove(clientId);
break;
}
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
index e6d11d1..5b243f6 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/JettyServer.java
@@ -53,7 +53,6 @@
server.setHandler(context);
context.addEventListener(new ConfigureService());
- context.addEventListener(new NotificationService());
try {
server.start();
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
index 72f283c..879f989 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NBEventWorkerManager.java
@@ -73,7 +73,7 @@
// call RESTCONF service
ObjectMapper mapper = new ObjectMapper();
- log.info("NB Event Contents: {}", contents.getValue().getValue());
+ log.debug("NB Event Contents: {}", contents.getValue().getValue());
ObjectNode node = (ObjectNode) mapper.readTree(contents.getValue().getValue());
if (contents.getValue().getKey().contains("ietf-dmm-fpcagent:configure")) {
CompletableFuture<RestconfRpcOutput> output = restconfService.runRpc(
@@ -82,7 +82,7 @@
contents.getKey()
);
String replace = output.get().output().toString().replace("ietf-dmm-fpcagent:output", "fpc:output");
- ConfigureService.configureQueue.add(
+ ConfigureService.responseQueue.add(
new AbstractMap.SimpleEntry(
contents.getKey(),
"event:application/json;/onos/restconf/operations/ietf-dmm-fpcagent:configure\ndata:" + replace + "\r\n"
@@ -95,7 +95,7 @@
contents.getKey()
);
addClientIdToUriMapping(output.get().output().toString(), contents.getKey());
- ConfigureService.registerClientQueue.add(
+ ConfigureService.responseQueue.add(
new AbstractMap.SimpleEntry(
contents.getKey(),
"event:application/json;/onos/restconf/operations/fpc:register-client\ndata:" + output.get().output().toString() + "\r\n"
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java
index e448324..28ba759 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationServer.java
@@ -31,18 +31,6 @@
private static final Logger log = LoggerFactory.getLogger(ResponseServer.class);
@Override
- protected void doGet(HttpServletRequest request, HttpServletResponse response) {
- try {
- response.setContentType("text/html");
- response.setStatus(HttpServletResponse.SC_OK);
- response.getWriter().println("<h1>Hello World</h1>");
- response.getWriter().println("session=" + request.getSession(true).getId());
- } catch (IOException e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
- }
-
- @Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) {
log.info("Notification stream Inititated");
String clientId = null;
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationService.java
deleted file mode 100644
index 13edc1e..0000000
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/NotificationService.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright © 2016 - 2017 Copyright (c) Sprint, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.onosproject.fpcagent.util.eventStream;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.AsyncContext;
-import javax.servlet.ServletContextEvent;
-import javax.servlet.ServletContextListener;
-import javax.servlet.ServletOutputStream;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-
-
-/**
- * A servlet context listener that sends event-data pairs for config result notifications
- */
-public class NotificationService implements ServletContextListener {
- private static final Logger log = LoggerFactory.getLogger(NotificationService.class);
-
- //key = client id, value = config-result-notification
- public static BlockingQueue<Map.Entry<String, String>> blockingQueue = new LinkedBlockingQueue<Map.Entry<String, String>>();
-
- public void contextInitialized(ServletContextEvent sce) {
- Map<String, AsyncContext> notificationStreams = new ConcurrentHashMap<String, AsyncContext>();
- sce.getServletContext().setAttribute("notificationStreams", notificationStreams);
- Thread thread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- String clientId = null;
- AsyncContext asyncContext = null;
- while (true) {
- try {
- Map.Entry<String, String> entry = blockingQueue.take();
- clientId = entry.getKey();
- asyncContext = notificationStreams.get(clientId);
- if (notificationStreams.get(entry.getKey()) != null) {
- ServletOutputStream out = asyncContext.getResponse().getOutputStream();
- out.write(entry.getValue().getBytes());
- out.flush();
- asyncContext.getResponse().flushBuffer();
- }
- } catch (Exception e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
- asyncContext.complete();
- notificationStreams.remove(clientId);
- break;
- }
- }
- }
-
- });
- thread.start();
-
- }
-
- public void contextDestroyed(ServletContextEvent sce) {
- }
-}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseService.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/RequestServer.java
similarity index 97%
rename from apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseService.java
rename to apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/RequestServer.java
index 7736d51..9715053 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseService.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/RequestServer.java
@@ -25,8 +25,8 @@
/**
* A HTTP client that sends a request to a FPC Client to initiate the request stream.
*/
-public class ResponseService {
- private static final Logger log = LoggerFactory.getLogger(ResponseService.class);
+public class RequestServer {
+ private static final Logger log = LoggerFactory.getLogger(RequestServer.class);
private static final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();
protected String clientUri;
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java
index 6189501..2218f1b 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/eventStream/ResponseServer.java
@@ -44,7 +44,7 @@
try {
HttpSession session = request.getSession();
session.setMaxInactiveInterval(72 * 60 * 60);
- ResponseService client = new ResponseService();
+ RequestServer client = new RequestServer();
client.connectToClient(clientUri);
response.setHeader("Content-Type", "text/event-stream");
response.setHeader("Cache-Control", "no-cache, no-store");
@@ -66,18 +66,6 @@
}
@Override
- protected void doGet(HttpServletRequest request, HttpServletResponse response) {
- try {
- response.setContentType("text/html");
- response.setStatus(HttpServletResponse.SC_OK);
- response.getWriter().println("<h1>Hello World</h1>");
- response.getWriter().println("session=" + request.getSession(true).getId());
- } catch (IOException e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
- }
-
- @Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) {
try {
String clientUri = null;
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java
deleted file mode 100644
index d3755ff..0000000
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.fpcagent.workers;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpVersion;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.BasicResponseHandler;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.json.JSONObject;
-import org.onosproject.fpcagent.protocols.DpnNgicCommunicator;
-import org.onosproject.restconf.utils.RestconfUtils;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
-import org.onosproject.yang.model.DefaultModelObjectData;
-import org.onosproject.yang.model.ResourceData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.onosproject.fpcagent.util.FpcUtil.modelConverter;
-import static org.onosproject.fpcagent.util.FpcUtil.module;
-
-public class HTTPNotifier implements AutoCloseable {
- private static final Logger log = LoggerFactory.getLogger(HTTPNotifier.class);
- private static HTTPNotifier _instance = null;
- private final BlockingQueue<Map.Entry<String, DefaultYangAutoPrefixNotify>> blockingQueue;
- private boolean run;
- private ResponseHandler<String> handler = new BasicResponseHandler();
-
- protected HTTPNotifier() {
- this.run = true;
- this.blockingQueue = new LinkedBlockingQueue<>();
- }
-
- public static HTTPNotifier createInstance() {
- if (_instance == null) {
- _instance = new HTTPNotifier();
- }
- return _instance;
- }
-
- public static HTTPNotifier getInstance() {
- return _instance;
- }
-
- public void send(Map.Entry<String, DefaultYangAutoPrefixNotify> buf) {
- try {
- blockingQueue.put(buf);
- } catch (InterruptedException e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
- }
-
-
- public void open() {
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- executorService.submit(() -> {
- while ((!Thread.currentThread().isInterrupted()) && run) {
- try {
- Map.Entry<String, DefaultYangAutoPrefixNotify> entry = blockingQueue.take();
- DefaultYangAutoPrefixNotify notify = entry.getValue();
-
- CloseableHttpClient client = HttpClients.createDefault();
- HttpPost httpPost = new HttpPost(entry.getKey());
- httpPost.addHeader("User-Agent", "ONOS Notification Agent");
- httpPost.addHeader("Charset", "utf-8");
- httpPost.addHeader("Content-type", "application/json");
- httpPost.setProtocolVersion(HttpVersion.HTTP_1_1);
-
- ResourceData dataNode = modelConverter.createDataNode(
- DefaultModelObjectData.builder()
- .addModelObject(notify)
- .build()
- );
- ObjectNode jsonNodes = RestconfUtils.convertDataNodeToJson(module, dataNode.dataNodes().get(0));
- ObjectMapper mapper = new ObjectMapper();
-
- StringEntity params = new StringEntity(mapper.writeValueAsString(jsonNodes));
- httpPost.setEntity(params);
-
- log.info("Sending HTTP POST {}", httpPost);
- HttpResponse response = client.execute(httpPost);
-
- if (notify.value() instanceof DownlinkDataNotification) {
- String msg = handler.handleResponse(response);
- JSONObject json_body = new JSONObject(msg);
- DpnNgicCommunicator.send_ddn_ack(json_body);
- log.info("Response {}", response);
- }
- } catch (Exception e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
- }
- });
- }
-
- @Override
- public void close() {
- run = false;
- }
-}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
index 9ab50aa..8207205 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
@@ -1,13 +1,15 @@
package org.onosproject.fpcagent.workers;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.onosproject.core.IdGenerator;
import org.onosproject.fpcagent.providers.DpnDeviceListener;
import org.onosproject.fpcagent.util.CacheManager;
import org.onosproject.fpcagent.util.FpcUtil;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.NotificationId;
-import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.OpIdentifier;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.*;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.configresultnotification.value.DefaultConfigResult;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.opstatusvalue.OpStatusEnum;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.result.ResultEnum;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.resultbody.resulttype.DefaultEmptyCase;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DefaultDownlinkDataNotification;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
@@ -28,7 +30,6 @@
import static org.onosproject.fpcagent.protocols.DpnNgicCommunicator.*;
import static org.onosproject.fpcagent.util.Converter.*;
-import static org.onosproject.fpcagent.util.FpcUtil.notificationIds;
import static org.onosproject.fpcagent.util.FpcUtil.sendNotification;
public class ZMQSBSubscriberManager implements AutoCloseable {
@@ -43,6 +44,7 @@
private byte controllerTopic;
private boolean run;
private boolean conflictingTopic;
+ private IdGenerator notificationIds;
private Future<?> broadcastAllWorker;
private Future<?> broadcastControllersWorker;
@@ -51,7 +53,7 @@
private DpnDeviceListener dpnDeviceListener;
- protected ZMQSBSubscriberManager(String address, String nodeId, String networkId, DpnDeviceListener dpnDeviceListener) {
+ protected ZMQSBSubscriberManager(String address, String nodeId, String networkId, DpnDeviceListener dpnDeviceListener, IdGenerator notificationIds) {
this.address = address;
this.run = true;
this.nodeId = nodeId;
@@ -59,11 +61,12 @@
this.conflictingTopic = false;
this.controllerSourceId = (long) ThreadLocalRandom.current().nextInt(0, 65535);
this.dpnDeviceListener = dpnDeviceListener;
+ this.notificationIds = notificationIds;
}
- public static ZMQSBSubscriberManager createInstance(String address, String nodeId, String networkId, DpnDeviceListener providerService) {
+ public static ZMQSBSubscriberManager createInstance(String address, String nodeId, String networkId, DpnDeviceListener providerService, IdGenerator notificationIds) {
if (_instance == null) {
- _instance = new ZMQSBSubscriberManager(address, nodeId, networkId, providerService);
+ _instance = new ZMQSBSubscriberManager(address, nodeId, networkId, providerService, notificationIds);
}
return _instance;
}
@@ -161,7 +164,7 @@
ddn.sessionId(checkSessionId(toBigInt(buf, 2)));
ddn.notificationMessageType("Downlink-Data-Notification");
ddn.clientId(ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 10)))));
- ddn.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 14))));
+ ddn.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 13))));
CacheManager.getCaches().forEach(
cacheManager -> {
try {
@@ -176,22 +179,30 @@
}
public Map.Entry<Object, Object> decode(byte[] buf) {
+// uint8_t topic_id;
+// uint8_t type;
s11MsgType type;
type = s11MsgType.getEnum(buf[1]);
if (type.equals(s11MsgType.DDN)) {
+// uint64_t session_id;
+// uint32_t client_id;
+// uint32_t op_id;
+// uint8_t node_network_id_buffer[NN_ID_BUF_LEN];
short nodeIdLen = buf[18];
short networkIdLen = buf[19 + nodeIdLen];
String key = new String(Arrays.copyOfRange(buf, 19, 19 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 20 + nodeIdLen, 20 + nodeIdLen + networkIdLen));
-
return new AbstractMap.SimpleEntry<>(processDDN(buf, key), null);
} else if (type.equals(s11MsgType.DPN_STATUS_INDICATION)) {
+// uint8_t source_topic_id;
+// uint8_t status;
+// uint32_t source;
+// uint8_t node_network_id_buffer[NN_ID_BUF_LEN];
DpnStatusIndication status;
-
short nodeIdLen = buf[8];
short networkIdLen = buf[9 + nodeIdLen];
String deviceId = new String(Arrays.copyOfRange(buf, 9, 9 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 10 + nodeIdLen, 10 + nodeIdLen + networkIdLen));
-
status = DpnStatusIndication.getEnum(buf[3]);
+ log.info("topic {}, status {}, source {}, node/net {}", (short) buf[2], status, toInt(buf, 4), deviceId);
if (status.equals(DpnStatusIndication.HELLO)) {
log.info("Hello {} on topic {}", deviceId, buf[2]);
dpnDeviceListener.deviceAdded(deviceId, buf[2]);
@@ -200,6 +211,25 @@
dpnDeviceListener.deviceRemoved(deviceId);
}
return new AbstractMap.SimpleEntry<>(status, deviceId);
+ } else if (type.equals(s11MsgType.DPN_RESPONSE)) {
+// uint8_t cause;
+// uint32_t client_id;
+// uint32_t op_id;
+ ClientIdentifier clientId = ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 3))));
+ OpIdentifier opId = OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 7)));
+ log.info("cause {}, clientId {}, opId {}", (short) buf[2], clientId.toString(), opId.toString());
+ DefaultConfigResultNotification notify = new DefaultConfigResultNotification();
+ notify.notificationId(NotificationId.of(notificationIds.getNewId()));
+ notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
+ DefaultConfigResult result = new DefaultConfigResult();
+ result.causeValue(buf[2]);
+ result.opId(opId);
+ result.opStatus(OpStatusEnum.OK);
+ result.result(Result.of(ResultEnum.OK));
+ result.resultType(new DefaultEmptyCase());
+ notify.value(result);
+
+ sendNotification(clientId.toString(), notify);
}
return null;
}
@@ -234,7 +264,8 @@
notify.notificationId(NotificationId.of(notificationIds.getNewId()));
notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
notify.value((DownlinkDataNotification) key);
- sendNotification(notify, ((DownlinkDataNotification) key).clientId());
+
+ sendNotification(((DownlinkDataNotification) key).clientId().toString(), notify);
} else if (key instanceof DpnStatusIndication) {
if (key.equals(DpnStatusIndication.HELLO)) {
byte dpnTopic = FpcUtil.getTopicFromNode(msg.getValue().toString());