Process packet-in in separate thread and cache sadis results

Change-Id: Ifa73a51c7e90ec8df0367c2e30266724f955a3c1
diff --git a/app/src/main/java/org/opencord/bng/impl/PppoeHandlerRelay.java b/app/src/main/java/org/opencord/bng/impl/PppoeHandlerRelay.java
index f6c47f3..c840efc 100644
--- a/app/src/main/java/org/opencord/bng/impl/PppoeHandlerRelay.java
+++ b/app/src/main/java/org/opencord/bng/impl/PppoeHandlerRelay.java
@@ -16,7 +16,11 @@
 
 package org.opencord.bng.impl;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.onlab.packet.Data;
 import org.onlab.packet.DeserializationException;
 import org.onlab.packet.Ethernet;
@@ -24,10 +28,12 @@
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onlab.util.ItemNotFoundException;
+import org.onlab.util.SharedExecutors;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Port;
 import org.onosproject.net.config.ConfigFactory;
 import org.onosproject.net.config.NetworkConfigEvent;
 import org.onosproject.net.config.NetworkConfigListener;
@@ -69,6 +75,8 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
@@ -128,9 +136,22 @@
      */
     private Map<MacAddress, BngAttachment> mapSrcMacToAttInfo;
 
+    /**
+     * Cache to cache Sadis results during PPPoE connection establishment.
+     */
+    private final LoadingCache<ImmutableTriple<VlanId, VlanId, ConnectPoint>, ConnectPoint>
+            oltCpCache = CacheBuilder.newBuilder()
+            .expireAfterWrite(30, TimeUnit.SECONDS)
+            .build(new CacheLoader<>() {
+                @Override
+                public ConnectPoint load(ImmutableTriple<VlanId, VlanId, ConnectPoint> key) throws Exception {
+                    return getOltConnectPoint(key.left, key.middle, key.right).orElseThrow();
+                }
+            });
+
     @Activate
     protected void activate() {
-        mapSrcMacToAttInfo = Maps.newHashMap();
+        mapSrcMacToAttInfo = Maps.newConcurrentMap();
         appId = coreService.getAppId(BngManager.BNG_APP);
         cfgService.addListener(cfgListener);
         cfgService.registerConfigFactory(cfgFactory);
@@ -150,6 +171,7 @@
         eventDispatcher.removeSink(PppoeEvent.class);
         packetService.removeProcessor(internalPacketProcessor);
         cfgService.unregisterConfigFactory(cfgFactory);
+        oltCpCache.invalidateAll();
         pppoeRelayConfig = null;
         mapSrcMacToAttInfo = null;
         internalPacketProcessor = null;
@@ -285,17 +307,25 @@
                                     BngAttachment attInfo, short pppoeSessionId,
                                     IpAddress ip) {
         // Retrive the NNI connect point
-        var oltConnectPoint = getOltConnectPoint(attInfo.sTag(), attInfo.cTag(),
-                                                 pppoeRelayConfig.getAsgToOltConnectPoint());
-        assert oltConnectPoint.orElse(null) != null;
+        ConnectPoint oltConnectPoint;
+        try {
+            oltConnectPoint = oltCpCache.get(ImmutableTriple.of(attInfo.sTag(), attInfo.cTag(),
+                                                                pppoeRelayConfig.getAsgToOltConnectPoint()));
+        } catch (ExecutionException e) {
+            // If unable to retrieve the OLT Connect Point log error and return.
+            // In this way we do NOT propagate the event and eventually create an
+            // inconsistent BNG Attachment.
+            log.error("Unable to retrieve the OLT Connect Point (\"NNI\" Connect Point)", e);
+            return;
+        }
         log.info("Generating event of type {}", bngAppEventType);
         post(new PppoeEvent(
                      bngAppEventType,
                      new PppoeEventSubject(
-                             oltConnectPoint.orElseThrow(),
+                             oltConnectPoint,
                              ip,
                              attInfo.macAddress(),
-                             getPortNameAnnotation(oltConnectPoint.orElse(null)),
+                             getPortNameAnnotation(oltConnectPoint),
                              pppoeSessionId,
                              attInfo.sTag(),
                              attInfo.cTag())
@@ -499,7 +529,6 @@
         // scope of an OLT. In lack of a better API in SADIS, we retrieve info
         // for all OLT ports and match those that have same c-tag and s-tag as
         // the given attachemnt info.
-
         var oltDeviceIds = linkService.getIngressLinks(asgToOltConnectPoint)
                 .stream()
                 .map(link -> link.src().deviceId())
@@ -516,9 +545,12 @@
 
         var oltConnectPoints = oltDeviceIds.stream()
                 .flatMap(deviceId -> deviceService.getPorts(deviceId).stream())
+                .filter(Port::isEnabled)
                 .filter(port -> {
                     var portName = port.annotations().value("portName");
-                    if (portName == null) {
+                    // FIXME: here we support a single UNI per ONU port
+                    if (portName == null ||
+                            (portName.contains("-") && !portName.endsWith("-1"))) {
                         return false;
                     }
                     var info = sadisService.getSubscriberInfoService()
@@ -589,7 +621,13 @@
             if (!Pppoe.isPPPoES(eth) && !Pppoe.isPPPoED(eth)) {
                 return;
             }
-            processPppoePacket(context);
+            SharedExecutors.getPoolThreadExecutor().submit(() -> {
+                try {
+                    processPppoePacket(context);
+                } catch (Throwable e) {
+                    log.error("Exception while processing packet", e);
+                }
+            });
         }
     }