Make PPPoE flows configurable per service

Change-Id: I9876c281d56b2e0d25b2ddd7e84bd54359788633
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index ecaaa3d..88cd942 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -117,6 +117,8 @@
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL_DEFAULT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI;
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE_ON_NNI;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE_ON_NNI_DEFAULT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE;
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE_DEFAULT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_OLT;
@@ -130,6 +132,7 @@
         ENABLE_DHCP_V6 + ":Boolean=" + ENABLE_DHCP_V6_DEFAULT,
         ENABLE_IGMP_ON_NNI + ":Boolean=" + ENABLE_IGMP_ON_NNI_DEFAULT,
         ENABLE_EAPOL + ":Boolean=" + ENABLE_EAPOL_DEFAULT,
+        ENABLE_PPPOE_ON_NNI + ":Boolean=" + ENABLE_PPPOE_ON_NNI_DEFAULT,
         ENABLE_PPPOE + ":Boolean=" + ENABLE_PPPOE_DEFAULT,
         DEFAULT_TP_ID + ":Integer=" + DEFAULT_TP_ID_DEFAULT,
         // FIXME remove this option as potentially dangerous in production
@@ -240,6 +243,11 @@
     /**
      * Send PPPoED authentication trap flows before subscriber provisioning.
      **/
+    protected boolean enablePppoeOnNni = ENABLE_PPPOE_ON_NNI_DEFAULT;
+
+    /**
+     * Install subscriber PPPoED flows for the services that require PPPoE in their sadis config.
+     **/
     protected boolean enablePppoe = ENABLE_PPPOE_DEFAULT;
 
     /**
@@ -349,6 +357,11 @@
             enableEapol = eap;
         }
 
+        Boolean pppoeInNni = Tools.isPropertyEnabled(properties, ENABLE_PPPOE_ON_NNI);
+        if (pppoeInNni != null) {
+            enablePppoeOnNni = pppoeInNni;
+        }
+
         Boolean pppoe = Tools.isPropertyEnabled(properties, ENABLE_PPPOE);
         if (pppoe != null) {
             enablePppoe = pppoe;
@@ -362,13 +375,11 @@
         String tpId = get(properties, DEFAULT_TP_ID);
         defaultTechProfileId = isNullOrEmpty(tpId) ? DEFAULT_TP_ID_DEFAULT : Integer.parseInt(tpId.trim());
 
-        log.info("Modified. Values = enableDhcpOnNni: {}, enableDhcpV4: {}, " +
-                        "enableDhcpV6:{}, enableIgmpOnNni:{}, " +
-                        "enableEapol:{}, enablePppoe:{}, defaultTechProfileId:{}," +
-                        "waitForRemoval:{}",
-                enableDhcpOnNni, enableDhcpV4, enableDhcpV6,
-                enableIgmpOnNni, enableEapol, enablePppoe,
-                defaultTechProfileId, waitForRemoval);
+        log.info("Modified. Values = enableDhcpOnNni: {}, enableDhcpV4: {}, " + "enableDhcpV6:{}, " +
+                        "enableIgmpOnNni:{}, " + "enableEapol:{}, enablePppoeOnNni: {}, enablePppoe:{}, " +
+                        "defaultTechProfileId:{}," + "waitForRemoval:{}",
+                enableDhcpOnNni, enableDhcpV4, enableDhcpV6, enableIgmpOnNni, enableEapol,
+                enablePppoeOnNni, enablePppoe, defaultTechProfileId, waitForRemoval);
 
     }
 
@@ -454,7 +465,7 @@
                     null, null, NONE_TP_ID, VlanId.NONE, VlanId.ANY, -1);
         }
 
-        if (enablePppoe) {
+        if (enablePppoeOnNni) {
             log.debug("{} PPPoE flow on NNI {} for device {}", flowOpToString(action), port.number(), device.id());
             processPPPoEDFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
                     null, null, NONE_TP_ID, VlanId.NONE, VlanId.ANY, null);
@@ -562,6 +573,8 @@
         // always process them before
         handleSubscriberEapolFlows(sub, FlowOperation.ADD, sub.subscriberAndDeviceInformation);
 
+        handleSubscriberPppoeFlows(sub.device.id(), sub.port, FlowOperation.ADD, sub.subscriberAndDeviceInformation);
+
         handleSubscriberDataFlows(sub.device, sub.port, FlowOperation.ADD,
                 sub.subscriberAndDeviceInformation, multicastServiceName);
 
@@ -582,6 +595,8 @@
 
         handleSubscriberDhcpFlows(sub.device.id(), sub.port, FlowOperation.REMOVE, si);
 
+        handleSubscriberPppoeFlows(sub.device.id(), sub.port, FlowOperation.REMOVE, sub.subscriberAndDeviceInformation);
+
         if (enableEapol) {
             // remove the tagged eapol
             handleSubscriberEapolFlows(sub, FlowOperation.REMOVE, si);
@@ -661,6 +676,18 @@
     }
 
     @Override
+    public boolean hasPppoeFlows(Port port, UniTagInformation uti) {
+        OltPortStatus portStatus = getOltPortStatus(port, uti);
+        if (log.isTraceEnabled()) {
+            log.trace("Status during PPPoE flow check {} for port {} and service {}",
+                    portStatus, portWithName(port), uti.getServiceName());
+        }
+        // an unknown port/service has no PPPoE state; flows count as present when added or pending add
+        OltFlowsStatus pppoeState = portStatus == null ? null : portStatus.pppoeStatus;
+        return pppoeState == OltFlowsStatus.ADDED || pppoeState == OltFlowsStatus.PENDING_ADD;
+    }
+
+    @Override
     public boolean hasSubscriberFlows(Port port, UniTagInformation uti) {
 
         OltPortStatus status = getOltPortStatus(port, uti);
@@ -775,7 +802,7 @@
         if (vlanId.id().equals(EAPOL_DEFAULT_VLAN)) {
             OltFlowsStatus status = action == FlowOperation.ADD ?
                     OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
-            updateConnectPointStatus(sk, status, OltFlowsStatus.NONE, OltFlowsStatus.NONE);
+            updateConnectPointStatus(sk, status, OltFlowsStatus.NONE, OltFlowsStatus.NONE, OltFlowsStatus.NONE);
 
         }
 
@@ -854,7 +881,7 @@
 
                         if (vlanId.id().equals(EAPOL_DEFAULT_VLAN)) {
                             updateConnectPointStatus(sk,
-                                    OltFlowsStatus.ERROR, null, null);
+                                    OltFlowsStatus.ERROR, null, null, null);
                         }
                     }
                 });
@@ -1017,6 +1044,42 @@
         });
     }
 
+    /**
+     * Adds or removes the upstream PPPoED filtering objective of each service in {@code si} requiring PPPoE.
+     * Does nothing while {@code enablePppoe} is off or when the flows are already in the requested state.
+     */
+    protected void handleSubscriberPppoeFlows(DeviceId deviceId, Port port,
+                                             FlowOperation action,
+                                             SubscriberAndDeviceInformation si) {
+        if (!enablePppoe) {
+            // nothing is sent while disabled: skip the per-service log lines and meter lookups
+            return;
+        }
+        si.uniTagList().forEach(uti -> {
+            if (!uti.getIsPppoeRequired()) {
+                return;
+            }
+            // an ADD is skipped when the flows are there, a REMOVE when they are not
+            boolean hasFlows = hasPppoeFlows(port, uti);
+            if (action == FlowOperation.ADD && hasFlows ||
+                    action == FlowOperation.REMOVE && !hasFlows) {
+                log.debug("Not dealing with PPPoE {} on {} as PPPoE flow status is {}", action,
+                        uti.getServiceName(), hasFlows);
+                return;
+            }
+            log.info("{} PPPoE flows for subscriber on {} and service {}",
+                    flowOpToString(action), portWithName(port), uti.getServiceName());
+            // if we reached here a meter already exists
+            MeterId meterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamBandwidthProfile());
+            MeterId oltMeterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamOltBandwidthProfile());
+            processPPPoEDFilteringObjectives(deviceId, port, action, FlowDirection.UPSTREAM, meterId, oltMeterId,
+                    uti.getTechnologyProfileId(), uti.getPonCTag(), uti.getUniTagMatch(),
+                    (byte) uti.getUsPonCTagPriority());
+        });
+    }
+
     // FIXME return boolean, if this fails we need to retry
     protected void handleSubscriberDataFlows(Device device, Port port,
                                              FlowOperation action,
@@ -1050,7 +1113,7 @@
             ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
             OltFlowsStatus status = action.equals(FlowOperation.ADD) ?
                     OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
-            updateConnectPointStatus(sk, null, status, null);
+            updateConnectPointStatus(sk, null, status, null, null);
 
             // upstream flows
             MeterId usMeterId = oltMeterService
@@ -1080,7 +1143,7 @@
 
         OltFlowsStatus status = action.equals(FlowOperation.ADD) ?
                 OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
-        updateConnectPointStatus(sk, null, null, status);
+        updateConnectPointStatus(sk, null, null, status, null);
 
         DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
         TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
@@ -1134,7 +1197,7 @@
                         portWithName(port),
                         action,
                         error);
-                updateConnectPointStatus(sk, null, null, OltFlowsStatus.ERROR);
+                updateConnectPointStatus(sk, null, null, OltFlowsStatus.ERROR, null);
             }
         });
         flowObjectiveService.filter(deviceId, dhcpUpstream);
@@ -1312,7 +1375,7 @@
             public void onError(Objective objective, ObjectiveError error) {
                 log.error("Upstream Data plane filter for {} failed {} because {}.",
                         sk, action, error);
-                updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null);
+                updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null, null);
             }
         };
 
@@ -1403,7 +1466,7 @@
             public void onError(Objective objective, ObjectiveError error) {
                 log.info("Downstream Data plane filter for {} failed {} because {}.",
                         sk, action, error);
-                updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null);
+                updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null, null);
             }
         };
 
@@ -1511,7 +1574,8 @@
     }
 
     protected void updateConnectPointStatus(ServiceKey key, OltFlowsStatus eapolStatus,
-                                            OltFlowsStatus subscriberFlowsStatus, OltFlowsStatus dhcpStatus) {
+                                            OltFlowsStatus subscriberFlowsStatus, OltFlowsStatus dhcpStatus,
+                                            OltFlowsStatus pppoeStatus) {
         if (log.isTraceEnabled()) {
             log.trace("Updating cpStatus {} with values: eapolFlow={}, subscriberFlows={}, dhcpFlow={}",
                     key, eapolStatus, subscriberFlowsStatus, dhcpStatus);
@@ -1543,7 +1607,8 @@
                 status = new OltPortStatus(
                         eapolStatus != null ? eapolStatus : OltFlowsStatus.NONE,
                         subscriberFlowsStatus != null ? subscriberFlowsStatus : OltFlowsStatus.NONE,
-                        dhcpStatus != null ? dhcpStatus : OltFlowsStatus.NONE
+                        dhcpStatus != null ? dhcpStatus : OltFlowsStatus.NONE,
+                        pppoeStatus != null ? pppoeStatus : OltFlowsStatus.NONE
                 );
             } else {
                 if (eapolStatus != null) {
@@ -1609,7 +1674,7 @@
                 if (log.isTraceEnabled()) {
                     log.trace("update defaultEapolStatus {} on {}", status, sk);
                 }
-                updateConnectPointStatus(sk, status, null, null);
+                updateConnectPointStatus(sk, status, null, null, null);
             } else if (isDhcpFlow(flowRule)) {
                 ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
                 if (sk == null) {
@@ -1618,7 +1683,16 @@
                 if (log.isTraceEnabled()) {
                     log.trace("update dhcpStatus {} on {}", status, sk);
                 }
-                updateConnectPointStatus(sk, null, null, status);
+                updateConnectPointStatus(sk, null, null, status, null);
+            } else if (isPppoeFlow(flowRule)) {
+                ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+                if (sk == null) {
+                    return;
+                }
+                if (log.isTraceEnabled()) {
+                    log.trace("update pppoeStatus {} on {}", status, sk);
+                }
+                updateConnectPointStatus(sk, null, null, null, status);
             } else if (isDataFlow(flowRule)) {
 
                 if (oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()),
@@ -1634,7 +1708,7 @@
                 if (log.isTraceEnabled()) {
                     log.trace("update dataplaneStatus {} on {}", status, sk);
                 }
-                updateConnectPointStatus(sk, null, status, null);
+                updateConnectPointStatus(sk, null, status, null, null);
             }
         }
 
@@ -1686,6 +1760,16 @@
                     (src.udpPort().toInt() == 68 || src.udpPort().toInt() == 67);
         }
 
+        /** Checks whether the flow rule matches on the PPPoED ethertype. */
+        private boolean isPppoeFlow(FlowRule flowRule) {
+            Criterion ethTypeMatch = flowRule.selector().getCriterion(Criterion.Type.ETH_TYPE);
+            if (ethTypeMatch == null) {
+                // no ethertype criterion in the selector, so this is not a PPPoE flow
+                return false;
+            }
+            return EthType.EtherType.PPPoED.ethType().equals(((EthTypeCriterion) ethTypeMatch).ethType());
+        }
+
         private boolean isDataFlow(FlowRule flowRule) {
             // we consider subscriber flows the one that matches on VLAN_VID
             // method is valid only because it's the last check after EAPOL and DHCP.