[VOL-4746] removing sub based on programmed data

Fix originally sent by Matteo Scandolo.

Change-Id: Ia62300f17af84d8cac98e3fc18a99eb697d4de15
Signed-off-by: Gustavo Silva <gsilva@furukawalatam.com>
diff --git a/api/src/main/java/org/opencord/olt/OltFlowsStatus.java b/api/src/main/java/org/opencord/olt/OltFlowsStatus.java
index b002d57..1b7999c 100644
--- a/api/src/main/java/org/opencord/olt/OltFlowsStatus.java
+++ b/api/src/main/java/org/opencord/olt/OltFlowsStatus.java
@@ -41,5 +41,13 @@
     /**
      * An error occurred.
      */
-    ERROR
+    ERROR;
+
+    /**
+     * Checks if this status means the flow is still available or in progress to be available.
+     * @return true if the status represents an available flow.
+     */
+    public boolean hasFlow() {
+        return !OltFlowsStatus.NONE.equals(this) && !OltFlowsStatus.REMOVED.equals(this);
+    }
 }
\ No newline at end of file
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index d744c1e..e95df6f 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -68,6 +68,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -85,6 +86,7 @@
 import static org.onlab.util.Tools.groupedThreads;
 import static org.opencord.olt.impl.OltUtils.getPortName;
 import static org.opencord.olt.impl.OltUtils.portWithName;
+import static org.opencord.olt.impl.OltUtils.getProgrammedSubscriber;
 import static org.opencord.olt.impl.OsgiPropertyConstants.*;
 
 /**
@@ -374,14 +376,20 @@
                 return false;
             }
 
-            SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
+            //First check if the subscriber is in the programmed subscriber map, if not fallback to sadis
+            SubscriberAndDeviceInformation si = getProgrammedSubscriber(oltFlowService, accessDevicePort);
             if (si == null) {
-                log.error("Subscriber information not found in sadis for port {}",
+                si = subsService.get(getPortName(port));
+            }
+            // if it's still null we can't proceed
+            if (si == null) {
+                log.error("Subscriber information not found in programmed subscribers or sadis for port {}",
                         accessDevicePort);
                 // NOTE that we are returning true so that the subscriber is removed from the queue
                 // and we can move on provisioning others
                 return false;
             }
+
             DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
                     DiscoveredSubscriber.Status.ADMIN_REMOVED, true, si);
 
@@ -413,7 +421,7 @@
         }
 
         SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
-        UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
+        UniTagInformation specificService = getUniTagInformation(port, cTag, sTag, tpId);
         if (specificService == null) {
             log.error("Can't find Information for subscriber on {}, with cTag {}, " +
                     "stag {}, tpId {}", cp, cTag, sTag, tpId);
@@ -453,7 +461,7 @@
         }
 
         SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
-        UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
+        UniTagInformation specificService = getUniTagInformation(port, cTag, sTag, tpId);
         if (specificService == null) {
             log.error("Can't find Information for subscriber on {}, with cTag {}, " +
                     "stag {}, tpId {}", cp, cTag, sTag, tpId);
@@ -616,16 +624,30 @@
      * using the pon c tag, pon s tag and the technology profile id
      * May return Optional<null>
      *
-     * @param portName  port of the subscriber
+     * @param port  port of the subscriber
      * @param innerVlan pon c tag
      * @param outerVlan pon s tag
      * @param tpId      the technology profile id
      * @return the found uni tag information
      */
-    private UniTagInformation getUniTagInformation(String portName, VlanId innerVlan,
+    private UniTagInformation getUniTagInformation(Port port, VlanId innerVlan,
                                                    VlanId outerVlan, int tpId) {
+        String portName = portWithName(port);
         log.debug("Getting uni tag information for {}, innerVlan: {}, outerVlan: {}, tpId: {}",
                 portName, innerVlan, outerVlan, tpId);
+        //First check if the subscriber is in the programmed subscriber map, if not fallback to sadis
+        //there should be only one sub service with these characteristics.
+        Optional<Map.Entry<ServiceKey, UniTagInformation>> service = oltFlowService.getProgrammedSubscribers()
+                .entrySet().stream()
+                .filter(entry -> entry.getKey().getPort().equals(new AccessDevicePort(port))
+                        && entry.getValue().getPonSTag().equals(outerVlan)
+                        && entry.getValue().getPonCTag().equals(innerVlan))
+                .findFirst();
+        if (service.isPresent()) {
+            log.debug("Subscriber was programmed with uni tag info for {}, innerVlan: {}, outerVlan: {}, tpId: {}",
+                    portName, innerVlan, outerVlan, tpId);
+            return service.get().getValue();
+        }
         SubscriberAndDeviceInformation subInfo = subsService.get(portName);
         if (subInfo == null) {
             log.warn("Subscriber information doesn't exist for {}", portName);
@@ -638,22 +660,22 @@
             return null;
         }
 
-        UniTagInformation service = OltUtils.getUniTagInformation(subInfo, innerVlan, outerVlan, tpId);
+        UniTagInformation uniTagInformation = OltUtils.getUniTagInformation(subInfo, innerVlan, outerVlan, tpId);
+        if (uniTagInformation == null) {
 
-        if (service == null) {
             // Try again after invalidating cache for the particular port name.
             subsService.invalidateId(portName);
             subInfo = subsService.get(portName);
-            service = OltUtils.getUniTagInformation(subInfo, innerVlan, outerVlan, tpId);
+            uniTagInformation = OltUtils.getUniTagInformation(subInfo, innerVlan, outerVlan, tpId);
         }
 
-        if (service == null) {
+        if (uniTagInformation == null) {
             log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}",
                     innerVlan, outerVlan, tpId, portName);
             return null;
         }
 
-        return service;
+        return uniTagInformation;
     }
 
     protected void bindSadisService(SadisService service) {
@@ -883,6 +905,7 @@
                 log.debug("Ignoring PORT_ADD on UNI port {}", portWithName(port));
                 return;
             }
+            AccessDevicePort accessDevicePort = new AccessDevicePort(port);
 
             if (port.isEnabled()) {
                 if (isNni) {
@@ -902,8 +925,17 @@
                     // NOTE if the subscriber was previously provisioned,
                     // then add it back to the queue to be re-provisioned
                     boolean provisionSubscriber = oltFlowService.
-                            isSubscriberServiceProvisioned(new AccessDevicePort(port));
-                    SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
+                            isSubscriberServiceProvisioned(accessDevicePort);
+
+                    SubscriberAndDeviceInformation si;
+                    DiscoveredSubscriber.Status status = DiscoveredSubscriber.Status.ADDED;
+                    if (type == DeviceEvent.Type.PORT_REMOVED) {
+                        status = DiscoveredSubscriber.Status.REMOVED;
+                        si = getProgrammedSubscriber(oltFlowService, accessDevicePort);
+                    } else {
+                        si = subsService.get(getPortName(port));
+                    }
+
                     if (si == null) {
                         //NOTE this should not happen given that the subscriber was provisioned before
                         log.error("Subscriber information not found in sadis for port {}",
@@ -911,11 +943,6 @@
                         return;
                     }
 
-                    DiscoveredSubscriber.Status status = DiscoveredSubscriber.Status.ADDED;
-                    if (type == DeviceEvent.Type.PORT_REMOVED) {
-                        status = DiscoveredSubscriber.Status.REMOVED;
-                    }
-
                     DiscoveredSubscriber sub =
                             new DiscoveredSubscriber(device, port,
                                     status, provisionSubscriber, si);
@@ -948,7 +975,12 @@
                         addSubscriberToQueue(sub);
 
                     } else if (oltFlowService.isSubscriberServiceProvisioned(new AccessDevicePort(port))) {
-                        SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
+                        //First check if the subscriber is in the programmed subscriber map, if not fallback to sadis
+                        SubscriberAndDeviceInformation si = getProgrammedSubscriber(oltFlowService, accessDevicePort);
+                        if (si == null) {
+                            si = subsService.get(getPortName(port));
+                        }
+                        // if it's still null we can't proceed
                         if (si == null) {
                             //NOTE this should not happen given that the subscriber was provisioned before
                             log.error("Subscriber information not found in sadis for port {}",
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index a339c87..f303f4a 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -111,6 +111,7 @@
 import static org.opencord.olt.impl.OltUtils.flowOpToString;
 import static org.opencord.olt.impl.OltUtils.getPortName;
 import static org.opencord.olt.impl.OltUtils.portWithName;
+import static org.opencord.olt.impl.OltUtils.getProgrammedSubscriber;
 import static org.opencord.olt.impl.OsgiPropertyConstants.*;
 import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_DIRECTION;
 import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_DOWNSTREAM;
@@ -270,7 +271,7 @@
     public void activate(ComponentContext context) {
         cfgService.registerProperties(getClass());
         appId = coreService.registerApplication(APP_NAME);
-        internalFlowListener = new InternalFlowListener();
+        internalFlowListener = new InternalFlowListener(this);
 
         modified(context);
 
@@ -401,16 +402,13 @@
             cpStatusReadLock.lock();
 
             cpStatus.forEach((sk, status) -> {
-                if (
-                    // not NNI Port
-                        !oltDeviceService.isNniPort(deviceService.getDevice(sk.getPort().connectPoint().deviceId()),
-                                sk.getPort().connectPoint().port()) &&
-                                // not EAPOL flow
-                                !sk.getService().equals(defaultEapolUniTag) &&
-                                !status.subscriberFlowsStatus.equals(OltFlowsStatus.PENDING_REMOVE)
-                                && !status.subscriberFlowsStatus.equals(OltFlowsStatus.REMOVED)
-
-                ) {
+                ConnectPoint cp = sk.getPort().connectPoint();
+                Device device = deviceService.getDevice(cp.deviceId());
+                boolean notNni = !oltDeviceService.isNniPort(device, cp.port());
+                boolean notEapol = !sk.getService().equals(defaultEapolUniTag);
+                boolean hasHsia = status.subscriberFlowsStatus.hasFlow();
+                boolean hasDhcp = status.dhcpStatus.hasFlow();
+                if (notNni && notEapol && (hasHsia || hasDhcp)) {
                     subscribers.put(sk, sk.getService());
                 }
             });
@@ -1601,6 +1599,13 @@
     }
 
     protected class InternalFlowListener implements FlowRuleListener {
+
+        private OltFlowServiceInterface oltFlowService;
+
+        public InternalFlowListener(OltFlowServiceInterface oltFlowService) {
+            this.oltFlowService = oltFlowService;
+        }
+
         @Override
         public void event(FlowRuleEvent event) {
             if (appId.id() != (event.subject().appId())) {
@@ -1728,7 +1733,12 @@
         }
 
         private ServiceKey getSubscriberKeyFromFlowRule(FlowRule flowRule, Port flowPort) {
-            SubscriberAndDeviceInformation si = subsService.get(getPortName(flowPort));
+            AccessDevicePort accessDevicePort = new AccessDevicePort(flowPort);
+            SubscriberAndDeviceInformation si = getProgrammedSubscriber(oltFlowService, accessDevicePort);
+            if (si == null) {
+                log.debug("si not found in programmedSubscribers, getting it from sadis.");
+                si = subsService.get(getPortName(flowPort));
+            }
 
             Boolean isNni = oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()), flowPort.number());
             if (si == null && !isNni) {
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltUtils.java b/impl/src/main/java/org/opencord/olt/impl/OltUtils.java
index 45d684e..339bfb1 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltUtils.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltUtils.java
@@ -19,10 +19,17 @@
 import org.onlab.packet.VlanId;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.Port;
+import org.opencord.olt.AccessDevicePort;
 import org.opencord.olt.FlowOperation;
+import org.opencord.olt.OltFlowServiceInterface;
+import org.opencord.olt.ServiceKey;
 import org.opencord.sadis.SubscriberAndDeviceInformation;
 import org.opencord.sadis.UniTagInformation;
 
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
  * Utility class for OLT app.
  */
@@ -83,4 +90,21 @@
         }
         return service;
     }
+
+    public static SubscriberAndDeviceInformation getProgrammedSubscriber(
+            OltFlowServiceInterface service, AccessDevicePort accessDevicePort) {
+        List<Map.Entry<ServiceKey, UniTagInformation>> entries =
+                service.getProgrammedSubscribers().entrySet().stream()
+                        .filter(entry -> entry.getKey().getPort().equals(accessDevicePort))
+                        .collect(Collectors.toList());
+        if (!entries.isEmpty()) {
+            List<UniTagInformation> programmedList = entries.stream()
+                    .map(entry -> entry.getKey().getService())
+                    .collect(Collectors.toList());
+            SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+            si.setUniTagList(programmedList);
+            return si;
+        }
+        return null;
+    }
 }