[VOL-4246] Feature parity with the previous implementation

Change-Id: I3741edb3c1b88b1cf8b5e6d4ff0900132e2e5e6a
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 1868421..ee184a5 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-present Open Networking Foundation
+ * Copyright 2021-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,8 +13,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.opencord.olt.impl;
 
+import com.google.common.collect.ImmutableMap;
 import org.onlab.packet.EthType;
 import org.onlab.packet.IPv4;
 import org.onlab.packet.IPv6;
@@ -26,18 +28,31 @@
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
-import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.Annotations;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
+import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.criteria.EthTypeCriterion;
+import org.onosproject.net.flow.criteria.IPProtocolCriterion;
+import org.onosproject.net.flow.criteria.PortCriterion;
+import org.onosproject.net.flow.criteria.UdpPortCriterion;
+import org.onosproject.net.flow.criteria.VlanIdCriterion;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
 import org.onosproject.net.flowobjective.DefaultFilteringObjective;
 import org.onosproject.net.flowobjective.DefaultForwardingObjective;
 import org.onosproject.net.flowobjective.FilteringObjective;
@@ -46,13 +61,11 @@
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.host.HostService;
 import org.onosproject.net.meter.MeterId;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
-import org.opencord.olt.AccessDevicePort;
-import org.opencord.olt.internalapi.AccessDeviceFlowService;
-import org.opencord.olt.internalapi.AccessDeviceMeterService;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
@@ -67,26 +80,46 @@
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.osgi.service.component.annotations.ReferencePolicy;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
 import java.util.Dictionary;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.onlab.util.Tools.get;
-import static org.opencord.olt.impl.OsgiPropertyConstants.*;
-import static org.slf4j.LoggerFactory.getLogger;
+import static org.onosproject.net.flow.instructions.Instruction.Type.L2MODIFICATION;
+import static org.opencord.olt.impl.OltUtils.completeFlowOpToString;
+import static org.opencord.olt.impl.OltUtils.flowOpToString;
+import static org.opencord.olt.impl.OltUtils.getPortName;
+import static org.opencord.olt.impl.OltUtils.portWithName;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_OLT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_ONU;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_OLT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_ONU;
+import static org.opencord.olt.impl.OsgiPropertyConstants.WAIT_FOR_REMOVAL;
+import static org.opencord.olt.impl.OsgiPropertyConstants.WAIT_FOR_REMOVAL_DEFAULT;
 
-/**
- * Provisions flow rules on access devices.
- */
 @Component(immediate = true, property = {
         ENABLE_DHCP_ON_NNI + ":Boolean=" + ENABLE_DHCP_ON_NNI_DEFAULT,
         ENABLE_DHCP_V4 + ":Boolean=" + ENABLE_DHCP_V4_DEFAULT,
@@ -94,32 +127,20 @@
         ENABLE_IGMP_ON_NNI + ":Boolean=" + ENABLE_IGMP_ON_NNI_DEFAULT,
         ENABLE_EAPOL + ":Boolean=" + ENABLE_EAPOL_DEFAULT,
         ENABLE_PPPOE + ":Boolean=" + ENABLE_PPPOE_DEFAULT,
-        DEFAULT_TP_ID + ":Integer=" + DEFAULT_TP_ID_DEFAULT
+        DEFAULT_TP_ID + ":Integer=" + DEFAULT_TP_ID_DEFAULT,
+        // FIXME remove this option as potentially dangerous in production
+        WAIT_FOR_REMOVAL + ":Boolean=" + WAIT_FOR_REMOVAL_DEFAULT
 })
-public class OltFlowService implements AccessDeviceFlowService {
-    private static final String SADIS_NOT_RUNNING = "Sadis is not running.";
-    private static final String APP_NAME = "org.opencord.olt";
-    private static final int NONE_TP_ID = -1;
-    private static final int NO_PCP = -1;
-    private static final Integer MAX_PRIORITY = 10000;
-    private static final Integer MIN_PRIORITY = 1000;
-    private static final String INSTALLED = "installed";
-    private static final String REMOVED = "removed";
-    private static final String INSTALLATION = "installation";
-    private static final String REMOVAL = "removal";
-    private static final String V4 = "V4";
-    private static final String V6 = "V6";
-
-    private final Logger log = getLogger(getClass());
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected FlowObjectiveService flowObjectiveService;
+public class OltFlowService implements OltFlowServiceInterface {
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected MastershipService mastershipService;
+    protected ComponentConfigService cfgService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected FlowObjectiveService flowObjectiveService;
 
     @Reference(cardinality = ReferenceCardinality.OPTIONAL,
             bind = "bindSadisService",
@@ -128,17 +149,65 @@
     protected volatile SadisService sadisService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected OltMeterServiceInterface oltMeterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected OltDeviceServiceInterface oltDeviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected FlowRuleService flowRuleService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected HostService hostService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected AccessDeviceMeterService oltMeterService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ComponentConfigService componentConfigService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected StorageService storageService;
 
+    protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+    protected BaseInformationService<BandwidthProfileInformation> bpService;
+
+    private static final String APP_NAME = "org.opencord.olt";
+    protected ApplicationId appId;
+    private static final Integer MAX_PRIORITY = 10000;
+    private static final Integer MIN_PRIORITY = 1000;
+    private static final short EAPOL_DEFAULT_VLAN = 4091;
+    private static final int NONE_TP_ID = -1;
+    private static final String V4 = "V4";
+    private static final String V6 = "V6";
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    protected UniTagInformation defaultEapolUniTag = new UniTagInformation.Builder()
+            .setServiceName("defaultEapol").build();
+    protected UniTagInformation nniUniTag = new UniTagInformation.Builder()
+            .setServiceName("nni")
+            .setTechnologyProfileId(NONE_TP_ID)
+            .setPonCTag(VlanId.NONE)
+            .setUniTagMatch(VlanId.ANY)
+            .setUsPonCTagPriority(-1)
+            .build();
+
+    /**
+     * Connect Point status map.
+     * Used to keep track of which cp has flows that needs to be removed when the status changes.
+     */
+    protected Map<ServiceKey, OltPortStatus> cpStatus;
+    private final ReentrantReadWriteLock cpStatusLock = new ReentrantReadWriteLock();
+    private final Lock cpStatusWriteLock = cpStatusLock.writeLock();
+    private final Lock cpStatusReadLock = cpStatusLock.readLock();
+
+    /**
+     * This map contains the subscriber that have been provisioned by the operator.
+     * They may or may not have flows, depending on the port status.
+     * The map is used to define whether flows need to be provisioned when a port comes up.
+     */
+    protected Map<ServiceKey, Boolean> provisionedSubscribers;
+    private final ReentrantReadWriteLock provisionedSubscribersLock = new ReentrantReadWriteLock();
+    private final Lock provisionedSubscribersWriteLock = provisionedSubscribersLock.writeLock();
+    private final Lock provisionedSubscribersReadLock = provisionedSubscribersLock.readLock();
+
     /**
      * Create DHCP trap flow on NNI port(s).
      */
@@ -174,42 +243,76 @@
      **/
     protected int defaultTechProfileId = DEFAULT_TP_ID_DEFAULT;
 
-    protected ApplicationId appId;
-    protected BaseInformationService<BandwidthProfileInformation> bpService;
-    protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
-    protected Map<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingEapolForDevice;
+    protected boolean waitForRemoval = WAIT_FOR_REMOVAL_DEFAULT;
+
+    public enum FlowOperation {
+        ADD,
+        REMOVE;
+
+
+        @Override
+        public String toString() {
+            return super.toString().toLowerCase();
+        }
+    }
+
+    public enum FlowDirection {
+        UPSTREAM,
+        DOWNSTREAM,
+    }
+
+    public enum OltFlowsStatus {
+        NONE,
+        PENDING_ADD,
+        ADDED,
+        PENDING_REMOVE,
+        REMOVED,
+        ERROR
+    }
+
+    protected InternalFlowListener internalFlowListener;
 
     @Activate
     public void activate(ComponentContext context) {
-        if (sadisService != null) {
-            bpService = sadisService.getBandwidthProfileService();
-            subsService = sadisService.getSubscriberInfoService();
-        } else {
-            log.warn(SADIS_NOT_RUNNING);
-        }
-        componentConfigService.registerProperties(getClass());
-        appId = coreService.getAppId(APP_NAME);
+        cfgService.registerProperties(getClass());
+        appId = coreService.registerApplication(APP_NAME);
+        internalFlowListener = new InternalFlowListener();
+
+        modified(context);
+
         KryoNamespace serializer = KryoNamespace.newBuilder()
                 .register(KryoNamespaces.API)
-                .register(UniTagInformation.class)
-                .register(SubscriberFlowInfo.class)
+                .register(OltFlowsStatus.class)
+                .register(FlowDirection.class)
+                .register(OltPortStatus.class)
+                .register(OltFlowsStatus.class)
                 .register(AccessDevicePort.class)
-                .register(AccessDevicePort.Type.class)
-                .register(LinkedBlockingQueue.class)
+                .register(new ServiceKeySerializer(), ServiceKey.class)
+                .register(UniTagInformation.class)
                 .build();
-        pendingEapolForDevice = storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
-                .withName("volt-pending-eapol")
-                .withSerializer(Serializer.using(serializer))
-                .withApplicationId(appId)
-                .build().asJavaMap();
-        log.info("started");
-    }
 
+        cpStatus = storageService.<ServiceKey, OltPortStatus>consistentMapBuilder()
+                .withName("volt-cp-status")
+                .withApplicationId(appId)
+                .withSerializer(Serializer.using(serializer))
+                .build().asJavaMap();
+
+        provisionedSubscribers = storageService.<ServiceKey, Boolean>consistentMapBuilder()
+                .withName("volt-provisioned-subscriber")
+                .withApplicationId(appId)
+                .withSerializer(Serializer.using(serializer))
+                .build().asJavaMap();
+
+        flowRuleService.addListener(internalFlowListener);
+
+        log.info("Started");
+    }
 
     @Deactivate
     public void deactivate(ComponentContext context) {
-        componentConfigService.unregisterProperties(getClass(), false);
-        log.info("stopped");
+        cfgService.unregisterProperties(getClass(), false);
+        flowRuleService.removeListener(internalFlowListener);
+        log.info("Stopped");
     }
 
     @Modified
@@ -247,106 +350,712 @@
             enablePppoe = pppoe;
         }
 
+        Boolean wait = Tools.isPropertyEnabled(properties, WAIT_FOR_REMOVAL);
+        if (wait != null) {
+            waitForRemoval = wait;
+        }
+
         String tpId = get(properties, DEFAULT_TP_ID);
         defaultTechProfileId = isNullOrEmpty(tpId) ? DEFAULT_TP_ID_DEFAULT : Integer.parseInt(tpId.trim());
 
-        log.info("modified. Values = enableDhcpOnNni: {}, enableDhcpV4: {}, " +
-                         "enableDhcpV6:{}, enableIgmpOnNni:{}, " +
-                         "enableEapol:{}, enablePppoe:{}, defaultTechProfileId:{}",
-                 enableDhcpOnNni, enableDhcpV4, enableDhcpV6,
-                 enableIgmpOnNni, enableEapol,  enablePppoe,
-                 defaultTechProfileId);
+        log.info("Modified. Values = enableDhcpOnNni: {}, enableDhcpV4: {}, " +
+                        "enableDhcpV6:{}, enableIgmpOnNni:{}, " +
+                        "enableEapol:{}, enablePppoe:{}, defaultTechProfileId:{}," +
+                        "waitForRemoval:{}",
+                enableDhcpOnNni, enableDhcpV4, enableDhcpV6,
+                enableIgmpOnNni, enableEapol, enablePppoe,
+                defaultTechProfileId, waitForRemoval);
 
     }
 
-    protected void bindSadisService(SadisService service) {
-        sadisService = service;
-        bpService = sadisService.getBandwidthProfileService();
-        subsService = sadisService.getSubscriberInfoService();
-        log.info("Sadis-service binds to onos.");
-    }
-
-    protected void unbindSadisService(SadisService service) {
-        sadisService = null;
-        bpService = null;
-        subsService = null;
-        log.info("Sadis-service unbinds from onos.");
-    }
-
     @Override
-    public void processDhcpFilteringObjectives(AccessDevicePort port,
-                                               MeterId upstreamMeterId,
-                                               MeterId upstreamOltMeterId,
-                                               UniTagInformation tagInformation,
-                                               boolean install,
-                                               boolean upstream,
-                                               Optional<CompletableFuture<ObjectiveError>> dhcpFuture) {
-        if (upstream) {
-            // for UNI ports
-            if (tagInformation != null && !tagInformation.getIsDhcpRequired()) {
-                log.debug("Dhcp provisioning is disabled for UNI port {} for service {}",
-                        port, tagInformation.getServiceName());
-                dhcpFuture.ifPresent(f -> f.complete(null));
-                return;
-            }
-        } else {
-            // for NNI ports
-            if (!enableDhcpOnNni) {
-                log.debug("Dhcp provisioning is disabled for NNI port {}", port);
-                dhcpFuture.ifPresent(f -> f.complete(null));
-                return;
-            }
-        }
-        int techProfileId = tagInformation != null ? tagInformation.getTechnologyProfileId() : NONE_TP_ID;
-        VlanId cTag = tagInformation != null ? tagInformation.getPonCTag() : VlanId.NONE;
-        VlanId unitagMatch = tagInformation != null ? tagInformation.getUniTagMatch() : VlanId.ANY;
-        Byte vlanPcp = tagInformation != null && tagInformation.getUsPonCTagPriority() != NO_PCP
-                ? (byte) tagInformation.getUsPonCTagPriority() : null;
-
-
-        if (enableDhcpV4) {
-            int udpSrc = (upstream) ? 68 : 67;
-            int udpDst = (upstream) ? 67 : 68;
-
-            EthType ethType = EthType.EtherType.IPV4.ethType();
-            byte protocol = IPv4.PROTOCOL_UDP;
-
-            addDhcpFilteringObjectives(port, udpSrc, udpDst, ethType,
-                    upstreamMeterId, upstreamOltMeterId, techProfileId, protocol, cTag, unitagMatch,
-                    vlanPcp, upstream, install, dhcpFuture);
-        }
-
-        if (enableDhcpV6) {
-            int udpSrc = (upstream) ? 547 : 546;
-            int udpDst = (upstream) ? 546 : 547;
-
-            EthType ethType = EthType.EtherType.IPV6.ethType();
-            byte protocol = IPv6.PROTOCOL_UDP;
-
-            addDhcpFilteringObjectives(port, udpSrc, udpDst, ethType,
-                    upstreamMeterId, upstreamOltMeterId, techProfileId, protocol, cTag, unitagMatch,
-                    vlanPcp, upstream, install, dhcpFuture);
+    public ImmutableMap<ServiceKey, OltPortStatus> getConnectPointStatus() {
+        try {
+            cpStatusReadLock.lock();
+            return ImmutableMap.copyOf(cpStatus);
+        } finally {
+            cpStatusReadLock.unlock();
         }
     }
 
-    private void addDhcpFilteringObjectives(AccessDevicePort port, int udpSrc, int udpDst,
-                                            EthType ethType, MeterId upstreamMeterId, MeterId upstreamOltMeterId,
-                                            int techProfileId, byte protocol, VlanId cTag, VlanId unitagMatch,
-                                            Byte vlanPcp, boolean upstream,
-                                            boolean install, Optional<CompletableFuture<ObjectiveError>> dhcpFuture) {
+    @Override
+    public ImmutableMap<ServiceKey, UniTagInformation> getProgrammedSubscribers() {
+        // NOTE we might want to remove this conversion and directly use cpStatus as it contains
+        // all the required information about suscribers
+        Map<ServiceKey, UniTagInformation> subscribers =
+                new HashMap<>();
+        try {
+            cpStatusReadLock.lock();
+
+            cpStatus.forEach((sk, status) -> {
+                if (
+                    // not NNI Port
+                        !oltDeviceService.isNniPort(deviceService.getDevice(sk.getPort().connectPoint().deviceId()),
+                                sk.getPort().connectPoint().port()) &&
+                                // not EAPOL flow
+                                !sk.getService().equals(defaultEapolUniTag)
+                ) {
+                    subscribers.put(sk, sk.getService());
+                }
+            });
+
+            return ImmutableMap.copyOf(subscribers);
+        } finally {
+            cpStatusReadLock.unlock();
+        }
+    }
+
+    @Override
+    public Map<ServiceKey, Boolean> getRequestedSubscribers() {
+        try {
+            provisionedSubscribersReadLock.lock();
+            return ImmutableMap.copyOf(provisionedSubscribers);
+        } finally {
+            provisionedSubscribersReadLock.unlock();
+        }
+    }
+
+    @Override
+    public void handleNniFlows(Device device, Port port, FlowOperation action) {
+
+        // always handle the LLDP flow
+        processLldpFilteringObjective(device.id(), port, action);
+
+        if (enableDhcpOnNni) {
+            if (enableDhcpV4) {
+                log.debug("Installing DHCPv4 trap flow on NNI {} for device {}", portWithName(port), device.id());
+                processDhcpFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+                        67, 68, EthType.EtherType.IPV4.ethType(), IPv4.PROTOCOL_UDP,
+                        null, null, nniUniTag);
+            }
+            if (enableDhcpV6) {
+                log.debug("Installing DHCPv6 trap flow on NNI {} for device {}", portWithName(port), device.id());
+                processDhcpFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+                        546, 547, EthType.EtherType.IPV6.ethType(), IPv6.PROTOCOL_UDP,
+                        null, null, nniUniTag);
+            }
+        } else {
+            log.info("DHCP is not required on NNI {} for device {}", portWithName(port), device.id());
+        }
+
+        if (enableIgmpOnNni) {
+            log.debug("Installing IGMP flow on NNI {} for device {}", portWithName(port), device.id());
+            processIgmpFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+                    null, null, NONE_TP_ID, VlanId.NONE, VlanId.ANY, -1);
+        }
+
+        if (enablePppoe) {
+            log.debug("Installing PPPoE flow on NNI {} for device {}", port.number(), device.id());
+            processPPPoEDFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+                    null, null, NONE_TP_ID, VlanId.NONE, VlanId.ANY, null);
+        }
+    }
+
+    @Override
+    public boolean handleBasicPortFlows(DiscoveredSubscriber sub, String bandwidthProfileId,
+                                        String oltBandwidthProfileId) {
+
+        // we only need to something if EAPOL is enabled
+        if (!enableEapol) {
+            return true;
+        }
+
+        if (sub.status == DiscoveredSubscriber.Status.ADDED) {
+            return addDefaultFlows(sub, bandwidthProfileId, oltBandwidthProfileId);
+        } else if (sub.status == DiscoveredSubscriber.Status.REMOVED) {
+            return removeDefaultFlows(sub, bandwidthProfileId, oltBandwidthProfileId);
+        } else {
+            log.error("Unknown Status {} on DiscoveredSubscriber {}", sub.status, sub);
+            return false;
+        }
+
+    }
+
+    private boolean addDefaultFlows(DiscoveredSubscriber sub, String bandwidthProfileId, String oltBandwidthProfileId) {
+
+        if (!oltMeterService.createMeter(sub.device.id(), bandwidthProfileId)) {
+            if (log.isTraceEnabled()) {
+                log.trace("waiting on meter for bp {} and sub {}", bandwidthProfileId, sub);
+            }
+            return false;
+        }
+        if (hasDefaultEapol(sub.port)) {
+            return true;
+        }
+        return handleEapolFlow(sub, bandwidthProfileId,
+                oltBandwidthProfileId, FlowOperation.ADD, VlanId.vlanId(EAPOL_DEFAULT_VLAN));
+
+    }
+
+    private boolean removeDefaultFlows(DiscoveredSubscriber sub, String bandwidthProfile, String oltBandwidthProfile) {
+        // NOTE that we are not checking for meters as they must have been created to install the flow in first place
+        return handleEapolFlow(sub, bandwidthProfile, oltBandwidthProfile,
+                FlowOperation.REMOVE, VlanId.vlanId(EAPOL_DEFAULT_VLAN));
+    }
+
+    @Override
+    public boolean handleSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwidthProfile,
+                                         String multicastServiceName) {
+        // NOTE that we are taking defaultBandwithProfile as a parameter as that can be configured in the Olt component
+        if (sub.status == DiscoveredSubscriber.Status.ADDED) {
+            return addSubscriberFlows(sub, defaultBandwidthProfile, multicastServiceName);
+        } else if (sub.status == DiscoveredSubscriber.Status.REMOVED) {
+            return removeSubscriberFlows(sub, defaultBandwidthProfile, multicastServiceName);
+        } else {
+            log.error("don't know how to handle {}", sub);
+            return false;
+        }
+    }
+
+    private boolean addSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwithProfile,
+                                       String multicastServiceName) {
+        if (log.isTraceEnabled()) {
+            log.trace("Provisioning of subscriber on {} started", portWithName(sub.port));
+        }
+        if (enableEapol) {
+            if (hasDefaultEapol(sub.port)) {
+                // remove EAPOL flow and throw exception so that we'll retry later
+                if (!isDefaultEapolPendingRemoval(sub.port)) {
+                    removeDefaultFlows(sub, defaultBandwithProfile, defaultBandwithProfile);
+                }
+
+                if (waitForRemoval) {
+                    // NOTE wait for removal is a flag only needed to make sure VOLTHA
+                    // does not explode with the flows remove/add in the same batch
+                    log.debug("Awaiting for default flows removal for {}", portWithName(sub.port));
+                    return false;
+                } else {
+                    log.warn("continuing provisioning on {}", portWithName(sub.port));
+                }
+            }
+
+        }
+
+        // NOTE createMeters will return if the meters are not installed
+        if (!oltMeterService.createMeters(sub.device.id(),
+                sub.subscriberAndDeviceInformation)) {
+            return false;
+        }
+
+        // NOTE we need to add the DHCP flow regardless so that the host can be discovered and the MacAddress added
+        handleSubscriberDhcpFlows(sub.device.id(), sub.port, FlowOperation.ADD,
+                sub.subscriberAndDeviceInformation);
+
+        if (isMacLearningEnabled(sub.subscriberAndDeviceInformation)
+                && !isMacAddressAvailable(sub.device.id(), sub.port,
+                sub.subscriberAndDeviceInformation)) {
+            log.debug("Awaiting for macAddress on {}", portWithName(sub.port));
+            return false;
+        }
+
+        handleSubscriberDataFlows(sub.device, sub.port, FlowOperation.ADD,
+                sub.subscriberAndDeviceInformation, multicastServiceName);
+
+        handleSubscriberEapolFlows(sub, FlowOperation.ADD, sub.subscriberAndDeviceInformation);
+
+        handleSubscriberIgmpFlows(sub, FlowOperation.ADD);
+
+        log.info("Provisioning of subscriber on {} completed", portWithName(sub.port));
+        return true;
+    }
+
+    private boolean removeSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwithProfile,
+                                          String multicastServiceName) {
+
+        if (log.isTraceEnabled()) {
+            log.trace("Removal of subscriber on {} started",
+                    portWithName(sub.port));
+        }
+        SubscriberAndDeviceInformation si = subsService.get(sub.portName());
+        if (si == null) {
+            log.error("Subscriber information not found in sadis for port {} during subscriber removal",
+                    portWithName(sub.port));
+            // NOTE that we are returning true so that the subscriber is removed from the queue
+            // and we can move on provisioning others
+            return true;
+        }
+
+        handleSubscriberDhcpFlows(sub.device.id(), sub.port, FlowOperation.REMOVE, si);
+
+        if (enableEapol) {
+            // remove the tagged eapol
+            handleSubscriberEapolFlows(sub, FlowOperation.REMOVE, si);
+
+            // and add the default one back
+            if (sub.port.isEnabled()) {
+                // NOTE we remove the subscriber when the port goes down
+                // but in that case we don't need to add default eapol
+                handleEapolFlow(sub, defaultBandwithProfile, defaultBandwithProfile,
+                        FlowOperation.ADD, VlanId.vlanId(EAPOL_DEFAULT_VLAN));
+            }
+        }
+        handleSubscriberDataFlows(sub.device, sub.port, FlowOperation.REMOVE, si, multicastServiceName);
+
+        handleSubscriberIgmpFlows(sub, FlowOperation.REMOVE);
+
+        // FIXME check the return status of the flow and return accordingly
+        log.info("Removal of subscriber on {} completed", portWithName(sub.port));
+        return true;
+    }
+
+    @Override
+    public boolean hasDefaultEapol(Port port) {
+        OltPortStatus status = getOltPortStatus(port, defaultEapolUniTag);
+        // NOTE we consider ERROR as a present EAPOL flow as ONOS
+        // will keep trying to add it
+        return status != null && (status.defaultEapolStatus == OltFlowsStatus.ADDED ||
+                status.defaultEapolStatus == OltFlowsStatus.PENDING_ADD ||
+                status.defaultEapolStatus == OltFlowsStatus.ERROR);
+    }
+
+    private OltPortStatus getOltPortStatus(Port port, UniTagInformation uniTagInformation) {
+        try {
+            cpStatusReadLock.lock();
+            ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uniTagInformation);
+            OltPortStatus status = cpStatus.get(sk);
+            return status;
+        } finally {
+            cpStatusReadLock.unlock();
+        }
+    }
+
+    public boolean isDefaultEapolPendingRemoval(Port port) {
+        OltPortStatus status = getOltPortStatus(port, defaultEapolUniTag);
+        if (log.isTraceEnabled()) {
+            log.trace("Status during EAPOL flow check {} for port {} and UniTagInformation {}",
+                    status, portWithName(port), defaultEapolUniTag);
+        }
+        return status != null && status.defaultEapolStatus == OltFlowsStatus.PENDING_REMOVE;
+    }
+
+    @Override
+    public boolean hasDhcpFlows(Port port, UniTagInformation uti) {
+        OltPortStatus status = getOltPortStatus(port, uti);
+        if (log.isTraceEnabled()) {
+            log.trace("Status during DHCP flow check {} for port {} and service {}",
+                    status, portWithName(port), uti.getServiceName());
+        }
+        return status != null &&
+                (status.dhcpStatus == OltFlowsStatus.ADDED ||
+                        status.dhcpStatus == OltFlowsStatus.PENDING_ADD);
+    }
+
+    @Override
+    public boolean hasSubscriberFlows(Port port, UniTagInformation uti) {
+
+        OltPortStatus status = getOltPortStatus(port, uti);
+        if (log.isTraceEnabled()) {
+            log.trace("Status during subscriber flow check {} for port {} and service {}",
+                    status, portWithName(port), uti.getServiceName());
+        }
+        return status != null && (status.subscriberFlowsStatus == OltFlowsStatus.ADDED ||
+                status.subscriberFlowsStatus == OltFlowsStatus.PENDING_ADD);
+    }
+
+    @Override
+    public void purgeDeviceFlows(DeviceId deviceId) {
+        log.debug("Purging flows on device {}", deviceId);
+        flowRuleService.purgeFlowRules(deviceId);
+
+        // removing the status from the cpStatus map
+        try {
+            cpStatusWriteLock.lock();
+            Iterator<Map.Entry<ServiceKey, OltPortStatus>> iter = cpStatus.entrySet().iterator();
+            while (iter.hasNext()) {
+                Map.Entry<ServiceKey, OltPortStatus> entry = iter.next();
+                if (entry.getKey().getPort().connectPoint().deviceId().equals(deviceId)) {
+                    cpStatus.remove(entry.getKey());
+                }
+            }
+        } finally {
+            cpStatusWriteLock.unlock();
+        }
+
+        // removing subscribers from the provisioned map
+        try {
+            provisionedSubscribersWriteLock.lock();
+            Iterator<Map.Entry<ServiceKey, Boolean>> iter = provisionedSubscribers.entrySet().iterator();
+            while (iter.hasNext()) {
+                Map.Entry<ServiceKey, Boolean> entry = iter.next();
+                if (entry.getKey().getPort().connectPoint().deviceId().equals(deviceId)) {
+                    provisionedSubscribers.remove(entry.getKey());
+                }
+            }
+        } finally {
+            provisionedSubscribersWriteLock.unlock();
+        }
+    }
+
+    @Override
+    public boolean isSubscriberServiceProvisioned(AccessDevicePort cp) {
+        // if any service is programmed on this port, returns true
+        AtomicBoolean provisioned = new AtomicBoolean(false);
+        try {
+            provisionedSubscribersReadLock.lock();
+            for (Map.Entry<ServiceKey, Boolean> entry : provisionedSubscribers.entrySet()) {
+                if (entry.getKey().getPort().equals(cp) && entry.getValue()) {
+                    provisioned.set(true);
+                    break;
+                }
+            }
+        } finally {
+            provisionedSubscribersReadLock.unlock();
+        }
+        return provisioned.get();
+    }
+
+    @Override
+    public boolean isSubscriberServiceProvisioned(ServiceKey sk) {
+        try {
+            provisionedSubscribersReadLock.lock();
+            Boolean provisioned = provisionedSubscribers.get(sk);
+            if (provisioned == null || !provisioned) {
+                return false;
+            }
+        } finally {
+            provisionedSubscribersReadLock.unlock();
+        }
+        return true;
+    }
+
+    @Override
+    public void updateProvisionedSubscriberStatus(ServiceKey sk, Boolean status) {
+        try {
+            provisionedSubscribersWriteLock.lock();
+            provisionedSubscribers.put(sk, status);
+        } finally {
+            provisionedSubscribersWriteLock.unlock();
+        }
+    }
+
+    private boolean handleEapolFlow(DiscoveredSubscriber sub, String bandwidthProfile,
+                                    String oltBandwidthProfile, FlowOperation action, VlanId vlanId) {
+
+        // create a subscriberKey for the EAPOL flow
+        ServiceKey sk = new ServiceKey(new AccessDevicePort(sub.port), defaultEapolUniTag);
+
+        // NOTE we only need to keep track of the default EAPOL flow in the
+        // connectpoint status map
+        if (vlanId.id().equals(EAPOL_DEFAULT_VLAN)) {
+            OltFlowsStatus status = action == FlowOperation.ADD ?
+                    OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
+            updateConnectPointStatus(sk, status, OltFlowsStatus.NONE, OltFlowsStatus.NONE);
+
+        }
+
+        DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
+        TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+
+        int techProfileId = getDefaultTechProfileId(sub.port);
+        MeterId meterId = oltMeterService.getMeterIdForBandwidthProfile(sub.device.id(), bandwidthProfile);
+
+        // in the delete case the meter should still be there as we remove
+        // the meters only if no flows are pointing to them
+        if (meterId == null) {
+            log.debug("MeterId is null for BandwidthProfile {} on device {}",
+                    bandwidthProfile, sub.device.id());
+            return false;
+        }
+
+        MeterId oltMeterId = oltMeterService.getMeterIdForBandwidthProfile(sub.device.id(), oltBandwidthProfile);
+        if (oltMeterId == null) {
+            log.debug("MeterId is null for OltBandwidthProfile {} on device {}",
+                    oltBandwidthProfile, sub.device.id());
+            return false;
+        }
+
+        log.info("{} EAPOL flow for {} with vlanId {} and BandwidthProfile {} (meterId {})",
+                flowOpToString(action), portWithName(sub.port), vlanId, bandwidthProfile, meterId);
+
+        FilteringObjective.Builder eapolAction;
+
+        if (action == FlowOperation.ADD) {
+            eapolAction = filterBuilder.permit();
+        } else if (action == FlowOperation.REMOVE) {
+            eapolAction = filterBuilder.deny();
+        } else {
+            log.error("Operation {} not supported", action);
+            return false;
+        }
+
+        FilteringObjective.Builder baseEapol = eapolAction
+                .withKey(Criteria.matchInPort(sub.port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()));
+
+        // NOTE we only need to add the treatment to install the flow,
+        // we can remove it based in the match
+        FilteringObjective.Builder eapol;
+
+        TrafficTreatment treatment = treatmentBuilder
+                .meter(meterId)
+                .writeMetadata(createTechProfValueForWriteMetadata(
+                        vlanId,
+                        techProfileId, oltMeterId), 0)
+                .setOutput(PortNumber.CONTROLLER)
+                .pushVlan()
+                .setVlanId(vlanId)
+                .build();
+        eapol = baseEapol
+                .withMeta(treatment);
+
+        FilteringObjective eapolObjective = eapol
+                .fromApp(appId)
+                .withPriority(MAX_PRIORITY)
+                .add(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) {
+                        log.info("EAPOL flow objective {} for {}",
+                                completeFlowOpToString(action), portWithName(sub.port));
+                        if (log.isTraceEnabled()) {
+                            log.trace("EAPOL flow details for port {}: {}", portWithName(sub.port), objective);
+                        }
+                    }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        log.error("Cannot {} eapol flow for {} : {}", action,
+                                portWithName(sub.port), error);
+
+                        if (vlanId.id().equals(EAPOL_DEFAULT_VLAN)) {
+                            updateConnectPointStatus(sk,
+                                    OltFlowsStatus.ERROR, null, null);
+                        }
+                    }
+                });
+
+        flowObjectiveService.filter(sub.device.id(), eapolObjective);
+
+        log.info("{} EAPOL filter for {}", completeFlowOpToString(action), portWithName(sub.port));
+        return true;
+    }
+
+    // FIXME it's confusing that si is not necessarily inside the DiscoveredSubscriber
+    private boolean handleSubscriberEapolFlows(DiscoveredSubscriber sub, FlowOperation action,
+                                               SubscriberAndDeviceInformation si) {
+        if (!enableEapol) {
+            return true;
+        }
+        // TODO verify we need an EAPOL flow for EACH service
+        AtomicBoolean success = new AtomicBoolean(true);
+        si.uniTagList().forEach(u -> {
+            // we assume that if subscriber flows are installed, tagged EAPOL is installed as well
+            boolean hasFlows = hasSubscriberFlows(sub.port, u);
+
+            // if the subscriber flows are present the EAPOL flow is present thus we don't need to install it,
+            // if the subscriber does not have flows the EAPOL flow is not present thus we don't need to remove it
+            if (action == FlowOperation.ADD && hasFlows ||
+                    action == FlowOperation.REMOVE && !hasFlows) {
+                log.debug("Not {} EAPOL on {}({}) as subscriber flow status is {}", flowOpToString(action),
+                        portWithName(sub.port), u.getServiceName(), hasFlows);
+                return;
+            }
+            log.info("{} EAPOL flows for subscriber {} on {} and service {}",
+                    flowOpToString(action), si.id(), portWithName(sub.port), u.getServiceName());
+            if (!handleEapolFlow(sub, u.getUpstreamBandwidthProfile(),
+                    u.getUpstreamOltBandwidthProfile(),
+                    action, u.getPonCTag())) {
+                //
+                log.error("Failed to {} EAPOL with suscriber tags", action);
+                //TODO this sets it for all services, maybe some services succeeded.
+                success.set(false);
+            }
+        });
+        return success.get();
+    }
+
+    private void handleSubscriberIgmpFlows(DiscoveredSubscriber sub, FlowOperation action) {
+        sub.subscriberAndDeviceInformation.uniTagList().forEach(uti -> {
+            if (uti.getIsIgmpRequired()) {
+                DeviceId deviceId = sub.device.id();
+                // if we reached here a meter already exists
+                MeterId meterId = oltMeterService
+                        .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamBandwidthProfile());
+                MeterId oltMeterId = oltMeterService
+                        .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamOltBandwidthProfile());
+
+                processIgmpFilteringObjectives(deviceId, sub.port,
+                        action, FlowDirection.UPSTREAM, meterId, oltMeterId, uti.getTechnologyProfileId(),
+                        uti.getPonCTag(), uti.getUniTagMatch(), uti.getUsPonCTagPriority());
+            }
+        });
+    }
+
+    private boolean checkSadisRunning() {
+        if (bpService == null) {
+            log.warn("Sadis is not running");
+            return false;
+        }
+        return true;
+    }
+
+    private int getDefaultTechProfileId(Port port) {
+        if (!checkSadisRunning()) {
+            return defaultTechProfileId;
+        }
+        if (port != null) {
+            SubscriberAndDeviceInformation info = subsService.get(getPortName(port));
+            if (info != null && info.uniTagList().size() == 1) {
+                return info.uniTagList().get(0).getTechnologyProfileId();
+            }
+        }
+        return defaultTechProfileId;
+    }
+
+    protected Long createTechProfValueForWriteMetadata(VlanId cVlan, int techProfileId, MeterId upstreamOltMeterId) {
+        Long writeMetadata;
+
+        if (cVlan == null || VlanId.NONE.equals(cVlan)) {
+            writeMetadata = (long) techProfileId << 32;
+        } else {
+            writeMetadata = ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
+        }
+        if (upstreamOltMeterId == null) {
+            return writeMetadata;
+        } else {
+            return writeMetadata | upstreamOltMeterId.id();
+        }
+    }
+
+    private void processLldpFilteringObjective(DeviceId deviceId, Port port, FlowOperation action) {
+        DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
+
+        FilteringObjective lldp = (action == FlowOperation.ADD ? builder.permit() : builder.deny())
+                .withKey(Criteria.matchInPort(port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.LLDP.ethType()))
+                .withMeta(DefaultTrafficTreatment.builder()
+                        .setOutput(PortNumber.CONTROLLER).build())
+                .fromApp(appId)
+                .withPriority(MAX_PRIORITY)
+                .add(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) {
+                        log.info("{} LLDP filter for {}.", completeFlowOpToString(action), portWithName(port));
+                    }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        log.error("Falied to {} LLDP filter on {} because {}", action, portWithName(port),
+                                error);
+                    }
+                });
+
+        flowObjectiveService.filter(deviceId, lldp);
+    }
+
+    protected void handleSubscriberDhcpFlows(DeviceId deviceId, Port port,
+                                             FlowOperation action,
+                                             SubscriberAndDeviceInformation si) {
+        si.uniTagList().forEach(uti -> {
+
+            if (!uti.getIsDhcpRequired()) {
+                return;
+            }
+
+            // if it's an ADD skip if flows are there,
+            // if it's a DELETE skip if flows are not there
+            boolean hasFlows = hasDhcpFlows(port, uti);
+            if (action == FlowOperation.ADD && hasFlows ||
+                    action == FlowOperation.REMOVE && !hasFlows) {
+                log.debug("Not dealing with DHCP {} on {} as DHCP flow status is {}", action,
+                        uti.getServiceName(), hasFlows);
+                return;
+            }
+
+            log.info("{} DHCP flows for subscriber on {} and service {}",
+                    flowOpToString(action), portWithName(port), uti.getServiceName());
+
+            // if we reached here a meter already exists
+            MeterId meterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamBandwidthProfile());
+            MeterId oltMeterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamOltBandwidthProfile());
+
+            if (enableDhcpV4) {
+                processDhcpFilteringObjectives(deviceId, port, action, FlowDirection.UPSTREAM, 68, 67,
+                        EthType.EtherType.IPV4.ethType(), IPv4.PROTOCOL_UDP, meterId, oltMeterId,
+                        uti);
+            }
+            if (enableDhcpV6) {
+                log.error("DHCP V6 not supported for subscribers");
+            }
+        });
+    }
+
+    // FIXME return boolean, if this fails we need to retry
+    protected void handleSubscriberDataFlows(Device device, Port port,
+                                             FlowOperation action,
+                                             SubscriberAndDeviceInformation si, String multicastServiceName) {
+
+        Optional<Port> nniPort = oltDeviceService.getNniPort(device);
+        if (nniPort.isEmpty()) {
+            log.error("Cannot configure DP flows as upstream port is not configured for subscriber {} on {}",
+                    si.id(), portWithName(port));
+            return;
+        }
+        si.uniTagList().forEach(uti -> {
+
+            boolean hasFlows = hasSubscriberFlows(port, uti);
+            if (action == FlowOperation.ADD && hasFlows ||
+                    action == FlowOperation.REMOVE && !hasFlows) {
+                log.debug("Not dealing with DP flows {} on {} as subscriber flow status is {}", action,
+                        uti.getServiceName(), hasFlows);
+                return;
+            }
+
+            if (multicastServiceName.equals(uti.getServiceName())) {
+                log.debug("This is the multicast service ({}) for subscriber {} on {}, " +
+                                "dataplane flows are not needed",
+                        uti.getServiceName(), si.id(), portWithName(port));
+                return;
+            }
+
+            log.info("{} Data plane flows for subscriber {} on {} and service {}",
+                    flowOpToString(action), si.id(), portWithName(port), uti.getServiceName());
+
+            // upstream flows
+            MeterId usMeterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(device.id(), uti.getUpstreamBandwidthProfile());
+            MeterId oltUsMeterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(device.id(), uti.getUpstreamOltBandwidthProfile());
+            processUpstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, usMeterId,
+                    oltUsMeterId, uti);
+
+            // downstream flows
+            MeterId dsMeterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(device.id(), uti.getDownstreamBandwidthProfile());
+            MeterId oltDsMeterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(device.id(), uti.getDownstreamOltBandwidthProfile());
+            processDownstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, dsMeterId,
+                    oltDsMeterId, uti, getMacAddress(device.id(), port, uti));
+        });
+    }
+
+    private void processDhcpFilteringObjectives(DeviceId deviceId, Port port,
+                                                FlowOperation action, FlowDirection direction,
+                                                int udpSrc, int udpDst, EthType ethType, byte protocol,
+                                                MeterId meterId, MeterId oltMeterId, UniTagInformation uti) {
+        ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
+        log.debug("{} DHCP filtering objectives on {}", flowOpToString(action), sk);
+
+
+        OltFlowsStatus status = action.equals(FlowOperation.ADD) ?
+                OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
+        updateConnectPointStatus(sk, null, null, status);
 
         DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
         TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
 
-        if (upstreamMeterId != null) {
-            treatmentBuilder.meter(upstreamMeterId);
+        if (meterId != null) {
+            treatmentBuilder.meter(meterId);
         }
 
-        if (techProfileId != NONE_TP_ID) {
-            treatmentBuilder.writeMetadata(createTechProfValueForWm(unitagMatch, techProfileId, upstreamOltMeterId), 0);
+        if (uti.getTechnologyProfileId() != NONE_TP_ID) {
+            treatmentBuilder.writeMetadata(
+                    createTechProfValueForWriteMetadata(uti.getUniTagMatch(),
+                            uti.getTechnologyProfileId(), oltMeterId), 0);
         }
 
-        FilteringObjective.Builder dhcpUpstreamBuilder = (install ? builder.permit() : builder.deny())
+        FilteringObjective.Builder dhcpBuilder = (action == FlowOperation.ADD ? builder.permit() : builder.deny())
                 .withKey(Criteria.matchInPort(port.number()))
                 .addCondition(Criteria.matchEthType(ethType))
                 .addCondition(Criteria.matchIPProtocol(protocol))
@@ -356,87 +1065,124 @@
                 .withPriority(MAX_PRIORITY);
 
         //VLAN changes and PCP matching need to happen only in the upstream directions
-        if (upstream) {
-            treatmentBuilder.setVlanId(cTag);
-            if (!VlanId.vlanId(VlanId.NO_VID).equals(unitagMatch)) {
-                dhcpUpstreamBuilder.addCondition(Criteria.matchVlanId(unitagMatch));
+        if (direction == FlowDirection.UPSTREAM) {
+            treatmentBuilder.setVlanId(uti.getPonCTag());
+            if (!VlanId.vlanId(VlanId.NO_VID).equals(uti.getUniTagMatch())) {
+                dhcpBuilder.addCondition(Criteria.matchVlanId(uti.getUniTagMatch()));
             }
-            if (vlanPcp != null) {
-                treatmentBuilder.setVlanPcp(vlanPcp);
+            if (uti.getUsPonCTagPriority() != -1) {
+                treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
             }
         }
 
-        dhcpUpstreamBuilder.withMeta(treatmentBuilder
-                                  .setOutput(PortNumber.CONTROLLER).build());
+        dhcpBuilder.withMeta(treatmentBuilder
+                .setOutput(PortNumber.CONTROLLER).build());
 
 
-        FilteringObjective dhcpUpstream = dhcpUpstreamBuilder.add(new ObjectiveContext() {
+        FilteringObjective dhcpUpstream = dhcpBuilder.add(new ObjectiveContext() {
             @Override
             public void onSuccess(Objective objective) {
-                log.info("DHCP {} filter for {} {}.",
-                        (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6, port,
-                        (install) ? INSTALLED : REMOVED);
-                dhcpFuture.ifPresent(f -> f.complete(null));
+                log.info("{} DHCP {} filter for {}.",
+                        completeFlowOpToString(action), (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6,
+                        portWithName(port));
             }
 
             @Override
             public void onError(Objective objective, ObjectiveError error) {
                 log.error("DHCP {} filter for {} failed {} because {}",
-                        (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6, port,
-                        (install) ? INSTALLATION : REMOVAL,
+                        (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6,
+                        portWithName(port),
+                        action,
                         error);
-                dhcpFuture.ifPresent(f -> f.complete(error));
+                updateConnectPointStatus(sk, null, null, OltFlowsStatus.ERROR);
             }
         });
-        flowObjectiveService.filter(port.deviceId(), dhcpUpstream);
+        flowObjectiveService.filter(deviceId, dhcpUpstream);
+    }
+
+    private void processIgmpFilteringObjectives(DeviceId deviceId, Port port,
+                                                FlowOperation action, FlowDirection direction,
+                                                MeterId meterId, MeterId oltMeterId, int techProfileId,
+                                                VlanId cTag, VlanId unitagMatch, int vlanPcp) {
+
+        DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
+        TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+        if (direction == FlowDirection.UPSTREAM) {
+
+            if (techProfileId != NONE_TP_ID) {
+                treatmentBuilder.writeMetadata(createTechProfValueForWriteMetadata(null,
+                        techProfileId, oltMeterId), 0);
+            }
+
+
+            if (meterId != null) {
+                treatmentBuilder.meter(meterId);
+            }
+
+            if (!VlanId.vlanId(VlanId.NO_VID).equals(unitagMatch)) {
+                filterBuilder.addCondition(Criteria.matchVlanId(unitagMatch));
+            }
+
+            if (!VlanId.vlanId(VlanId.NO_VID).equals(cTag)) {
+                treatmentBuilder.setVlanId(cTag);
+            }
+
+            if (vlanPcp != -1) {
+                treatmentBuilder.setVlanPcp((byte) vlanPcp);
+            }
+        }
+
+        filterBuilder = (action == FlowOperation.ADD) ? filterBuilder.permit() : filterBuilder.deny();
+
+        FilteringObjective igmp = filterBuilder
+                .withKey(Criteria.matchInPort(port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
+                .withMeta(treatmentBuilder
+                        .setOutput(PortNumber.CONTROLLER).build())
+                .fromApp(appId)
+                .withPriority(MAX_PRIORITY)
+                .add(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) {
+                        log.info("Igmp filter for {} {}.", portWithName(port), action);
+                    }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        log.error("Igmp filter for {} failed {} because {}.", portWithName(port), action,
+                                error);
+                    }
+                });
+
+        flowObjectiveService.filter(deviceId, igmp);
 
     }
 
-    @Override
-    public void processPPPoEDFilteringObjectives(AccessDevicePort port,
-                                                 MeterId upstreamMeterId,
-                                                 MeterId upstreamOltMeterId,
-                                                 UniTagInformation tagInformation,
-                                                 boolean install,
-                                                 boolean upstream) {
-        if (!enablePppoe) {
-            log.debug("PPPoED filtering is disabled. Ignoring request.");
-            return;
-        }
-
-        int techProfileId = NONE_TP_ID;
-        VlanId cTag = VlanId.NONE;
-        VlanId unitagMatch = VlanId.ANY;
-        Byte vlanPcp = null;
-
-        if (tagInformation != null) {
-            techProfileId = tagInformation.getTechnologyProfileId();
-            cTag = tagInformation.getPonCTag();
-            unitagMatch = tagInformation.getUniTagMatch();
-            if (tagInformation.getUsPonCTagPriority() != NO_PCP) {
-                vlanPcp = (byte) tagInformation.getUsPonCTagPriority();
-            }
-        }
+    private void processPPPoEDFilteringObjectives(DeviceId deviceId, Port port,
+                                                  FlowOperation action, FlowDirection direction,
+                                                  MeterId meterId, MeterId oltMeterId, int techProfileId,
+                                                  VlanId cTag, VlanId unitagMatch, Byte vlanPcp) {
 
         DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
         TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
-        CompletableFuture<Object> meterFuture = new CompletableFuture<>();
 
-        if (upstreamMeterId != null) {
-            treatmentBuilder.meter(upstreamMeterId);
+        if (meterId != null) {
+            treatmentBuilder.meter(meterId);
         }
 
         if (techProfileId != NONE_TP_ID) {
-            treatmentBuilder.writeMetadata(createTechProfValueForWm(cTag, techProfileId, upstreamOltMeterId), 0);
+            treatmentBuilder.writeMetadata(createTechProfValueForWriteMetadata(cTag, techProfileId, oltMeterId), 0);
         }
 
-        DefaultFilteringObjective.Builder pppoedBuilder = (install ? builder.permit() : builder.deny())
+        DefaultFilteringObjective.Builder pppoedBuilder = ((action == FlowOperation.ADD)
+                ? builder.permit() : builder.deny())
                 .withKey(Criteria.matchInPort(port.number()))
                 .addCondition(Criteria.matchEthType(EthType.EtherType.PPPoED.ethType()))
                 .fromApp(appId)
                 .withPriority(10000);
 
-        if (upstream) {
+        if (direction == FlowDirection.UPSTREAM) {
             treatmentBuilder.setVlanId(cTag);
             if (!VlanId.vlanId(VlanId.NO_VID).equals(unitagMatch)) {
                 pppoedBuilder.addCondition(Criteria.matchVlanId(unitagMatch));
@@ -451,398 +1197,50 @@
                 .add(new ObjectiveContext() {
                     @Override
                     public void onSuccess(Objective objective) {
-                        log.info("PPPoED filter for {} {}.", port, (install) ? INSTALLED : REMOVED);
+                        log.info("PPPoED filter for {} {}.", portWithName(port), action);
                     }
 
                     @Override
                     public void onError(Objective objective, ObjectiveError error) {
-                        log.info("PPPoED filter for {} failed {} because {}", port,
-                                (install) ? INSTALLATION : REMOVAL, error);
+                        log.info("PPPoED filter for {} failed {} because {}", portWithName(port),
+                                action, error);
                     }
                 });
-        flowObjectiveService.filter(port.deviceId(), pppoed);
+        flowObjectiveService.filter(deviceId, pppoed);
     }
 
-    @Override
-    public void processIgmpFilteringObjectives(AccessDevicePort port,
-                                               MeterId upstreamMeterId,
-                                               MeterId upstreamOltMeterId,
-                                               UniTagInformation tagInformation,
-                                               boolean install,
-                                               boolean upstream) {
-        if (upstream) {
-            // for UNI ports
-            if (tagInformation != null && !tagInformation.getIsIgmpRequired()) {
-                log.debug("Igmp provisioning is disabled for UNI port {} for service {}",
-                        port, tagInformation.getServiceName());
-                return;
-            }
-        } else {
-            // for NNI ports
-            if (!enableIgmpOnNni) {
-                log.debug("Igmp provisioning is disabled for NNI port {}", port);
-                return;
-            }
-        }
-
-        log.debug("{} IGMP flows on {}", (install) ? "Installing" : "Removing", port);
-        DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
-        TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
-        if (upstream) {
-
-            if (tagInformation.getTechnologyProfileId() != NONE_TP_ID) {
-                treatmentBuilder.writeMetadata(createTechProfValueForWm(null,
-                        tagInformation.getTechnologyProfileId(), upstreamOltMeterId), 0);
-            }
-
-
-            if (upstreamMeterId != null) {
-                treatmentBuilder.meter(upstreamMeterId);
-            }
-
-            if (!VlanId.vlanId(VlanId.NO_VID).equals(tagInformation.getUniTagMatch())) {
-                filterBuilder.addCondition(Criteria.matchVlanId(tagInformation.getUniTagMatch()));
-            }
-
-            if (!VlanId.vlanId(VlanId.NO_VID).equals(tagInformation.getPonCTag())) {
-                treatmentBuilder.setVlanId(tagInformation.getPonCTag());
-            }
-
-            if (tagInformation.getUsPonCTagPriority() != NO_PCP) {
-                treatmentBuilder.setVlanPcp((byte) tagInformation.getUsPonCTagPriority());
-            }
-        }
-
-        filterBuilder = install ? filterBuilder.permit() : filterBuilder.deny();
-
-        FilteringObjective igmp = filterBuilder
-                .withKey(Criteria.matchInPort(port.number()))
-                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
-                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
-                .withMeta(treatmentBuilder
-                        .setOutput(PortNumber.CONTROLLER).build())
-                .fromApp(appId)
-                .withPriority(MAX_PRIORITY)
-                .add(new ObjectiveContext() {
-                    @Override
-                    public void onSuccess(Objective objective) {
-                        log.info("Igmp filter for {} {}.", port, (install) ? INSTALLED : REMOVED);
-                    }
-
-                    @Override
-                    public void onError(Objective objective, ObjectiveError error) {
-                        log.error("Igmp filter for {} failed {} because {}.", port, (install) ? INSTALLATION : REMOVAL,
-                                error);
-                    }
-                });
-
-        flowObjectiveService.filter(port.deviceId(), igmp);
-    }
-
-    @Override
-    public void processEapolFilteringObjectives(AccessDevicePort port, String bpId, Optional<String> oltBpId,
-                                                CompletableFuture<ObjectiveError> filterFuture,
-                                                VlanId vlanId, boolean install) {
-
-        if (!enableEapol) {
-            log.debug("Eapol filtering is disabled. Completing filteringFuture immediately for the device {}",
-                    port.deviceId());
-            if (filterFuture != null) {
-                filterFuture.complete(null);
-            }
-            return;
-        }
-        log.info("Processing EAPOL with Bandwidth profile {} on {}", bpId, port);
-        BandwidthProfileInformation bpInfo = getBandwidthProfileInformation(bpId);
-        BandwidthProfileInformation oltBpInfo;
-        if (oltBpId.isPresent()) {
-            oltBpInfo = getBandwidthProfileInformation(oltBpId.get());
-        } else {
-            oltBpInfo = bpInfo;
-        }
-        if (bpInfo == null) {
-            log.warn("Bandwidth profile {} is not found. Authentication flow"
-                    + " will not be installed on {}", bpId, port);
-            if (filterFuture != null) {
-                filterFuture.complete(ObjectiveError.BADPARAMS);
-            }
-            return;
-        }
-
-        ConnectPoint cp = new ConnectPoint(port.deviceId(), port.number());
-        DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
-        TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
-        // check if meter exists and create it only for an install
-        final MeterId meterId = oltMeterService.getMeterIdFromBpMapping(port.deviceId(), bpInfo.id());
-        MeterId oltMeterId = null;
-        if (oltBpId.isPresent()) {
-            oltMeterId = oltBpId.map(id -> oltMeterService.getMeterIdFromBpMapping(port.deviceId(), id)).orElse(null);
-        }
-        log.info("Meter id {} for Bandwidth profile {} and OLT meter id {} for OLT Bandwidth profile {} " +
-                        "associated to EAPOL on {}", meterId, bpInfo.id(), oltMeterId, oltBpId, port.deviceId());
-        if (meterId == null || (oltBpId.isPresent() && oltMeterId == null)) {
-            if (install) {
-                log.debug("Need to install meter for EAPOL with bwp {} on {}", bpInfo.id(), port);
-                SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
-                                                            new UniTagInformation.Builder()
-                                                                    .setPonCTag(vlanId).build(),
-                                                        null, meterId, null, oltMeterId,
-                                                    null, bpInfo.id(), null, oltBpInfo.id());
-                pendingEapolForDevice.compute(port.deviceId(), (id, queue) -> {
-                    if (queue == null) {
-                        queue = new LinkedBlockingQueue<>();
-                    }
-                    queue.add(fi);
-                    return queue;
-                });
-
-                //If false the meter is already being installed, skipping installation
-                if (!oltMeterService.checkAndAddPendingMeter(port.deviceId(), bpInfo) &&
-                        !oltMeterService.checkAndAddPendingMeter(port.deviceId(), oltBpInfo)) {
-                    return;
-                }
-                List<BandwidthProfileInformation> bwpList = Arrays.asList(bpInfo, oltBpInfo);
-                bwpList.stream().distinct().filter(Objects::nonNull)
-                        .forEach(bwp -> createMeterAndProceedEapol(port, bwp, filterFuture, install,
-                        cp, filterBuilder, treatmentBuilder));
-            } else {
-                // this case should not happen as the request to remove an eapol
-                // flow should mean that the flow points to a meter that exists.
-                // Nevertheless we can still delete the flow as we only need the
-                // correct 'match' to do so.
-                log.warn("Unknown meter id for bp {}, still proceeding with "
-                        + "delete of eapol flow on {}", bpInfo.id(), port);
-                SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
-                        new UniTagInformation.Builder().setPonCTag(vlanId).build(),
-                        null, meterId, null, oltMeterId, null,
-                        bpInfo.id(), null, oltBpInfo.id());
-                handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId, oltMeterId);
-            }
-        } else {
-            log.debug("Meter {} was previously created for bp {} on {}", meterId, bpInfo.id(), port);
-            SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
-                    new UniTagInformation.Builder().setPonCTag(vlanId).build(),
-                    null, meterId, null, oltMeterId, null,
-                    bpInfo.id(), null, oltBpInfo.id());
-            handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId, oltMeterId);
-            //No need for the future, meter is present.
-            return;
-        }
-    }
-
-    private void createMeterAndProceedEapol(AccessDevicePort port, BandwidthProfileInformation bwpInfo,
-                                            CompletableFuture<ObjectiveError> filterFuture,
-                                            boolean install, ConnectPoint cp,
-                                            DefaultFilteringObjective.Builder filterBuilder,
-                                            TrafficTreatment.Builder treatmentBuilder) {
-        CompletableFuture<Object> meterFuture = new CompletableFuture<>();
-        MeterId meterId = oltMeterService.createMeter(port.deviceId(), bwpInfo, meterFuture);
-        DeviceId deviceId = port.deviceId();
-        meterFuture.thenAccept(result -> {
-            //for each pending eapol flow we check if the meter is there.
-            pendingEapolForDevice.compute(deviceId, (id, queue) -> {
-                if (queue != null && !queue.isEmpty()) {
-                    while (true) {
-                        //TODO this might return the reference and not the actual object
-                        // so it can be actually swapped underneath us.
-                        SubscriberFlowInfo fi = queue.peek();
-                        if (fi == null) {
-                            log.debug("No more subscribers eapol flows on {}", deviceId);
-                            queue = new LinkedBlockingQueue<>();
-                            break;
-                        }
-                        log.debug("handing pending eapol on {} for {}", fi.getUniPort(), fi);
-                        if (result == null) {
-                            MeterId mId = oltMeterService
-                                    .getMeterIdFromBpMapping(port.deviceId(), fi.getUpBpInfo());
-                            MeterId oltMeterId = oltMeterService
-                                    .getMeterIdFromBpMapping(port.deviceId(), fi.getUpOltBpInfo());
-                            if (mId != null && oltMeterId != null) {
-                                log.debug("Meter installation completed for subscriber on {}, " +
-                                                  "handling EAPOL trap flow", port);
-                                fi.setUpMeterId(mId);
-                                fi.setUpOltMeterId(oltMeterId);
-                                handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi,
-                                            mId, oltMeterId);
-                                queue.remove(fi);
-                            } else {
-                                log.debug("Not all meters for {} are yet installed for EAPOL meterID {}, " +
-                                                  "oltMeterId {}", fi, meterId, oltMeterId);
-                            }
-                        } else {
-                            log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
-                                             "Result {} and MeterId {}", port, result, meterId);
-                            queue.remove(fi);
-                        }
-                        oltMeterService.removeFromPendingMeters(port.deviceId(), bwpInfo);
-                    }
-                } else {
-                    log.info("No pending EAPOLs on {}", port.deviceId());
-                    queue = new LinkedBlockingQueue<>();
-                }
-                return queue;
-            });
-        });
-    }
-
-    private void handleEapol(CompletableFuture<ObjectiveError> filterFuture,
-                             boolean install, ConnectPoint cp,
-                             DefaultFilteringObjective.Builder filterBuilder,
-                             TrafficTreatment.Builder treatmentBuilder,
-                             SubscriberFlowInfo fi, MeterId mId, MeterId oltMeterId) {
-        log.info("Meter {} for {} on {} exists. {} EAPOL trap flow",
-                 mId, fi.getUpBpInfo(), fi.getUniPort(),
-                 (install) ? "Installing" : "Removing");
-        int techProfileId = getDefaultTechProfileId(fi.getUniPort());
-        // can happen in case of removal
-        if (mId != null) {
-            treatmentBuilder.meter(mId);
-        }
-        //Authentication trap flow uses only tech profile id as write metadata value
-        FilteringObjective eapol = (install ? filterBuilder.permit() : filterBuilder.deny())
-                .withKey(Criteria.matchInPort(fi.getUniPort().number()))
-                .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
-                .withMeta(treatmentBuilder
-                                  .writeMetadata(createTechProfValueForWm(
-                                          fi.getTagInfo().getPonCTag(),
-                                          techProfileId, oltMeterId), 0)
-                                  .setOutput(PortNumber.CONTROLLER)
-                                  .pushVlan()
-                                  .setVlanId(fi.getTagInfo().getPonCTag())
-                                  .build())
-                .fromApp(appId)
-                .withPriority(MAX_PRIORITY)
-                .add(new ObjectiveContext() {
-                    @Override
-                    public void onSuccess(Objective objective) {
-                        log.info("Eapol filter {} for {} on {} with meter {}.",
-                                 objective.id(), (install) ? INSTALLED : REMOVED, fi.getUniPort(), mId);
-                        if (filterFuture != null) {
-                            filterFuture.complete(null);
-                        }
-                    }
-
-                    @Override
-                    public void onError(Objective objective, ObjectiveError error) {
-                        log.error("Eapol filter {} for {} with meter {} " +
-                                         "failed {} because {}", objective.id(), fi.getUniPort(), mId,
-                                 (install) ? INSTALLATION : REMOVAL,
-                                 error);
-                        if (filterFuture != null) {
-                            filterFuture.complete(error);
-                        }
-                    }
-                });
-        flowObjectiveService.filter(fi.getDevId(), eapol);
-    }
-
-    /**
-     * Installs trap filtering objectives for particular traffic types on an
-     * NNI port.
-     *
-     * @param nniPort    NNI port
-     * @param install true to install, false to remove
-     */
-    @Override
-    public void processNniFilteringObjectives(AccessDevicePort nniPort, boolean install) {
-        log.info("{} flows for NNI port {}",
-                 install ? "Adding" : "Removing", nniPort);
-        processLldpFilteringObjective(nniPort, install);
-        processDhcpFilteringObjectives(nniPort, null, null, null, install, false, Optional.empty());
-        processIgmpFilteringObjectives(nniPort, null, null, null, install, false);
-        processPPPoEDFilteringObjectives(nniPort, null, null, null, install, false);
-    }
-
-
-    @Override
-    public void processLldpFilteringObjective(AccessDevicePort port, boolean install) {
-        DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
-
-        FilteringObjective lldp = (install ? builder.permit() : builder.deny())
-                .withKey(Criteria.matchInPort(port.number()))
-                .addCondition(Criteria.matchEthType(EthType.EtherType.LLDP.ethType()))
-                .withMeta(DefaultTrafficTreatment.builder()
-                        .setOutput(PortNumber.CONTROLLER).build())
-                .fromApp(appId)
-                .withPriority(MAX_PRIORITY)
-                .add(new ObjectiveContext() {
-                    @Override
-                    public void onSuccess(Objective objective) {
-                        log.info("LLDP filter for {} {}.", port, (install) ? INSTALLED : REMOVED);
-                    }
-
-                    @Override
-                    public void onError(Objective objective, ObjectiveError error) {
-                        log.error("LLDP filter for {} failed {} because {}", port, (install) ? INSTALLATION : REMOVAL,
-                                error);
-                    }
-                });
-
-        flowObjectiveService.filter(port.deviceId(), lldp);
-    }
-
-    @Override
-    public ForwardingObjective.Builder createTransparentBuilder(AccessDevicePort uplinkPort,
-                                                                AccessDevicePort subscriberPort,
-                                                                MeterId meterId,
-                                                                UniTagInformation tagInfo,
-                                                                boolean upstream) {
-
+    private void processUpstreamDataFilteringObjects(DeviceId deviceId, Port port, Port nniPort,
+                                                     FlowOperation action,
+                                                     MeterId upstreamMeterId,
+                                                     MeterId upstreamOltMeterId,
+                                                     UniTagInformation uti) {
+        ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
         TrafficSelector selector = DefaultTrafficSelector.builder()
-                .matchVlanId(tagInfo.getPonSTag())
-                .matchInPort(upstream ? subscriberPort.number() : uplinkPort.number())
-                .matchInnerVlanId(tagInfo.getPonCTag())
-                .build();
-
-        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
-        if (meterId != null) {
-            tBuilder.meter(meterId);
-        }
-
-        TrafficTreatment treatment = tBuilder
-                .setOutput(upstream ? uplinkPort.number() : subscriberPort.number())
-                .writeMetadata(createMetadata(upstream ? tagInfo.getPonSTag() : tagInfo.getPonCTag(),
-                        tagInfo.getTechnologyProfileId(),
-                        upstream ? uplinkPort.number() : subscriberPort.number()), 0)
-                .build();
-
-        return createForwardingObjectiveBuilder(selector, treatment, MIN_PRIORITY,
-                DefaultAnnotations.builder().build());
-    }
-
-    @Override
-    public ForwardingObjective.Builder createUpBuilder(AccessDevicePort uplinkPort,
-                                                       AccessDevicePort subscriberPort,
-                                                       MeterId upstreamMeterId,
-                                                       MeterId upstreamOltMeterId,
-                                                       UniTagInformation uniTagInformation) {
-
-        TrafficSelector selector = DefaultTrafficSelector.builder()
-                .matchInPort(subscriberPort.number())
-                .matchVlanId(uniTagInformation.getUniTagMatch())
+                .matchInPort(port.number())
+                .matchVlanId(uti.getUniTagMatch())
                 .build();
 
         TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
         //if the subscriberVlan (cTag) is different than ANY it needs to set.
-        if (uniTagInformation.getPonCTag().toShort() != VlanId.ANY_VALUE) {
+        if (uti.getPonCTag().toShort() != VlanId.ANY_VALUE) {
             treatmentBuilder.pushVlan()
-                    .setVlanId(uniTagInformation.getPonCTag());
+                    .setVlanId(uti.getPonCTag());
         }
 
-        if (uniTagInformation.getUsPonCTagPriority() != NO_PCP) {
-            treatmentBuilder.setVlanPcp((byte) uniTagInformation.getUsPonCTagPriority());
+        if (uti.getUsPonCTagPriority() != -1) {
+            treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
         }
 
         treatmentBuilder.pushVlan()
-                .setVlanId(uniTagInformation.getPonSTag());
+                .setVlanId(uti.getPonSTag());
 
-        if (uniTagInformation.getUsPonSTagPriority() != NO_PCP) {
-            treatmentBuilder.setVlanPcp((byte) uniTagInformation.getUsPonSTagPriority());
+        if (uti.getUsPonSTagPriority() != -1) {
+            treatmentBuilder.setVlanPcp((byte) uti.getUsPonSTagPriority());
         }
 
-        treatmentBuilder.setOutput(uplinkPort.number())
-                .writeMetadata(createMetadata(uniTagInformation.getPonCTag(),
-                        uniTagInformation.getTechnologyProfileId(), uplinkPort.number()), 0L);
+        treatmentBuilder.setOutput(nniPort.number())
+                .writeMetadata(createMetadata(uti.getPonCTag(),
+                        uti.getTechnologyProfileId(), nniPort.number()), 0L);
 
         DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
 
@@ -855,55 +1253,84 @@
             annotationBuilder.set(UPSTREAM_OLT, upstreamOltMeterId.toString());
         }
 
-        return createForwardingObjectiveBuilder(selector, treatmentBuilder.build(), MIN_PRIORITY,
+        DefaultForwardingObjective.Builder flowBuilder = createForwardingObjectiveBuilder(selector,
+                treatmentBuilder.build(), MIN_PRIORITY,
                 annotationBuilder.build());
+
+        ObjectiveContext context = new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                log.info("{} Upstream Data plane filter for {}.",
+                        completeFlowOpToString(action), sk);
+            }
+
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                log.error("Upstream Data plane filter for {} failed {} because {}.",
+                        sk, action, error);
+                updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null);
+            }
+        };
+
+        ForwardingObjective flow = null;
+        if (action == FlowOperation.ADD) {
+            flow = flowBuilder.add(context);
+        } else if (action == FlowOperation.REMOVE) {
+            flow = flowBuilder.remove(context);
+        } else {
+            log.error("Flow action not supported: {}", action);
+        }
+
+        if (flow != null) {
+            flowObjectiveService.forward(deviceId, flow);
+        }
     }
 
-    @Override
-    public ForwardingObjective.Builder createDownBuilder(AccessDevicePort uplinkPort,
-                                                         AccessDevicePort subscriberPort,
-                                                         MeterId downstreamMeterId,
-                                                         MeterId downstreamOltMeterId,
-                                                         UniTagInformation tagInformation,
-                                                         Optional<MacAddress> macAddress) {
-
+    private void processDownstreamDataFilteringObjects(DeviceId deviceId, Port port, Port nniPort,
+                                                       FlowOperation action,
+                                                       MeterId downstreamMeterId,
+                                                       MeterId downstreamOltMeterId,
+                                                       UniTagInformation uti,
+                                                       MacAddress macAddress) {
+        ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
         //subscriberVlan can be any valid Vlan here including ANY to make sure the packet is tagged
         TrafficSelector.Builder selectorBuilder = DefaultTrafficSelector.builder()
-                .matchVlanId(tagInformation.getPonSTag())
-                .matchInPort(uplinkPort.number())
-                .matchInnerVlanId(tagInformation.getPonCTag());
+                .matchVlanId(uti.getPonSTag())
+                .matchInPort(nniPort.number())
+                .matchInnerVlanId(uti.getPonCTag());
 
-
-        if (tagInformation.getPonCTag().toShort() != VlanId.ANY_VALUE) {
-            selectorBuilder.matchMetadata(tagInformation.getPonCTag().toShort());
+        if (uti.getPonCTag().toShort() != VlanId.ANY_VALUE) {
+            selectorBuilder.matchMetadata(uti.getPonCTag().toShort());
         }
 
-        if (tagInformation.getDsPonSTagPriority() != NO_PCP) {
-            selectorBuilder.matchVlanPcp((byte) tagInformation.getDsPonSTagPriority());
+        if (uti.getDsPonCTagPriority() != -1) {
+            selectorBuilder.matchVlanPcp((byte) uti.getDsPonCTagPriority());
         }
 
-        macAddress.ifPresent(selectorBuilder::matchEthDst);
+        if (macAddress != null) {
+            selectorBuilder.matchEthDst(macAddress);
+        }
 
         TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder()
                 .popVlan()
-                .setOutput(subscriberPort.number());
+                .setOutput(port.number());
 
-        treatmentBuilder.writeMetadata(createMetadata(tagInformation.getPonCTag(),
-                                                      tagInformation.getTechnologyProfileId(),
-                                                      subscriberPort.number()), 0);
+        treatmentBuilder.writeMetadata(createMetadata(uti.getPonCTag(),
+                uti.getTechnologyProfileId(),
+                port.number()), 0);
 
         // Upstream pbit is used to remark inner vlan pbit.
         // Upstream is used to avoid trusting the BNG to send the packet with correct pbit.
         // this is done because ds mode 0 is used because ds mode 3 or 6 that allow for
         // all pbit acceptance are not widely supported by vendors even though present in
         // the OMCI spec.
-        if (tagInformation.getUsPonCTagPriority() != NO_PCP) {
-            treatmentBuilder.setVlanPcp((byte) tagInformation.getUsPonCTagPriority());
+        if (uti.getUsPonCTagPriority() != -1) {
+            treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
         }
 
-        if (!VlanId.NONE.equals(tagInformation.getUniTagMatch()) &&
-                tagInformation.getPonCTag().toShort() != VlanId.ANY_VALUE) {
-            treatmentBuilder.setVlanId(tagInformation.getUniTagMatch());
+        if (!VlanId.NONE.equals(uti.getUniTagMatch()) &&
+                uti.getPonCTag().toShort() != VlanId.ANY_VALUE) {
+            treatmentBuilder.setVlanId(uti.getUniTagMatch());
         }
 
         DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
@@ -918,13 +1345,39 @@
             annotationBuilder.set(DOWNSTREAM_OLT, downstreamOltMeterId.toString());
         }
 
-        return createForwardingObjectiveBuilder(selectorBuilder.build(), treatmentBuilder.build(), MIN_PRIORITY,
-                annotationBuilder.build());
-    }
+        DefaultForwardingObjective.Builder flowBuilder = createForwardingObjectiveBuilder(selectorBuilder.build(),
+                treatmentBuilder.build(), MIN_PRIORITY, annotationBuilder.build());
 
-    @Override
-    public void clearDeviceState(DeviceId deviceId) {
-        pendingEapolForDevice.remove(deviceId);
+        ObjectiveContext context = new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                log.info("{} Downstream Data plane filter for {}.",
+                        completeFlowOpToString(action), sk);
+            }
+
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                log.info("Downstream Data plane filter for {} failed {} because {}.",
+                        sk, action, error);
+                updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null);
+            }
+        };
+
+        ForwardingObjective flow = null;
+        if (action == FlowOperation.ADD) {
+            flow = flowBuilder.add(context);
+        } else if (action == FlowOperation.REMOVE) {
+            flow = flowBuilder.remove(context);
+        } else {
+            log.error("Flow action not supported: {}", action);
+        }
+
+        if (flow != null) {
+            if (log.isTraceEnabled()) {
+                log.trace("Forwarding rule {}", flow);
+            }
+            flowObjectiveService.forward(deviceId, flow);
+        }
     }
 
     private DefaultForwardingObjective.Builder createForwardingObjectiveBuilder(TrafficSelector selector,
@@ -941,70 +1394,6 @@
                 .withTreatment(treatment);
     }
 
-    /**
-     * Returns the write metadata value including tech profile reference and innerVlan.
-     * For param cVlan, null can be sent
-     *
-     * @param cVlan                 c (customer) tag of one subscriber
-     * @param techProfileId         tech profile id of one subscriber
-     * @param upstreamOltMeterId    upstream meter id for OLT device.
-     * @return the write metadata value including tech profile reference and innerVlan
-     */
-    private Long createTechProfValueForWm(VlanId cVlan, int techProfileId, MeterId upstreamOltMeterId) {
-        Long writeMetadata;
-
-        if (cVlan == null || VlanId.NONE.equals(cVlan)) {
-            writeMetadata = (long) techProfileId << 32;
-        } else {
-            writeMetadata = ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
-        }
-        if (upstreamOltMeterId == null) {
-            return writeMetadata;
-        } else {
-            return writeMetadata | upstreamOltMeterId.id();
-        }
-    }
-
-    private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
-        if (bpService == null) {
-            log.warn(SADIS_NOT_RUNNING);
-            return null;
-        }
-        if (bandwidthProfile == null) {
-            return null;
-        }
-        return bpService.get(bandwidthProfile);
-    }
-
-    /**
-     * It will be used to support AT&T use case (for EAPOL flows).
-     * If multiple services are found in uniServiceList, returns default tech profile id
-     * If one service is found, returns the found one
-     *
-     * @param port uni port
-     * @return the default technology profile id
-     */
-    private int getDefaultTechProfileId(AccessDevicePort port) {
-        if (subsService == null) {
-            log.warn(SADIS_NOT_RUNNING);
-            return defaultTechProfileId;
-        }
-        if (port != null) {
-            SubscriberAndDeviceInformation info = subsService.get(port.name());
-            if (info != null && info.uniTagList().size() == 1) {
-                return info.uniTagList().get(0).getTechnologyProfileId();
-            }
-        }
-        return defaultTechProfileId;
-    }
-
-    /**
-     * Write metadata instruction value (metadata) is 8 bytes.
-     * <p>
-     * MS 2 bytes: C Tag
-     * Next 2 bytes: Technology Profile Id
-     * Next 4 bytes: Port number (uni or nni)
-     */
     private Long createMetadata(VlanId innerVlan, int techProfileId, PortNumber egressPort) {
         if (techProfileId == NONE_TP_ID) {
             techProfileId = DEFAULT_TP_ID_DEFAULT;
@@ -1013,5 +1402,318 @@
         return ((long) (innerVlan.id()) << 48 | (long) techProfileId << 32) | egressPort.toLong();
     }
 
+    private boolean isMacLearningEnabled(SubscriberAndDeviceInformation si) {
+        AtomicBoolean requiresMacLearning = new AtomicBoolean();
+        requiresMacLearning.set(false);
 
+        si.uniTagList().forEach(uniTagInfo -> {
+            if (uniTagInfo.getEnableMacLearning()) {
+                requiresMacLearning.set(true);
+            }
+        });
+
+        return requiresMacLearning.get();
+    }
+
+    /**
+     * Checks whether the subscriber has the MacAddress configured or discovered.
+     *
+     * @param deviceId DeviceId for this subscriber
+     * @param port     Port for this subscriber
+     * @param si       SubscriberAndDeviceInformation
+     * @return boolean
+     */
+    protected boolean isMacAddressAvailable(DeviceId deviceId, Port port, SubscriberAndDeviceInformation si) {
+        AtomicBoolean isConfigured = new AtomicBoolean();
+        isConfigured.set(true);
+
+        si.uniTagList().forEach(uniTagInfo -> {
+            boolean isMacLearningEnabled = uniTagInfo.getEnableMacLearning();
+            boolean configureMac = isMacAddressValid(uniTagInfo);
+            boolean discoveredMac = false;
+            Optional<Host> optHost = hostService.getConnectedHosts(new ConnectPoint(deviceId, port.number()))
+                    .stream().filter(host -> host.vlan().equals(uniTagInfo.getPonCTag())).findFirst();
+            if (optHost.isPresent() && optHost.get().mac() != null) {
+                discoveredMac = true;
+            }
+            if (isMacLearningEnabled && !configureMac && !discoveredMac) {
+                log.debug("Awaiting for macAddress on {} for service {}",
+                        portWithName(port), uniTagInfo.getServiceName());
+                isConfigured.set(false);
+            }
+        });
+
+        return isConfigured.get();
+    }
+
+    protected MacAddress getMacAddress(DeviceId deviceId, Port port, UniTagInformation uniTagInfo) {
+        boolean configuredMac = isMacAddressValid(uniTagInfo);
+        if (configuredMac) {
+            return MacAddress.valueOf(uniTagInfo.getConfiguredMacAddress());
+        } else if (uniTagInfo.getEnableMacLearning()) {
+            Optional<Host> optHost = hostService.getConnectedHosts(new ConnectPoint(deviceId, port.number()))
+                    .stream().filter(host -> host.vlan().equals(uniTagInfo.getPonCTag())).findFirst();
+            if (optHost.isPresent() && optHost.get().mac() != null) {
+                return optHost.get().mac();
+            }
+        }
+        return null;
+    }
+
+    private boolean isMacAddressValid(UniTagInformation tagInformation) {
+        return tagInformation.getConfiguredMacAddress() != null &&
+                !tagInformation.getConfiguredMacAddress().trim().equals("") &&
+                !MacAddress.NONE.equals(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
+    }
+
+    protected void updateConnectPointStatus(ServiceKey key, OltFlowsStatus eapolStatus,
+                                            OltFlowsStatus subscriberFlowsStatus, OltFlowsStatus dhcpStatus) {
+        try {
+            cpStatusWriteLock.lock();
+            OltPortStatus status = cpStatus.get(key);
+
+            if (status == null) {
+                status = new OltPortStatus(
+                        eapolStatus != null ? eapolStatus : OltFlowsStatus.NONE,
+                        subscriberFlowsStatus != null ? subscriberFlowsStatus : OltFlowsStatus.NONE,
+                        dhcpStatus != null ? dhcpStatus : OltFlowsStatus.NONE
+                );
+            } else {
+                if (eapolStatus != null) {
+                    status.defaultEapolStatus = eapolStatus;
+                }
+                if (subscriberFlowsStatus != null) {
+                    status.subscriberFlowsStatus = subscriberFlowsStatus;
+                }
+                if (dhcpStatus != null) {
+                    status.dhcpStatus = dhcpStatus;
+                }
+            }
+
+            cpStatus.put(key, status);
+        } finally {
+            cpStatusWriteLock.unlock();
+        }
+    }
+
+    protected class InternalFlowListener implements FlowRuleListener {
+        @Override
+        public void event(FlowRuleEvent event) {
+            if (appId.id() != (event.subject().appId())) {
+                return;
+            }
+
+            if (!oltDeviceService.isLocalLeader(event.subject().deviceId())) {
+                if (log.isTraceEnabled()) {
+                    log.trace("ignoring flow event {} " +
+                            "as not leader for {}", event, event.subject().deviceId());
+                }
+                return;
+            }
+
+            switch (event.type()) {
+                case RULE_ADDED:
+                case RULE_REMOVED:
+                    Port port = getCpFromFlowRule(event.subject());
+                    if (port == null) {
+                        log.error("Can't find port in flow {}", event.subject());
+                        return;
+                    }
+                    if (log.isTraceEnabled()) {
+                        log.trace("flow event {} on cp {}: {}", event.type(),
+                                portWithName(port), event.subject());
+                    }
+                    updateCpStatus(event.type(), port, event.subject());
+                    return;
+                case RULE_ADD_REQUESTED:
+                case RULE_REMOVE_REQUESTED:
+                    // NOTE that PENDING_ADD/REMOVE is set when we create the flowObjective
+                    return;
+                default:
+                    return;
+            }
+        }
+
+        protected void updateCpStatus(FlowRuleEvent.Type type, Port port, FlowRule flowRule) {
+            OltFlowsStatus status = flowRuleStatusToOltFlowStatus(type);
+            if (isDefaultEapolFlow(flowRule)) {
+                ServiceKey sk = new ServiceKey(new AccessDevicePort(port),
+                        defaultEapolUniTag);
+                if (log.isTraceEnabled()) {
+                    log.trace("update defaultEapolStatus {} on {}", status, sk);
+                }
+                updateConnectPointStatus(sk, status, null, null);
+            } else if (isDhcpFlow(flowRule)) {
+                ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+                if (sk == null) {
+                    return;
+                }
+                if (log.isTraceEnabled()) {
+                    log.trace("update dhcpStatus {} on {}", status, sk);
+                }
+                updateConnectPointStatus(sk, null, null, status);
+            } else if (isDataFlow(flowRule)) {
+
+                if (oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()),
+                        getCpFromFlowRule(flowRule).number())) {
+                    // the NNI has data-plane for every subscriber, doesn't make sense to track them
+                    return;
+                }
+
+                ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+                if (sk == null) {
+                    return;
+                }
+                if (log.isTraceEnabled()) {
+                    log.trace("update dataplaneStatus {} on {}", status, sk);
+                }
+                updateConnectPointStatus(sk, null, status, null);
+            }
+        }
+
+        private boolean isDefaultEapolFlow(FlowRule flowRule) {
+            EthTypeCriterion c = (EthTypeCriterion) flowRule.selector().getCriterion(Criterion.Type.ETH_TYPE);
+            if (c == null) {
+                return false;
+            }
+            if (c.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
+                AtomicBoolean isDefault = new AtomicBoolean(false);
+                flowRule.treatment().allInstructions().forEach(instruction -> {
+                    if (instruction.type() == L2MODIFICATION) {
+                        L2ModificationInstruction modificationInstruction = (L2ModificationInstruction) instruction;
+                        if (modificationInstruction.subtype() == L2ModificationInstruction.L2SubType.VLAN_ID) {
+                            L2ModificationInstruction.ModVlanIdInstruction vlanInstruction =
+                                    (L2ModificationInstruction.ModVlanIdInstruction) modificationInstruction;
+                            if (vlanInstruction.vlanId().id().equals(EAPOL_DEFAULT_VLAN)) {
+                                isDefault.set(true);
+                                return;
+                            }
+                        }
+                    }
+                });
+                return isDefault.get();
+            }
+            return false;
+        }
+
+        /**
+         * Returns true if the flow is a DHCP flow.
+         * Matches both upstream and downstream flows.
+         *
+         * @param flowRule The FlowRule to evaluate
+         * @return boolean
+         */
+        private boolean isDhcpFlow(FlowRule flowRule) {
+            IPProtocolCriterion ipCriterion = (IPProtocolCriterion) flowRule.selector()
+                    .getCriterion(Criterion.Type.IP_PROTO);
+            if (ipCriterion == null) {
+                return false;
+            }
+
+            UdpPortCriterion src = (UdpPortCriterion) flowRule.selector().getCriterion(Criterion.Type.UDP_SRC);
+
+            if (src == null) {
+                return false;
+            }
+            return ipCriterion.protocol() == IPv4.PROTOCOL_UDP &&
+                    (src.udpPort().toInt() == 68 || src.udpPort().toInt() == 67);
+        }
+
+        private boolean isDataFlow(FlowRule flowRule) {
+            // we consider subscriber flows the one that matches on VLAN_VID
+            // method is valid only because it's the last check after EAPOL and DHCP.
+            // this matches mcast flows as well, if we want to avoid that we can
+            // filter out the elements that have groups in the treatment or
+            // mcastIp in the selector
+            // IPV4_DST:224.0.0.22/32
+            // treatment=[immediate=[GROUP:0x1]]
+
+            return flowRule.selector().getCriterion(Criterion.Type.VLAN_VID) != null;
+        }
+
+        private Port getCpFromFlowRule(FlowRule flowRule) {
+            DeviceId deviceId = flowRule.deviceId();
+            PortCriterion inPort = (PortCriterion) flowRule.selector().getCriterion(Criterion.Type.IN_PORT);
+            if (inPort != null) {
+                PortNumber port = inPort.port();
+                return deviceService.getPort(deviceId, port);
+            }
+            return null;
+        }
+
+        private ServiceKey getSubscriberKeyFromFlowRule(FlowRule flowRule) {
+            Port flowPort = getCpFromFlowRule(flowRule);
+            SubscriberAndDeviceInformation si = subsService.get(getPortName(flowPort));
+
+            Boolean isNni = oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()), flowPort.number());
+            if (si == null && !isNni) {
+                log.error("Subscriber information not found in sadis for port {}", portWithName(flowPort));
+                return null;
+            }
+
+            if (isNni) {
+                return new ServiceKey(new AccessDevicePort(flowPort), nniUniTag);
+            }
+
+            Optional<UniTagInformation> found = Optional.empty();
+            VlanId flowVlan = null;
+            if (isDhcpFlow(flowRule)) {
+                // we need to make a special case for DHCP as in the ATT workflow DHCP flows don't match on tags
+                L2ModificationInstruction.ModVlanIdInstruction instruction =
+                        (L2ModificationInstruction.ModVlanIdInstruction) flowRule.treatment().immediate().get(1);
+                flowVlan = instruction.vlanId();
+            } else {
+                // for now we assume that if it's not DHCP it's dataplane (or at least tagged)
+                VlanIdCriterion vlanIdCriterion =
+                        (VlanIdCriterion) flowRule.selector().getCriterion(Criterion.Type.VLAN_VID);
+                if (vlanIdCriterion == null) {
+                    log.warn("cannot match the flow to a subscriber service as it does not carry vlans");
+                    return null;
+                }
+                flowVlan = vlanIdCriterion.vlanId();
+            }
+
+            VlanId finalFlowVlan = flowVlan;
+            found = si.uniTagList().stream().filter(uti ->
+                    uti.getPonCTag().equals(finalFlowVlan) ||
+                            uti.getPonSTag().equals(finalFlowVlan) ||
+                            uti.getUniTagMatch().equals(finalFlowVlan)
+            ).findFirst();
+
+
+            if (found.isEmpty()) {
+                log.warn("Cannot map flow rule {} to Service in {}", flowRule, si);
+            }
+
+            return found.isPresent() ? new ServiceKey(new AccessDevicePort(flowPort), found.get()) : null;
+
+        }
+
+        private OltFlowsStatus flowRuleStatusToOltFlowStatus(FlowRuleEvent.Type type) {
+            switch (type) {
+                case RULE_ADD_REQUESTED:
+                    return OltFlowsStatus.PENDING_ADD;
+                case RULE_ADDED:
+                    return OltFlowsStatus.ADDED;
+                case RULE_REMOVE_REQUESTED:
+                    return OltFlowsStatus.PENDING_REMOVE;
+                case RULE_REMOVED:
+                    return OltFlowsStatus.REMOVED;
+                default:
+                    return OltFlowsStatus.NONE;
+            }
+        }
+    }
+
+    protected void bindSadisService(SadisService service) {
+        this.subsService = service.getSubscriberInfoService();
+        this.bpService = service.getBandwidthProfileService();
+        log.info("Sadis service is loaded");
+    }
+
+    protected void unbindSadisService(SadisService service) {
+        this.subsService = null;
+        this.bpService = null;
+        log.info("Sadis service is unloaded");
+    }
 }