configurable PPPoE flow per service

Change-Id: I9876c281d56b2e0d25b2ddd7e84bd54359788633
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index ecaaa3d..88cd942 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -117,6 +117,8 @@
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL_DEFAULT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI;
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE_ON_NNI;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE_ON_NNI_DEFAULT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE;
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE_DEFAULT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_OLT;
@@ -130,6 +132,7 @@
         ENABLE_DHCP_V6 + ":Boolean=" + ENABLE_DHCP_V6_DEFAULT,
         ENABLE_IGMP_ON_NNI + ":Boolean=" + ENABLE_IGMP_ON_NNI_DEFAULT,
         ENABLE_EAPOL + ":Boolean=" + ENABLE_EAPOL_DEFAULT,
+        ENABLE_PPPOE_ON_NNI + ":Boolean=" + ENABLE_PPPOE_ON_NNI_DEFAULT,
         ENABLE_PPPOE + ":Boolean=" + ENABLE_PPPOE_DEFAULT,
         DEFAULT_TP_ID + ":Integer=" + DEFAULT_TP_ID_DEFAULT,
         // FIXME remove this option as potentially dangerous in production
@@ -240,6 +243,11 @@
     /**
      * Send PPPoED authentication trap flows before subscriber provisioning.
      **/
+    protected boolean enablePppoeOnNni = ENABLE_PPPOE_ON_NNI_DEFAULT;
+
+    /**
+     * Enable flows for PPPoE if it is required in sadis config.
+     **/
     protected boolean enablePppoe = ENABLE_PPPOE_DEFAULT;
 
     /**
@@ -349,6 +357,11 @@
             enableEapol = eap;
         }
 
+        Boolean pppoeInNni = Tools.isPropertyEnabled(properties, ENABLE_PPPOE_ON_NNI);
+        if (pppoeInNni != null) {
+            enablePppoeOnNni = pppoeInNni;
+        }
+
         Boolean pppoe = Tools.isPropertyEnabled(properties, ENABLE_PPPOE);
         if (pppoe != null) {
             enablePppoe = pppoe;
@@ -362,13 +375,11 @@
         String tpId = get(properties, DEFAULT_TP_ID);
         defaultTechProfileId = isNullOrEmpty(tpId) ? DEFAULT_TP_ID_DEFAULT : Integer.parseInt(tpId.trim());
 
-        log.info("Modified. Values = enableDhcpOnNni: {}, enableDhcpV4: {}, " +
-                        "enableDhcpV6:{}, enableIgmpOnNni:{}, " +
-                        "enableEapol:{}, enablePppoe:{}, defaultTechProfileId:{}," +
-                        "waitForRemoval:{}",
-                enableDhcpOnNni, enableDhcpV4, enableDhcpV6,
-                enableIgmpOnNni, enableEapol, enablePppoe,
-                defaultTechProfileId, waitForRemoval);
+        log.info("Modified. Values = enableDhcpOnNni: {}, enableDhcpV4: {}, " + "enableDhcpV6:{}, " +
+                        "enableIgmpOnNni:{}, " + "enableEapol:{}, enablePppoeOnNni: {}, enablePppoe:{}, " +
+                        "defaultTechProfileId:{}," + "waitForRemoval:{}",
+                enableDhcpOnNni, enableDhcpV4, enableDhcpV6, enableIgmpOnNni, enableEapol,
+                enablePppoeOnNni, enablePppoe, defaultTechProfileId, waitForRemoval);
 
     }
 
@@ -454,7 +465,7 @@
                     null, null, NONE_TP_ID, VlanId.NONE, VlanId.ANY, -1);
         }
 
-        if (enablePppoe) {
+        if (enablePppoeOnNni) {
             log.debug("{} PPPoE flow on NNI {} for device {}", flowOpToString(action), port.number(), device.id());
             processPPPoEDFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
                     null, null, NONE_TP_ID, VlanId.NONE, VlanId.ANY, null);
@@ -562,6 +573,8 @@
         // always process them before
         handleSubscriberEapolFlows(sub, FlowOperation.ADD, sub.subscriberAndDeviceInformation);
 
+        handleSubscriberPppoeFlows(sub.device.id(), sub.port, FlowOperation.ADD, sub.subscriberAndDeviceInformation);
+
         handleSubscriberDataFlows(sub.device, sub.port, FlowOperation.ADD,
                 sub.subscriberAndDeviceInformation, multicastServiceName);
 
@@ -582,6 +595,8 @@
 
         handleSubscriberDhcpFlows(sub.device.id(), sub.port, FlowOperation.REMOVE, si);
 
+        handleSubscriberPppoeFlows(sub.device.id(), sub.port, FlowOperation.REMOVE, sub.subscriberAndDeviceInformation);
+
         if (enableEapol) {
             // remove the tagged eapol
             handleSubscriberEapolFlows(sub, FlowOperation.REMOVE, si);
@@ -661,6 +676,18 @@
     }
 
     @Override
+    public boolean hasPppoeFlows(Port port, UniTagInformation uti) {
+        OltPortStatus status = getOltPortStatus(port, uti);
+        if (log.isTraceEnabled()) {
+            log.trace("Status during PPPoE flow check {} for port {} and service {}",
+                    status, portWithName(port), uti.getServiceName());
+        }
+        return status != null &&
+                (status.pppoeStatus == OltFlowsStatus.ADDED ||
+                        status.pppoeStatus == OltFlowsStatus.PENDING_ADD);
+    }
+
+    @Override
     public boolean hasSubscriberFlows(Port port, UniTagInformation uti) {
 
         OltPortStatus status = getOltPortStatus(port, uti);
@@ -775,7 +802,7 @@
         if (vlanId.id().equals(EAPOL_DEFAULT_VLAN)) {
             OltFlowsStatus status = action == FlowOperation.ADD ?
                     OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
-            updateConnectPointStatus(sk, status, OltFlowsStatus.NONE, OltFlowsStatus.NONE);
+            updateConnectPointStatus(sk, status, OltFlowsStatus.NONE, OltFlowsStatus.NONE, OltFlowsStatus.NONE);
 
         }
 
@@ -854,7 +881,7 @@
 
                         if (vlanId.id().equals(EAPOL_DEFAULT_VLAN)) {
                             updateConnectPointStatus(sk,
-                                    OltFlowsStatus.ERROR, null, null);
+                                    OltFlowsStatus.ERROR, null, null, null);
                         }
                     }
                 });
@@ -1017,6 +1044,42 @@
         });
     }
 
+    protected void handleSubscriberPppoeFlows(DeviceId deviceId, Port port,
+                                             FlowOperation action,
+                                             SubscriberAndDeviceInformation si) {
+        si.uniTagList().forEach(uti -> {
+
+            if (!uti.getIsPppoeRequired()) {
+                return;
+            }
+
+            // if it's an ADD skip if flows are there,
+            // if it's a DELETE skip if flows are not there
+            boolean hasFlows = hasPppoeFlows(port, uti);
+            if (action == FlowOperation.ADD && hasFlows ||
+                    action == FlowOperation.REMOVE && !hasFlows) {
+                log.debug("Not dealing with PPPoE {} on {} as PPPoE flow status is {}", action,
+                        uti.getServiceName(), hasFlows);
+                return;
+            }
+
+            log.info("{} PPPoE flows for subscriber on {} and service {}",
+                    flowOpToString(action), portWithName(port), uti.getServiceName());
+
+            // if we reached here a meter already exists
+            MeterId meterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamBandwidthProfile());
+            MeterId oltMeterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamOltBandwidthProfile());
+
+            if (enablePppoe) {
+                processPPPoEDFilteringObjectives(deviceId, port, action, FlowDirection.UPSTREAM, meterId, oltMeterId,
+                        uti.getTechnologyProfileId(), uti.getPonCTag(), uti.getUniTagMatch(),
+                        (byte) uti.getUsPonCTagPriority());
+            }
+        });
+    }
+
     // FIXME return boolean, if this fails we need to retry
     protected void handleSubscriberDataFlows(Device device, Port port,
                                              FlowOperation action,
@@ -1050,7 +1113,7 @@
             ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
             OltFlowsStatus status = action.equals(FlowOperation.ADD) ?
                     OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
-            updateConnectPointStatus(sk, null, status, null);
+            updateConnectPointStatus(sk, null, status, null, null);
 
             // upstream flows
             MeterId usMeterId = oltMeterService
@@ -1080,7 +1143,7 @@
 
         OltFlowsStatus status = action.equals(FlowOperation.ADD) ?
                 OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
-        updateConnectPointStatus(sk, null, null, status);
+        updateConnectPointStatus(sk, null, null, status, null);
 
         DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
         TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
@@ -1134,7 +1197,7 @@
                         portWithName(port),
                         action,
                         error);
-                updateConnectPointStatus(sk, null, null, OltFlowsStatus.ERROR);
+                updateConnectPointStatus(sk, null, null, OltFlowsStatus.ERROR, null);
             }
         });
         flowObjectiveService.filter(deviceId, dhcpUpstream);
@@ -1312,7 +1375,7 @@
             public void onError(Objective objective, ObjectiveError error) {
                 log.error("Upstream Data plane filter for {} failed {} because {}.",
                         sk, action, error);
-                updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null);
+                updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null, null);
             }
         };
 
@@ -1403,7 +1466,7 @@
             public void onError(Objective objective, ObjectiveError error) {
                 log.info("Downstream Data plane filter for {} failed {} because {}.",
                         sk, action, error);
-                updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null);
+                updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null, null);
             }
         };
 
@@ -1511,7 +1574,8 @@
     }
 
     protected void updateConnectPointStatus(ServiceKey key, OltFlowsStatus eapolStatus,
-                                            OltFlowsStatus subscriberFlowsStatus, OltFlowsStatus dhcpStatus) {
+                                            OltFlowsStatus subscriberFlowsStatus, OltFlowsStatus dhcpStatus,
+                                            OltFlowsStatus pppoeStatus) {
         if (log.isTraceEnabled()) {
             log.trace("Updating cpStatus {} with values: eapolFlow={}, subscriberFlows={}, dhcpFlow={}",
                     key, eapolStatus, subscriberFlowsStatus, dhcpStatus);
@@ -1543,7 +1607,8 @@
                 status = new OltPortStatus(
                         eapolStatus != null ? eapolStatus : OltFlowsStatus.NONE,
                         subscriberFlowsStatus != null ? subscriberFlowsStatus : OltFlowsStatus.NONE,
-                        dhcpStatus != null ? dhcpStatus : OltFlowsStatus.NONE
+                        dhcpStatus != null ? dhcpStatus : OltFlowsStatus.NONE,
+                        pppoeStatus != null ? pppoeStatus : OltFlowsStatus.NONE
                 );
             } else {
                 if (eapolStatus != null) {
@@ -1609,7 +1674,7 @@
                 if (log.isTraceEnabled()) {
                     log.trace("update defaultEapolStatus {} on {}", status, sk);
                 }
-                updateConnectPointStatus(sk, status, null, null);
+                updateConnectPointStatus(sk, status, null, null, null);
             } else if (isDhcpFlow(flowRule)) {
                 ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
                 if (sk == null) {
@@ -1618,7 +1683,16 @@
                 if (log.isTraceEnabled()) {
                     log.trace("update dhcpStatus {} on {}", status, sk);
                 }
-                updateConnectPointStatus(sk, null, null, status);
+                updateConnectPointStatus(sk, null, null, status, null);
+            } else if (isPppoeFlow(flowRule)) {
+                ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+                if (sk == null) {
+                    return;
+                }
+                if (log.isTraceEnabled()) {
+                    log.trace("update pppoeStatus {} on {}", status, sk);
+                }
+                updateConnectPointStatus(sk, null, null, null, status);
             } else if (isDataFlow(flowRule)) {
 
                 if (oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()),
@@ -1634,7 +1708,7 @@
                 if (log.isTraceEnabled()) {
                     log.trace("update dataplaneStatus {} on {}", status, sk);
                 }
-                updateConnectPointStatus(sk, null, status, null);
+                updateConnectPointStatus(sk, null, status, null, null);
             }
         }
 
@@ -1686,6 +1760,16 @@
                     (src.udpPort().toInt() == 68 || src.udpPort().toInt() == 67);
         }
 
+        private boolean isPppoeFlow(FlowRule flowRule) {
+            EthTypeCriterion ethTypeCriterion = (EthTypeCriterion) flowRule.selector()
+                    .getCriterion(Criterion.Type.ETH_TYPE);
+
+            if (ethTypeCriterion == null) {
+                return false;
+            }
+            return EthType.EtherType.PPPoED.ethType().equals(ethTypeCriterion.ethType());
+        }
+
         private boolean isDataFlow(FlowRule flowRule) {
             // we consider subscriber flows the one that matches on VLAN_VID
             // method is valid only because it's the last check after EAPOL and DHCP.
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowServiceInterface.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowServiceInterface.java
index e55c789..b00594f 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowServiceInterface.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowServiceInterface.java
@@ -70,6 +70,14 @@
     boolean hasDhcpFlows(Port port, UniTagInformation uti);
 
     /**
+     * Checks if the pppoe flows are installed.
+     * @param port the port
+     * @param uti the UniTagInformation to check for
+     * @return true if installed, false otherwise.
+     */
+    boolean hasPppoeFlows(Port port, UniTagInformation uti);
+
+    /**
      * Checks if the subscriber flows are installed.
      * @param port the port
      * @param uti the UniTagInformation to check for
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltPortStatus.java b/impl/src/main/java/org/opencord/olt/impl/OltPortStatus.java
index 655a0ac..32550b6 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltPortStatus.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltPortStatus.java
@@ -28,13 +28,16 @@
     // NOTE we need to keep track of the DHCP status as that is installed before the other flows
     // if macLearning is enabled (DHCP is needed to learn the MacAddress from the host)
     public OltFlowService.OltFlowsStatus dhcpStatus;
+    public OltFlowService.OltFlowsStatus pppoeStatus;
 
     public OltPortStatus(OltFlowService.OltFlowsStatus defaultEapolStatus,
                          OltFlowService.OltFlowsStatus subscriberFlowsStatus,
-                         OltFlowService.OltFlowsStatus dhcpStatus) {
+                         OltFlowService.OltFlowsStatus dhcpStatus,
+                         OltFlowService.OltFlowsStatus pppoeStatus) {
         this.defaultEapolStatus = defaultEapolStatus;
         this.subscriberFlowsStatus = subscriberFlowsStatus;
         this.dhcpStatus = dhcpStatus;
+        this.pppoeStatus = pppoeStatus;
     }
 
     @Override
diff --git a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
index 35095b6..089edbc 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
@@ -55,6 +55,9 @@
     public static final String ENABLE_EAPOL = "enableEapol";
     public static final boolean ENABLE_EAPOL_DEFAULT = true;
 
+    public static final String ENABLE_PPPOE_ON_NNI = "enablePppoeOnNni";
+    public static final boolean ENABLE_PPPOE_ON_NNI_DEFAULT = false;
+
     public static final String ENABLE_PPPOE = "enablePppoe";
     public static final boolean ENABLE_PPPOE_DEFAULT = false;
 
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java b/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
index 326311d..2f0f82e 100644
--- a/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
+++ b/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
@@ -157,22 +157,22 @@
         // cpStatus map for the test
         component.cpStatus = component.storageService.
                 <ServiceKey, OltPortStatus>consistentMapBuilder().build().asJavaMap();
-        OltPortStatus cp1Status = new OltPortStatus(PENDING_ADD, NONE, NONE);
+        OltPortStatus cp1Status = new OltPortStatus(PENDING_ADD, NONE, NONE, NONE);
         component.cpStatus.put(sk1, cp1Status);
 
         //check that we only update the provided value
-        component.updateConnectPointStatus(sk1, ADDED, null, null);
+        component.updateConnectPointStatus(sk1, ADDED, null, null, null);
         OltPortStatus updated = component.cpStatus.get(sk1);
         Assert.assertEquals(ADDED, updated.defaultEapolStatus);
         Assert.assertEquals(NONE, updated.subscriberFlowsStatus);
         Assert.assertEquals(NONE, updated.dhcpStatus);
 
         // check that it creates an entry if it does not exist
-        component.updateConnectPointStatus(sk2, PENDING_ADD, NONE, NONE);
+        component.updateConnectPointStatus(sk2, PENDING_ADD, NONE, NONE, NONE);
         Assert.assertNotNull(component.cpStatus.get(sk2));
 
         // check that if we create a new entry with null values they're converted to NONE
-        component.updateConnectPointStatus(sk3, null, null, null);
+        component.updateConnectPointStatus(sk3, null, null, null, null);
         updated = component.cpStatus.get(sk3);
         Assert.assertEquals(NONE, updated.defaultEapolStatus);
         Assert.assertEquals(NONE, updated.subscriberFlowsStatus);
@@ -201,12 +201,12 @@
                 <ServiceKey, OltPortStatus>consistentMapBuilder().build().asJavaMap();
 
         // check that an entry is not created if the only status is pending remove
-        component.updateConnectPointStatus(sk1, null, null, PENDING_REMOVE);
+        component.updateConnectPointStatus(sk1, null, null, PENDING_REMOVE, null);
         OltPortStatus entry = component.cpStatus.get(sk1);
         Assert.assertNull(entry);
 
         // check that an entry is not created if the only status is ERROR
-        component.updateConnectPointStatus(sk1, null, null, ERROR);
+        component.updateConnectPointStatus(sk1, null, null, ERROR, null);
         entry = component.cpStatus.get(sk1);
         Assert.assertNull(entry);
     }
@@ -230,12 +230,14 @@
         OltPortStatus portStatusAdded = new OltPortStatus(
                 OltFlowService.OltFlowsStatus.ADDED,
                 NONE,
+                null,
                 null
         );
 
         OltPortStatus portStatusRemoved = new OltPortStatus(
                 REMOVED,
                 NONE,
+                null,
                 null
         );
 
@@ -266,19 +268,22 @@
         OltPortStatus withDefaultEapol = new OltPortStatus(
                 ADDED,
                 NONE,
+                NONE,
                 NONE
         );
 
         OltPortStatus withDhcp = new OltPortStatus(
                 REMOVED,
                 NONE,
-                ADDED
+                ADDED,
+                NONE
         );
 
         OltPortStatus withSubFlow = new OltPortStatus(
                 REMOVED,
                 ADDED,
-                ADDED
+                ADDED,
+                NONE
         );
 
         component.cpStatus.put(skWithStatus, withDefaultEapol);
@@ -558,6 +563,52 @@
     }
 
     @Test
+    public void testHandleNniFlowsPppoe() {
+        component.enablePppoeOnNni = true;
+        component.enablePppoe = true;
+        component.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
+
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(nniPort.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.PPPoED.ethType()))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
+                .add();
+
+        // invoked with the correct Pppoe filtering objective
+        verify(component.flowObjectiveService, times(1))
+                .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+        // invoked only twice, LLDP and DHCP
+        verify(component.flowObjectiveService, times(2))
+                .filter(eq(deviceId), any());
+    }
+
+    @Test
+    public void testRemoveNniFlowsPppoe() {
+        component.enablePppoeOnNni = true;
+        component.enablePppoe = true;
+        component.handleNniFlows(testDevice, nniPortDisabled, OltFlowService.FlowOperation.REMOVE);
+
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .deny()
+                .withKey(Criteria.matchInPort(nniPort.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.PPPoED.ethType()))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
+                .add();
+
+        // invoked with the correct Pppoe filtering objective
+        verify(component.flowObjectiveService, times(1))
+                .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+        // invoked only twice, LLDP and DHCP
+        verify(component.flowObjectiveService, times(2))
+                .filter(eq(deviceId), any());
+    }
+
+    @Test
     public void testMacAddressNotRequired() {
         // create a single service that doesn't require mac address
         List<UniTagInformation> uniTagInformationList = new LinkedList<>();
@@ -671,6 +722,56 @@
     }
 
     @Test
+    public void testHandleSubscriberPppoeFlowsAdd() {
+
+        String usBp = "usBp";
+        String usOltBp = "usOltBp";
+        component.enablePppoe = true;
+
+        // create two services, one requires Pppoe the other doesn't
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        VlanId hsiaCtag = VlanId.vlanId((short) 11);
+        UniTagInformation hsia = new UniTagInformation.Builder()
+                .setPonCTag(hsiaCtag)
+                .setTechnologyProfileId(64)
+                .setUniTagMatch(VlanId.vlanId(VlanId.NO_VID))
+                .setUpstreamBandwidthProfile(usBp)
+                .setUpstreamOltBandwidthProfile(usOltBp)
+                .setIsPppoeRequired(true).build();
+        UniTagInformation mc = new UniTagInformation.Builder()
+                .setIsPppoeRequired(false).build();
+        uniTagInformationList.add(hsia);
+        uniTagInformationList.add(mc);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        final DiscoveredSubscriber addedSub =
+                new DiscoveredSubscriber(testDevice,
+                        uniUpdateEnabled, DiscoveredSubscriber.Status.ADDED,
+                        false, si);
+
+        // return meter IDs
+        doReturn(MeterId.meterId(2)).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), usBp);
+        doReturn(MeterId.meterId(3)).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), usOltBp);
+
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(addedSub.port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.PPPoED.ethType()))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .add();
+
+        component.handleSubscriberPppoeFlows(addedSub.device.id(), addedSub.port,
+                OltFlowService.FlowOperation.ADD, si);
+        verify(component.flowObjectiveService, times(1))
+                .filter(eq(addedSub.device.id()), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+    }
+
+    @Test
     public void testInternalFlowListenerNotMaster() {
         doReturn(false).when(component.oltDeviceService).isLocalLeader(any());
 
diff --git a/pom.xml b/pom.xml
index 1b35b6a..d04dd41 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,7 +35,7 @@
     <description>OLT Application</description>
     <url>http://onosproject.org</url>
     <properties>
-        <sadis.api.version>5.5.1</sadis.api.version>
+        <sadis.api.version>5.6.0-SNAPSHOT</sadis.api.version>
         <olt.api.version>5.1.0-SNAPSHOT</olt.api.version>
     </properties>
     <dependencies>