ddn
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
index 85d3098..63bcd25 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
@@ -16,10 +16,6 @@
package org.onosproject.fpcagent;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
@@ -29,19 +25,16 @@
import org.onosproject.config.DynamicConfigService;
import org.onosproject.config.DynamicConfigStore;
import org.onosproject.core.CoreService;
-import org.onosproject.core.IdGenerator;
import org.onosproject.fpcagent.protocols.DpnCommunicationService;
import org.onosproject.fpcagent.protocols.DpnNgicCommunicator;
import org.onosproject.fpcagent.protocols.DpnP4Communicator;
import org.onosproject.fpcagent.providers.CpProviderService;
import org.onosproject.fpcagent.util.CacheManager;
import org.onosproject.fpcagent.util.FpcUtil;
-import org.onosproject.fpcagent.workers.HTTPNotifier;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.device.DeviceStore;
-import org.onosproject.restconf.utils.RestconfUtils;
import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.DefaultConnectionInfo;
import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.P4DpnControlProtocol;
import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.ZmqDpnControlProtocol;
@@ -95,8 +88,14 @@
import java.math.BigInteger;
import java.time.Duration;
import java.time.Instant;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static org.onosproject.fpcagent.util.Converter.convertContext;
import static org.onosproject.fpcagent.util.FpcUtil.*;
@@ -134,13 +133,9 @@
private DeviceService deviceService;
private InternalDeviceListener listener = new InternalDeviceListener();
- private ConcurrentMap<ClientIdentifier, DefaultRegisterClientInput> clientInfo = Maps.newConcurrentMap();
- private ConcurrentMap<FpcIdentity, HashSet<ClientIdentifier>> tenantInfo = Maps.newConcurrentMap();
- private HashSet<FpcDpnId> dpnInfo = Sets.newHashSet();
// FIXME configurable
private ExecutorService executorService = Executors.newFixedThreadPool(25);
- private IdGenerator notificationIds;
@Activate
protected void activate() {
@@ -991,28 +986,6 @@
}, executorService).join();
}
- private void sendNotification(DefaultYangAutoPrefixNotify notify, ClientIdentifier client) {
- ResourceData dataNode = modelConverter.createDataNode(
- DefaultModelObjectData.builder()
- .addModelObject(notify)
- .build()
- );
- ObjectNode jsonNodes = RestconfUtils.convertDataNodeToJson(notification, dataNode.dataNodes().get(0));
- ObjectMapper mapper = new ObjectMapper();
-
- try {
- log.info("Sending HTTP notification {} to {}", notify, client);
- HTTPNotifier.getInstance().send(
- new AbstractMap.SimpleEntry<>(
- clientInfo.get(client).endpointUri().toString(),
- mapper.writeValueAsString(jsonNodes)
- )
- );
- } catch (JsonProcessingException e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
- }
- }
-
public class InternalDeviceListener implements DeviceListener {
@Override
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java
index efac596..2ee0e11 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java
@@ -33,6 +33,7 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Stream;
import static org.onosproject.fpcagent.util.FpcUtil.getTenant;
@@ -118,7 +119,7 @@
log.debug("tenant {}", tenant);
if (tenant.fpcTopology().dpns() != null) {
return tenant.fpcTopology().dpns().stream()
- .filter(dpns -> s.equals(dpns.nodeId()+"/"+dpns.networkId()))
+ .filter(dpns -> s.equals(dpns.nodeId() + "/" + dpns.networkId()))
.findFirst()
.map(Dpns::dpnId);
}
@@ -137,4 +138,8 @@
return cacheInfo.get(identity);
}
+ public static Stream<CacheManager> getCaches() {
+ return cacheInfo.values().stream();
+ }
+
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
index 9de92c8..98585bf 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
@@ -16,21 +16,36 @@
package org.onosproject.fpcagent.util;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.onosproject.config.DynamicConfigService;
import org.onosproject.config.Filter;
+import org.onosproject.core.IdGenerator;
+import org.onosproject.fpcagent.workers.HTTPNotifier;
import org.onosproject.net.Device;
import org.onosproject.net.device.DeviceStore;
+import org.onosproject.restconf.utils.RestconfUtils;
+import org.onosproject.yang.gen.v1.fpc.rev20150105.fpc.registerclient.DefaultRegisterClientInput;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultTenants;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.DefaultTenant;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.TenantKeys;
+import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcDpnId;
import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
import org.onosproject.yang.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.AbstractMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
import static org.onosproject.net.DeviceId.deviceId;
@@ -55,6 +70,12 @@
public static ResourceId module;
public static ResourceId notification;
+ public static IdGenerator notificationIds;
+
+ public static final ConcurrentMap<ClientIdentifier, DefaultRegisterClientInput> clientInfo = Maps.newConcurrentMap();
+ public static final ConcurrentMap<FpcIdentity, HashSet<ClientIdentifier>> tenantInfo = Maps.newConcurrentMap();
+ public static final HashSet<FpcDpnId> dpnInfo = Sets.newHashSet();
+
/**
* Returns resource id from model converter.
*
@@ -275,4 +296,26 @@
node -> dynamicConfigService.updateNode(dataNode.resourceId(), node)
);
}
+
+ public static void sendNotification(DefaultYangAutoPrefixNotify notify, ClientIdentifier client) {
+ ResourceData dataNode = modelConverter.createDataNode(
+ DefaultModelObjectData.builder()
+ .addModelObject(notify)
+ .build()
+ );
+ ObjectNode jsonNodes = RestconfUtils.convertDataNodeToJson(module, dataNode.dataNodes().get(0));
+ ObjectMapper mapper = new ObjectMapper();
+
+ try {
+ log.info("Sending HTTP notification {} to {}", notify, client);
+ HTTPNotifier.getInstance().send(
+ new AbstractMap.SimpleEntry<>(
+ clientInfo.get(client).endpointUri().toString(),
+ mapper.writeValueAsString(jsonNodes)
+ )
+ );
+ } catch (JsonProcessingException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
index a8e93e4..14ebe91 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
@@ -1,22 +1,37 @@
package org.onosproject.fpcagent.workers;
-import javafx.util.Pair;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.onosproject.fpcagent.providers.DpnDeviceListener;
+import org.onosproject.fpcagent.util.CacheManager;
import org.onosproject.fpcagent.util.FpcUtil;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.ClientIdentifier;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.NotificationId;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.OpIdentifier;
+import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DefaultDownlinkDataNotification;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
+import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcDpnId;
+import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
+import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.fpcidentity.FpcIdentityUnion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
+import java.math.BigInteger;
+import java.util.AbstractMap;
import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import static org.onosproject.fpcagent.protocols.DpnNgicCommunicator.*;
-import static org.onosproject.fpcagent.util.Converter.toInt;
+import static org.onosproject.fpcagent.util.Converter.*;
+import static org.onosproject.fpcagent.util.FpcUtil.notificationIds;
+import static org.onosproject.fpcagent.util.FpcUtil.sendNotification;
public class ZMQSBSubscriberManager implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(ZMQSBSubscriberManager.class);
@@ -123,37 +138,46 @@
this.ctx = new ZContext();
}
-// /**
-// * Ensures the session id is an unsigned 64 bit integer
-// *
-// * @param sessionId - session id received from the DPN
-// * @return unsigned session id
-// */
-// private static BigInteger checkSessionId(BigInteger sessionId) {
-// if (sessionId.compareTo(BigInteger.ZERO) < 0) {
-// sessionId = sessionId.add(BigInteger.ONE.shiftLeft(64));
-// }
-// return sessionId;
-// }
-//
-// /**
-// * Decodes a DownlinkDataNotification
-// *
-// * @param buf - message buffer
-// * @param key - Concatenation of node id + / + network id
-// * @return DownlinkDataNotification or null if it could not be successfully decoded
-// */
-// private static DownlinkDataNotification processDDN(byte[] buf, String key) {
-// DownlinkDataNotification ddnB = new DefaultDownlinkDataNotification();
-// ddnB.sessionId(checkSessionId(toBigInt(buf, 2)));
-// ddnB.notificationMessageType(DOWNLINK_DATA_NOTIFICATION_STRING);
-// ddnB.clientId(ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 10)))));
-// ddnB.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 14))));
-// ddnB.notificationDpnId(uplinkDpnMap.get(key));
-// return ddnB;
-// }
+ /**
+ * Ensures the session id is an unsigned 64 bit integer
+ *
+ * @param sessionId - session id received from the DPN
+ * @return unsigned session id
+ */
+ private BigInteger checkSessionId(BigInteger sessionId) {
+ if (sessionId.compareTo(BigInteger.ZERO) < 0) {
+ sessionId = sessionId.add(BigInteger.ONE.shiftLeft(64));
+ }
+ return sessionId;
+ }
- public Pair<Object, Object> decode(byte[] buf) {
+ /**
+ * Decodes a DownlinkDataNotification
+ *
+ * @param buf - message buffer
+ * @param key - Concatenation of node id + / + network id
+ * @return DownlinkDataNotification or null if it could not be successfully decoded
+ */
+ private DownlinkDataNotification processDDN(byte[] buf, String key) {
+ DownlinkDataNotification ddnB = new DefaultDownlinkDataNotification();
+ ddnB.sessionId(checkSessionId(toBigInt(buf, 2)));
+ ddnB.notificationMessageType("Downlink-Data-Notification");
+ ddnB.clientId(ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 10)))));
+ ddnB.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 14))));
+ CacheManager.getCaches().forEach(
+ cacheManager -> {
+ try {
+ Optional<FpcDpnId> fpcDpnId = cacheManager.nodeNetworkCache.get(key);
+ fpcDpnId.ifPresent(ddnB::notificationDpnId);
+ } catch (ExecutionException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ }
+ );
+ return ddnB;
+ }
+
+ public Map.Entry<Object, Object> decode(byte[] buf) {
s11MsgType type;
type = s11MsgType.getEnum(buf[1]);
if (type.equals(s11MsgType.DDN)) {
@@ -161,7 +185,7 @@
short networkIdLen = buf[19 + nodeIdLen];
String key = new String(Arrays.copyOfRange(buf, 19, 19 + nodeIdLen)) + "/" + new String(Arrays.copyOfRange(buf, 20 + nodeIdLen, 20 + nodeIdLen + networkIdLen));
-// return uplinkDpnMap.get(key) == null ? null : new AbstractMap.SimpleEntry<>(uplinkDpnMap.get(key), processDDN(buf, key));
+ return new AbstractMap.SimpleEntry<>(processDDN(buf, key), null);
} else if (type.equals(s11MsgType.DPN_STATUS_INDICATION)) {
DpnStatusIndication status;
@@ -177,7 +201,7 @@
log.info("Bye {}", deviceId);
dpnDeviceListener.deviceRemoved(deviceId);
}
- return new Pair<>(status, deviceId);
+ return new AbstractMap.SimpleEntry<>(status, deviceId);
}
return null;
}
@@ -204,11 +228,15 @@
}
break;
default:
- Pair msg = decode(contents);
+ Map.Entry msg = decode(contents);
if (msg != null) {
Object key = msg.getKey();
if (key instanceof DownlinkDataNotification) {
- // TODO handle DL notification
+ DefaultYangAutoPrefixNotify notify = new DefaultYangAutoPrefixNotify();
+ notify.notificationId(NotificationId.of(notificationIds.getNewId()));
+ notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
+ notify.value((DownlinkDataNotification) key);
+ sendNotification(notify , ((DownlinkDataNotification) key).clientId());
} else if (key instanceof DpnStatusIndication) {
if (key.equals(DpnStatusIndication.HELLO)) {
byte dpnTopic = FpcUtil.getTopicFromNode(msg.getValue().toString());