Process packet-in in separate thread and cache sadis results
Change-Id: Ifa73a51c7e90ec8df0367c2e30266724f955a3c1
diff --git a/app/src/main/java/org/opencord/bng/impl/PppoeHandlerRelay.java b/app/src/main/java/org/opencord/bng/impl/PppoeHandlerRelay.java
index f6c47f3..c840efc 100644
--- a/app/src/main/java/org/opencord/bng/impl/PppoeHandlerRelay.java
+++ b/app/src/main/java/org/opencord/bng/impl/PppoeHandlerRelay.java
@@ -16,7 +16,11 @@
package org.opencord.bng.impl;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.onlab.packet.Data;
import org.onlab.packet.DeserializationException;
import org.onlab.packet.Ethernet;
@@ -24,10 +28,12 @@
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.ItemNotFoundException;
+import org.onlab.util.SharedExecutors;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Port;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
@@ -69,6 +75,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
@@ -128,9 +136,22 @@
*/
private Map<MacAddress, BngAttachment> mapSrcMacToAttInfo;
+ /**
+ * Cache to cache Sadis results during PPPoE connection establishment.
+ */
+ private final LoadingCache<ImmutableTriple<VlanId, VlanId, ConnectPoint>, ConnectPoint>
+ oltCpCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(30, TimeUnit.SECONDS)
+ .build(new CacheLoader<>() {
+ @Override
+ public ConnectPoint load(ImmutableTriple<VlanId, VlanId, ConnectPoint> key) throws Exception {
+ return getOltConnectPoint(key.left, key.middle, key.right).orElseThrow();
+ }
+ });
+
@Activate
protected void activate() {
- mapSrcMacToAttInfo = Maps.newHashMap();
+ mapSrcMacToAttInfo = Maps.newConcurrentMap();
appId = coreService.getAppId(BngManager.BNG_APP);
cfgService.addListener(cfgListener);
cfgService.registerConfigFactory(cfgFactory);
@@ -150,6 +171,7 @@
eventDispatcher.removeSink(PppoeEvent.class);
packetService.removeProcessor(internalPacketProcessor);
cfgService.unregisterConfigFactory(cfgFactory);
+ oltCpCache.invalidateAll();
pppoeRelayConfig = null;
mapSrcMacToAttInfo = null;
internalPacketProcessor = null;
@@ -285,17 +307,25 @@
BngAttachment attInfo, short pppoeSessionId,
IpAddress ip) {
// Retrive the NNI connect point
- var oltConnectPoint = getOltConnectPoint(attInfo.sTag(), attInfo.cTag(),
- pppoeRelayConfig.getAsgToOltConnectPoint());
- assert oltConnectPoint.orElse(null) != null;
+ ConnectPoint oltConnectPoint;
+ try {
+ oltConnectPoint = oltCpCache.get(ImmutableTriple.of(attInfo.sTag(), attInfo.cTag(),
+ pppoeRelayConfig.getAsgToOltConnectPoint()));
+ } catch (ExecutionException e) {
+ // If unable to retrieve the OLT Connect Point log error and return.
+ // In this way we do NOT propagate the event and eventually create an
+ // inconsistent BNG Attachment.
+ log.error("Unable to retrieve the OLT Connect Point (\"NNI\" Connect Point)", e);
+ return;
+ }
log.info("Generating event of type {}", bngAppEventType);
post(new PppoeEvent(
bngAppEventType,
new PppoeEventSubject(
- oltConnectPoint.orElseThrow(),
+ oltConnectPoint,
ip,
attInfo.macAddress(),
- getPortNameAnnotation(oltConnectPoint.orElse(null)),
+ getPortNameAnnotation(oltConnectPoint),
pppoeSessionId,
attInfo.sTag(),
attInfo.cTag())
@@ -499,7 +529,6 @@
// scope of an OLT. In lack of a better API in SADIS, we retrieve info
// for all OLT ports and match those that have same c-tag and s-tag as
// the given attachemnt info.
-
var oltDeviceIds = linkService.getIngressLinks(asgToOltConnectPoint)
.stream()
.map(link -> link.src().deviceId())
@@ -516,9 +545,12 @@
var oltConnectPoints = oltDeviceIds.stream()
.flatMap(deviceId -> deviceService.getPorts(deviceId).stream())
+ .filter(Port::isEnabled)
.filter(port -> {
var portName = port.annotations().value("portName");
- if (portName == null) {
+ // FIXME: here we support a single UNI per ONU port
+ if (portName == null ||
+ (portName.contains("-") && !portName.endsWith("-1"))) {
return false;
}
var info = sadisService.getSubscriberInfoService()
@@ -589,7 +621,13 @@
if (!Pppoe.isPPPoES(eth) && !Pppoe.isPPPoED(eth)) {
return;
}
- processPppoePacket(context);
+ SharedExecutors.getPoolThreadExecutor().submit(() -> {
+ try {
+ processPppoePacket(context);
+ } catch (Throwable e) {
+ log.error("Exception while processing packet", e);
+ }
+ });
}
}