fixed scenario when client registers after DPN
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
index 4e9b3bb..a89c291 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/FpcRpcManager.java
@@ -92,10 +92,7 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
import static org.onosproject.fpcagent.util.Converter.convertContext;
import static org.onosproject.fpcagent.util.FpcUtil.*;
@@ -151,7 +148,6 @@
rpcRegistry.unregisterRpcService(this);
clientInfo.clear();
tenantInfo.clear();
- dpnInfo.clear();
log.info("FPC RPC Service Stopped");
}
@@ -1011,62 +1007,12 @@
@Override
public void event(DeviceEvent event) {
if (event.subject().manufacturer().equals("fpc")) {
- switch (event.type()) {
- // in case an FPC device is added or restarted send DPN availability to all connected clients
- case DEVICE_UPDATED:
- case DEVICE_ADDED: {
- tenantInfo.forEach(
- (tenantId, clients) -> {
- Optional<DefaultTenant> defaultTenant = getTenant(tenantId);
- if (defaultTenant.isPresent()) {
- DefaultTenant tenant = defaultTenant.get();
- if (tenant.fpcTopology().dpns() != null) {
- tenant.fpcTopology().dpns().forEach(dpn -> {
- if (!dpnInfo.contains(dpn.dpnId())) {
- CacheManager.getInstance(tenantId).nodeNetworkCache.put(
- dpn.nodeId() + "/" + dpn.networkId(),
- Optional.of(dpn.dpnId())
- );
-
- DefaultYangAutoPrefixNotify notify = new DefaultYangAutoPrefixNotify();
- notify.notificationId(NotificationId.of(notificationIds.getNewId()));
-
- notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
- DefaultDpnAvailability availability = new DefaultDpnAvailability();
- availability.availabilityMessageType("Dpn-Availability");
- availability.dpnId(dpn.dpnId());
- availability.dpnGroups(dpn.dpnGroups());
- availability.controlProtocol(dpn.controlProtocol());
- availability.networkId(dpn.networkId());
- availability.nodeId(dpn.nodeId());
- availability.dpnName(dpn.dpnName());
- availability.dpnStatus(DpnStatusEnum.AVAILABLE);
-
- notify.value(availability);
-
- clients.forEach(client -> sendNotification(notify, client));
- dpnInfo.add(dpn.dpnId());
- }
- }
- );
- }
- }
- }
- );
- break;
- }
- // in case an FPC device is removed or shutdown send DPN availability to all connected clients
- case DEVICE_AVAILABILITY_CHANGED:
- case DEVICE_REMOVED: {
- String nodeNetwork = event.subject().id().toString().split(":")[1];
- tenantInfo.forEach(
- (tenantId, clients) -> {
- try {
- FpcDpnId fpcDpnId = CacheManager.getInstance(tenantId).nodeNetworkCache.get(nodeNetwork).get();
-
- if (dpnInfo.contains(fpcDpnId)) {
- DefaultDpns dpn = CacheManager.getInstance(tenantId).dpnsCache.get(fpcDpnId).get();
-
+ tenantInfo.forEach(
+ (tenantId, clients) -> {
+ String nodeNetwork = event.subject().annotations().value("node/network");
+ try {
+ CacheManager.getInstance(tenantId).nodeNetworkCache.get(nodeNetwork).ifPresent(
+ dpn -> {
DefaultYangAutoPrefixNotify notify = new DefaultYangAutoPrefixNotify();
notify.notificationId(NotificationId.of(notificationIds.getNewId()));
@@ -1079,19 +1025,74 @@
availability.networkId(dpn.networkId());
availability.nodeId(dpn.nodeId());
availability.dpnName(dpn.dpnName());
- availability.dpnStatus(DpnStatusEnum.UNAVAILABLE);
+
+ switch (event.type()) {
+ case DEVICE_UPDATED:
+ case DEVICE_ADDED: {
+ availability.dpnStatus(DpnStatusEnum.AVAILABLE);
+ break;
+ }
+ case DEVICE_AVAILABILITY_CHANGED:
+ case DEVICE_REMOVED: {
+ availability.dpnStatus(DpnStatusEnum.UNAVAILABLE);
+ break;
+ }
+ default:
+ throw new RuntimeException("Unknown Device case.");
+ }
notify.value(availability);
-
clients.forEach(client -> sendNotification(notify, client));
- dpnInfo.remove(fpcDpnId);
}
- } catch (Exception e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
+ );
+ } catch (ExecutionException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ });
+ } else if (event.subject().manufacturer().equals("cp")) {
+ String clientId = event.subject().annotations().value("client-id");
+ DefaultRegisterClientInput clientInput = clientInfo.get(ClientIdentifier.fromString(clientId));
+
+ Optional<DefaultTenant> defaultTenant = getTenant(clientInput.tenantId());
+ if (defaultTenant.isPresent()) {
+ DefaultTenant tenant = defaultTenant.get();
+ log.debug("tenant {}", tenant);
+ if (tenant.fpcTopology().dpns() != null) {
+ tenant.fpcTopology().dpns().forEach(
+ dpn -> {
+ DefaultYangAutoPrefixNotify notify = new DefaultYangAutoPrefixNotify();
+ notify.notificationId(NotificationId.of(notificationIds.getNewId()));
+
+ notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
+ DefaultDpnAvailability availability = new DefaultDpnAvailability();
+ availability.availabilityMessageType("Dpn-Availability");
+ availability.dpnId(dpn.dpnId());
+ availability.dpnGroups(dpn.dpnGroups());
+ availability.controlProtocol(dpn.controlProtocol());
+ availability.networkId(dpn.networkId());
+ availability.nodeId(dpn.nodeId());
+ availability.dpnName(dpn.dpnName());
+
+ switch (event.type()) {
+ case DEVICE_UPDATED:
+ case DEVICE_ADDED: {
+ availability.dpnStatus(DpnStatusEnum.AVAILABLE);
+ break;
+ }
+ case DEVICE_AVAILABILITY_CHANGED:
+ case DEVICE_REMOVED: {
+ availability.dpnStatus(DpnStatusEnum.UNAVAILABLE);
+ break;
+ }
+ default:
+ throw new RuntimeException("Unknown Device case.");
}
+
+ notify.value(availability);
+ sendNotification(notify, clientInput.clientId());
}
);
- break;
+
}
}
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/CpProvider.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/CpProvider.java
index c26e787..0d77d14 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/CpProvider.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/CpProvider.java
@@ -98,6 +98,7 @@
.set(AnnotationKeys.NAME, id)
.set(AnnotationKeys.PROTOCOL, "RESTCONF")
.set(AnnotationKeys.MANAGEMENT_ADDRESS, address)
+ .set("client-id", id)
.build();
DeviceDescription descriptionBase = new DefaultDeviceDescription(deviceId.uri(), type, "cp", "0.1", "0.1", id, chassisId),
description = new DefaultDeviceDescription(descriptionBase, annotations);
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProvider.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProvider.java
index d72991e..69bfba6 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProvider.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/providers/DpnProvider.java
@@ -99,6 +99,7 @@
.set(AnnotationKeys.PROTOCOL, "ZMQ")
.set(AnnotationKeys.MANAGEMENT_ADDRESS, "127.0.0.1")
.set("topic", String.valueOf(topic))
+ .set("node/network", id)
.build();
DeviceDescription descriptionBase = new DefaultDeviceDescription(deviceId.uri(), type, "fpc", "0.1", "0.1", id, chassisId),
description = new DefaultDeviceDescription(descriptionBase, annotations);
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java
index 2ee0e11..047f15e 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/CacheManager.java
@@ -20,7 +20,6 @@
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.DefaultTenant;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.tenant.fpcmobility.DefaultContexts;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.tenant.fpctopology.DefaultDpns;
@@ -45,9 +44,12 @@
// Cacher per different Tenant
public static ConcurrentMap<FpcIdentity, CacheManager> cacheInfo = Maps.newConcurrentMap();
private final Logger log = LoggerFactory.getLogger(getClass());
+ // key: Context Id, value: Context
public LoadingCache<FpcContextId, Optional<DefaultContexts>> contextsCache;
+ // key: Dpn Id, value: DPN
public LoadingCache<FpcDpnId, Optional<DefaultDpns>> dpnsCache;
- public LoadingCache<String, Optional<FpcDpnId>> nodeNetworkCache;
+ // key: node/network, value: DPN
+ public LoadingCache<String, Optional<Dpns>> nodeNetworkCache;
private CacheManager(FpcIdentity identity) {
contextsCache = CacheBuilder.newBuilder()
@@ -109,23 +111,19 @@
nodeNetworkCache = CacheBuilder.newBuilder()
.maximumSize(100)
.build(
- new CacheLoader<String, Optional<FpcDpnId>>() {
+ new CacheLoader<String, Optional<Dpns>>() {
@Override
- public Optional<FpcDpnId> load(String s) throws Exception {
- try {
- Optional<DefaultTenant> defaultTenant = getTenant(identity);
- if (defaultTenant.isPresent()) {
- DefaultTenant tenant = defaultTenant.get();
- log.debug("tenant {}", tenant);
- if (tenant.fpcTopology().dpns() != null) {
- return tenant.fpcTopology().dpns().stream()
- .filter(dpns -> s.equals(dpns.nodeId() + "/" + dpns.networkId()))
- .findFirst()
- .map(Dpns::dpnId);
- }
+ public Optional<Dpns> load(String nodeNetwork) throws Exception {
+ Optional<DefaultTenant> defaultTenant = getTenant(identity);
+ if (defaultTenant.isPresent()) {
+ DefaultTenant tenant = defaultTenant.get();
+ log.debug("tenant {}", tenant);
+ if (tenant.fpcTopology().dpns() != null) {
+
+ return tenant.fpcTopology().dpns().stream()
+ .filter(dpns -> nodeNetwork.equals(dpns.nodeId() + "/" + dpns.networkId()))
+ .findFirst();
}
- } catch (Exception e) {
- log.error(ExceptionUtils.getFullStackTrace(e));
}
return Optional.empty();
}
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
index 2b45678..8c1de9b 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/util/FpcUtil.java
@@ -17,7 +17,6 @@
package org.onosproject.fpcagent.util;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import org.onosproject.config.DynamicConfigService;
import org.onosproject.config.Filter;
import org.onosproject.core.IdGenerator;
@@ -30,7 +29,6 @@
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.DefaultYangAutoPrefixNotify;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.DefaultTenant;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.tenants.TenantKeys;
-import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcDpnId;
import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
import org.onosproject.yang.model.*;
import org.slf4j.Logger;
@@ -52,7 +50,6 @@
public static final FpcIdentity defaultIdentity = FpcIdentity.fromString("default");
public static final ConcurrentMap<ClientIdentifier, DefaultRegisterClientInput> clientInfo = Maps.newConcurrentMap();
public static final ConcurrentMap<FpcIdentity, HashSet<ClientIdentifier>> tenantInfo = Maps.newConcurrentMap();
- public static final HashSet<FpcDpnId> dpnInfo = Sets.newHashSet();
private static final Logger log = LoggerFactory.getLogger(FpcUtil.class);
// Services
public static DynamicConfigService dynamicConfigService = null;
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java
index 82b4790..8ef4c1e 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/HTTPNotifier.java
@@ -101,7 +101,7 @@
ObjectNode jsonNodes = RestconfUtils.convertDataNodeToJson(module, dataNode.dataNodes().get(0));
ObjectMapper mapper = new ObjectMapper();
- StringEntity params = new StringEntity(mapper.writeValueAsString(jsonNodes.get(0)));
+ StringEntity params = new StringEntity(mapper.writeValueAsString(jsonNodes));
httpPost.setEntity(params);
HttpResponse response = client.execute(httpPost);
diff --git a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
index 14ebe91..9ab50aa 100644
--- a/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
+++ b/apps/fpcagent/src/main/java/org/onosproject/fpcagent/workers/ZMQSBSubscriberManager.java
@@ -10,7 +10,6 @@
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.OpIdentifier;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DefaultDownlinkDataNotification;
import org.onosproject.yang.gen.v1.ietfdmmfpcagent.rev20160803.ietfdmmfpcagent.yangautoprefixnotify.value.DownlinkDataNotification;
-import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcDpnId;
import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.FpcIdentity;
import org.onosproject.yang.gen.v1.ietfdmmfpcbase.rev20160803.ietfdmmfpcbase.fpcidentity.FpcIdentityUnion;
import org.slf4j.Logger;
@@ -22,7 +21,6 @@
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -159,22 +157,22 @@
* @return DownlinkDataNotification or null if it could not be successfully decoded
*/
private DownlinkDataNotification processDDN(byte[] buf, String key) {
- DownlinkDataNotification ddnB = new DefaultDownlinkDataNotification();
- ddnB.sessionId(checkSessionId(toBigInt(buf, 2)));
- ddnB.notificationMessageType("Downlink-Data-Notification");
- ddnB.clientId(ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 10)))));
- ddnB.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 14))));
+ DownlinkDataNotification ddn = new DefaultDownlinkDataNotification();
+ ddn.sessionId(checkSessionId(toBigInt(buf, 2)));
+ ddn.notificationMessageType("Downlink-Data-Notification");
+ ddn.clientId(ClientIdentifier.of(FpcIdentity.of(FpcIdentityUnion.of(fromIntToLong(buf, 10)))));
+ ddn.opId(OpIdentifier.of(BigInteger.valueOf(fromIntToLong(buf, 14))));
CacheManager.getCaches().forEach(
cacheManager -> {
try {
- Optional<FpcDpnId> fpcDpnId = cacheManager.nodeNetworkCache.get(key);
- fpcDpnId.ifPresent(ddnB::notificationDpnId);
+ cacheManager.nodeNetworkCache.get(key)
+ .ifPresent(dpn -> ddn.notificationDpnId(dpn.dpnId()));
} catch (ExecutionException e) {
log.error(ExceptionUtils.getFullStackTrace(e));
}
}
);
- return ddnB;
+ return ddn;
}
public Map.Entry<Object, Object> decode(byte[] buf) {
@@ -236,7 +234,7 @@
notify.notificationId(NotificationId.of(notificationIds.getNewId()));
notify.timestamp(BigInteger.valueOf(System.currentTimeMillis()));
notify.value((DownlinkDataNotification) key);
- sendNotification(notify , ((DownlinkDataNotification) key).clientId());
+ sendNotification(notify, ((DownlinkDataNotification) key).clientId());
} else if (key instanceof DpnStatusIndication) {
if (key.equals(DpnStatusIndication.HELLO)) {
byte dpnTopic = FpcUtil.getTopicFromNode(msg.getValue().toString());