[VOL-4246] Feature parity with the previous implementation

Change-Id: I3741edb3c1b88b1cf8b5e6d4ff0900132e2e5e6a
diff --git a/impl/src/main/java/org/opencord/olt/impl/AccessDevicePort.java b/impl/src/main/java/org/opencord/olt/impl/AccessDevicePort.java
new file mode 100644
index 0000000..b6d2213
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/AccessDevicePort.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.impl;
+
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Port;
+
+import java.util.Objects;
+
+/**
+ * OLT device port.
+ */
+public class AccessDevicePort {
+
+    private ConnectPoint cp;
+    private String name;
+
+    /**
+     * Creates an AccessDevicePort with given ONOS port.
+     *
+     * @param port ONOS port
+     */
+    public AccessDevicePort(Port port) {
+        this.cp = ConnectPoint.deviceConnectPoint(port.element().id() + "/" + port.number().toLong());
+        this.name = OltUtils.getPortName(port);
+    }
+
+    /**
+     * Creates an AccessDevicePort with given ONOS connectPoint and name.
+     *
+     * @param cp ONOS connect point
+     * @param name OLT port name
+     */
+    public AccessDevicePort(ConnectPoint cp, String name) {
+        this.cp = cp;
+        this.name = name;
+    }
+
+    /**
+     * Get ONOS ConnectPoint object.
+     *
+     * @return ONOS connect point
+     */
+    public ConnectPoint connectPoint() {
+        return this.cp;
+    }
+
+    /**
+     * Get OLT port name which is combination of serial number and uni index.
+     *
+     * @return OLT port name (ex: BBSM00010001-1)
+     */
+    public String name() {
+        return this.name;
+    }
+
+    @Override
+    public String toString() {
+        return cp.toString() + '[' + name + ']';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AccessDevicePort that = (AccessDevicePort) o;
+        return Objects.equals(cp, that.cp) &&
+                Objects.equals(name, that.name);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(cp, name);
+    }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/ConsistentHasher.java b/impl/src/main/java/org/opencord/olt/impl/ConsistentHasher.java
deleted file mode 100644
index 52e9b96..0000000
--- a/impl/src/main/java/org/opencord/olt/impl/ConsistentHasher.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright 2020-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.opencord.olt.impl;
-
-import com.google.common.hash.Hashing;
-import org.onosproject.cluster.NodeId;
-
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Returns a server hosting a given key based on consistent hashing.
- */
-public class ConsistentHasher {
-
-    private static class Entry implements Comparable<Entry> {
-        private final NodeId server;
-        private final int hash;
-
-        public Entry(NodeId server, int hash) {
-            this.server = server;
-            this.hash = hash;
-        }
-
-        public Entry(int hash) {
-            server = null;
-            this.hash = hash;
-        }
-
-        @Override
-        public int compareTo(Entry o) {
-            if (this.hash > o.hash) {
-                return 1;
-            } else if (this.hash < o.hash) {
-                return -1;
-            } // else
-            return 0;
-        }
-    }
-
-    private final int weight;
-
-    private List<Entry> table;
-
-    /**
-     * Creates a new hasher with the given list of servers.
-     *
-     * @param servers list of servers
-     * @param weight weight
-     */
-    public ConsistentHasher(List<NodeId> servers, int weight) {
-        this.weight = weight;
-
-        this.table = new ArrayList<>();
-
-        servers.forEach(this::addServer);
-    }
-
-    /**
-     * Adds a new server to the list of servers.
-     *
-     * @param server server ID
-     */
-    public synchronized void addServer(NodeId server) {
-        // Add weight number of buckets for each server
-        for (int i = 0; i < weight; i++) {
-            String label = server.toString() + i;
-            int hash = getHash(label);
-            Entry e = new Entry(server, hash);
-            table.add(e);
-        }
-
-        Collections.sort(table);
-    }
-
-    /**
-     * Removes a server from the list of servers.
-     *
-     * @param server server ID
-     */
-    public synchronized void removeServer(NodeId server) {
-        table.removeIf(e -> e.server.equals(server));
-    }
-
-    /**
-     * Hashes a given input string and finds that server that should
-     * handle the given ID.
-     *
-     * @param s input
-     * @return server ID
-     */
-    public synchronized NodeId hash(String s) {
-        Entry temp = new Entry(getHash(s));
-
-        int pos = Collections.binarySearch(this.table, temp);
-
-        // translate a negative not-found result into the closest following match
-        if (pos < 0) {
-            pos = Math.abs(pos + 1);
-        }
-
-        // wraparound if the hash was after the last entry in the table
-        if (pos == this.table.size()) {
-            pos = 0;
-        }
-
-        return table.get(pos).server;
-    }
-
-    private int getHash(String s) {
-        // stable, uniformly-distributed hash
-        return Hashing.murmur3_128().hashString(s, Charset.defaultCharset()).asInt();
-    }
-}
diff --git a/impl/src/main/java/org/opencord/olt/impl/DiscoveredSubscriber.java b/impl/src/main/java/org/opencord/olt/impl/DiscoveredSubscriber.java
new file mode 100644
index 0000000..4280eaf
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/DiscoveredSubscriber.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.onosproject.net.Device;
+import org.onosproject.net.Port;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+
+import java.util.Objects;
+
+import static org.opencord.olt.impl.OltUtils.portWithName;
+
+/**
+ * Contains a subscriber's information and status for a specific device and port.
+ */
+public class DiscoveredSubscriber {
+
+    /**
+     * Describe whether the subscriber needs to be added or removed.
+     */
+    public enum Status {
+        ADDED,
+        REMOVED,
+    }
+
+    public Port port;
+    public Device device;
+    public Enum<Status> status;
+    public boolean hasSubscriber;
+    public SubscriberAndDeviceInformation subscriberAndDeviceInformation;
+
+    /**
+     * Creates the class with the proper information.
+     *
+     * @param device        the device of the subscriber
+     * @param port          the port
+     * @param status        the status for this specific subscriber
+     * @param hasSubscriber is the subscriber present
+     * @param si            the information about the tags/dhcp and other info.
+     */
+    public DiscoveredSubscriber(Device device, Port port, Status status, boolean hasSubscriber,
+                                SubscriberAndDeviceInformation si) {
+        this.device = device;
+        this.port = port;
+        this.status = status;
+        this.hasSubscriber = hasSubscriber;
+        subscriberAndDeviceInformation = si;
+    }
+
+    /**
+     * Returns the port name for the subscriber.
+     *
+     * @return the port name.
+     */
+    public String portName() {
+        return OltUtils.getPortName(port);
+    }
+
+    @Override
+    public String toString() {
+
+        return String.format("%s (status: %s, provisionSubscriber: %s)",
+                portWithName(this.port),
+                this.status, this.hasSubscriber
+        );
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DiscoveredSubscriber that = (DiscoveredSubscriber) o;
+        return hasSubscriber == that.hasSubscriber &&
+                port.equals(that.port) &&
+                device.equals(that.device) &&
+                status.equals(that.status);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(port, device, status, hasSubscriber, subscriberAndDeviceInformation);
+    }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/MeterData.java b/impl/src/main/java/org/opencord/olt/impl/MeterData.java
new file mode 100644
index 0000000..208383d
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/MeterData.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.onosproject.net.meter.MeterCellId;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterState;
+
+import java.util.Objects;
+
+/**
+ * Class containing Meter Data.
+ */
+public class MeterData {
+    private MeterCellId meterCellId;
+    private MeterState meterStatus;
+    private String bandwidthProfileName;
+
+    public MeterData(MeterCellId meterCellId, MeterState meterStatus, String bandwidthProfile) {
+        this.meterCellId = meterCellId;
+        this.meterStatus = meterStatus;
+        this.bandwidthProfileName = bandwidthProfile;
+    }
+
+    public void setMeterCellId(MeterCellId meterCellId) {
+        this.meterCellId = meterCellId;
+    }
+
+    public void setMeterStatus(MeterState meterStatus) {
+        this.meterStatus = meterStatus;
+    }
+
+    public MeterId getMeterId() {
+        return (MeterId) meterCellId;
+    }
+
+    public MeterCellId getMeterCellId() {
+        return meterCellId;
+    }
+
+    public MeterState getMeterStatus() {
+        return meterStatus;
+    }
+
+    public String getBandwidthProfileName() {
+        return bandwidthProfileName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        MeterData meterData = (MeterData) o;
+        return Objects.equals(meterCellId, meterData.meterCellId) &&
+                meterStatus == meterData.meterStatus &&
+                Objects.equals(bandwidthProfileName, meterData.bandwidthProfileName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(meterCellId, meterStatus, bandwidthProfileName);
+    }
+
+    @Override
+    public String toString() {
+        return "MeterData{" +
+                "meterCellId=" + meterCellId +
+                ", meterStatus=" + meterStatus +
+                ", bandwidthProfile='" + bandwidthProfileName + '\'' +
+                '}';
+    }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index 74c5811..747a0b4 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-present Open Networking Foundation
+ * Copyright 2021-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -15,18 +15,11 @@
  */
 package org.opencord.olt.impl;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
 import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.event.AbstractListenerManager;
@@ -35,36 +28,17 @@
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.Host;
 import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.driver.DriverService;
-import org.onosproject.net.flow.FlowRuleService;
-import org.onosproject.net.flowobjective.FlowObjectiveService;
-import org.onosproject.net.flowobjective.ForwardingObjective;
-import org.onosproject.net.flowobjective.Objective;
-import org.onosproject.net.flowobjective.ObjectiveContext;
-import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.net.host.HostEvent;
-import org.onosproject.net.host.HostListener;
-import org.onosproject.net.host.HostService;
-import org.onosproject.net.meter.MeterId;
-import org.onosproject.store.primitives.DefaultConsistentMap;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMultimap;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.opencord.olt.AccessDeviceEvent;
 import org.opencord.olt.AccessDeviceListener;
-import org.opencord.olt.AccessDevicePort;
 import org.opencord.olt.AccessDeviceService;
-import org.opencord.olt.AccessSubscriberId;
-import org.opencord.olt.internalapi.AccessDeviceFlowService;
-import org.opencord.olt.internalapi.AccessDeviceMeterService;
-import org.opencord.sadis.BandwidthProfileInformation;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
 import org.opencord.sadis.SubscriberAndDeviceInformation;
@@ -78,73 +52,60 @@
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.osgi.service.component.annotations.ReferencePolicy;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Dictionary;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static java.util.stream.Collectors.*;
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.olt.impl.OltUtils.getPortName;
+import static org.opencord.olt.impl.OltUtils.portWithName;
 import static org.opencord.olt.impl.OsgiPropertyConstants.*;
-import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * Provisions rules on access devices.
+ * OLT Application.
  */
 @Component(immediate = true,
         property = {
                 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
                 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
-                EAPOL_DELETE_RETRY_MAX_ATTEMPS + ":Integer=" +
-                        EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT,
-                PROVISION_DELAY + ":Integer=" + PROVISION_DELAY_DEFAULT,
+                FLOW_PROCESSING_THREADS + ":Integer=" + FLOW_PROCESSING_THREADS_DEFAULT,
+                SUBSCRIBER_PROCESSING_THREADS + ":Integer=" + SUBSCRIBER_PROCESSING_THREADS_DEFAULT,
+                REQUEUE_DELAY + ":Integer=" + REQUEUE_DELAY_DEFAULT
         })
 public class Olt
         extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
         implements AccessDeviceService {
-    private static final String SADIS_NOT_RUNNING = "Sadis is not running.";
-    private static final String APP_NAME = "org.opencord.olt";
-
-    private static final short EAPOL_DEFAULT_VLAN = 4091;
-    private static final String NO_UPLINK_PORT = "No uplink port found for OLT device {}";
-
-    public static final int HASH_WEIGHT = 10;
-    public static final long PENDING_SUBS_MAP_TIMEOUT_MILLIS = 30000L;
-
-    private final Logger log = getLogger(getClass());
-
-    private static final String NNI = "nni-";
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected FlowObjectiveService flowObjectiveService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DeviceService deviceService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService cfgService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected CoreService coreService;
+    protected MastershipService mastershipService;
 
-    //Dependency on driver service is to ensure correct startup order
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected DriverService driverService;
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
 
     @Reference(cardinality = ReferenceCardinality.OPTIONAL,
             bind = "bindSadisService",
@@ -153,31 +114,23 @@
     protected volatile SadisService sadisService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected AccessDeviceFlowService oltFlowService;
+    protected OltDeviceServiceInterface oltDeviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected AccessDeviceMeterService oltMeterService;
+    protected OltFlowServiceInterface oltFlowService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected OltMeterServiceInterface oltMeterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected StorageService storageService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ClusterService clusterService;
+    protected CoreService coreService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected MastershipService mastershipService;
+    protected ApplicationId appId;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected LeadershipService leadershipService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected FlowRuleService flowRuleService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ComponentConfigService componentConfigService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected HostService hostService;
+    private static final String ONOS_OLT_SERVICE = "onos/olt-service";
 
     /**
      * Default bandwidth profile id that is used for authentication trap flows.
@@ -190,456 +143,322 @@
     protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
 
     /**
-     * Default amounts of eapol retry.
+     * Number of threads used to process flows.
      **/
-    protected int eapolDeleteRetryMaxAttempts = EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT;
+    protected int flowProcessingThreads = FLOW_PROCESSING_THREADS_DEFAULT;
 
     /**
-     * Delay between EAPOL removal and data plane flows provisioning.
+     * Number of threads used to process flows.
+     **/
+    protected int subscriberProcessingThreads = SUBSCRIBER_PROCESSING_THREADS_DEFAULT;
+
+    /**
+     * Delay in ms to put an event back in the queue, used to avoid retrying things to often if conditions are not met.
+     **/
+    protected int requeueDelay = REQUEUE_DELAY_DEFAULT;
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    /**
+     * A queue to asynchronously process events.
      */
-    protected int provisionDelay = PROVISION_DELAY_DEFAULT;
-
-    private final DeviceListener deviceListener = new InternalDeviceListener();
-    private final ClusterEventListener clusterListener = new InternalClusterListener();
-    private final HostListener hostListener = new InternalHostListener();
-
-    private ConsistentHasher hasher;
+    protected Map<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> eventsQueues;
 
     protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
-    private BaseInformationService<BandwidthProfileInformation> bpService;
 
-    private ExecutorService oltInstallers = Executors.newFixedThreadPool(4,
-                                                                         groupedThreads("onos/olt-service",
-                                                                                        "olt-installer-%d"));
+    /**
+     * Listener for OLT devices events.
+     */
+    protected OltDeviceListener deviceListener = new OltDeviceListener();
+    protected ScheduledExecutorService discoveredSubscriberExecutor =
+            Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
+                    "discovered-cp-%d", log));
 
-    protected ExecutorService eventExecutor;
-    protected ExecutorService hostEventExecutor;
-    protected ExecutorService retryExecutor;
-    protected ScheduledExecutorService provisionExecutor;
+    protected ScheduledExecutorService queueExecutor =
+            Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
+                    "discovered-cp-restore-%d", log));
 
-    private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
-    private ConsistentMultimap<ConnectPoint, UniTagInformation> failedSubs;
+    /**
+     * Executor used to defer flow provisioning to a different thread pool.
+     */
+    protected ExecutorService flowsExecutor;
 
-    protected Map<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingSubscribersForDevice;
-    private ConsistentMultimap<ConnectPoint, SubscriberFlowInfo> waitingMacSubscribers;
+    /**
+     * Executor used to defer subscriber handling from API call to a different thread pool.
+     */
+    protected ExecutorService subscriberExecutor;
+
+    private static final String APP_NAME = "org.opencord.olt";
+
+    private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock();
+    private final Lock queueWriteLock = queueLock.writeLock();
+    private final Lock queueReadLock = queueLock.readLock();
 
     @Activate
-    public void activate(ComponentContext context) {
-        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
-                                                                        "events-%d", log));
-        hostEventExecutor = Executors.newFixedThreadPool(8, groupedThreads("onos/olt", "mac-events-%d", log));
-        retryExecutor = Executors.newCachedThreadPool();
-        provisionExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
-                "provision-%d", log));
+    protected void activate(ComponentContext context) {
+        cfgService.registerProperties(getClass());
 
         modified(context);
-        ApplicationId appId = coreService.registerApplication(APP_NAME);
-        componentConfigService.registerProperties(getClass());
 
+        appId = coreService.registerApplication(APP_NAME);
         KryoNamespace serializer = KryoNamespace.newBuilder()
                 .register(KryoNamespaces.API)
+                .register(ConnectPoint.class)
+                .register(DiscoveredSubscriber.class)
+                .register(DiscoveredSubscriber.Status.class)
+                .register(SubscriberAndDeviceInformation.class)
                 .register(UniTagInformation.class)
-                .register(SubscriberFlowInfo.class)
-                .register(AccessDevicePort.class)
-                .register(AccessDevicePort.Type.class)
                 .register(LinkedBlockingQueue.class)
                 .build();
 
-        programmedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
-                .withName("volt-programmed-subs")
+        eventsQueues = storageService.<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>consistentMapBuilder()
+                .withName("volt-subscriber-queues")
+                .withApplicationId(appId)
                 .withSerializer(Serializer.using(serializer))
-                .withApplicationId(appId)
-                .build();
-
-        failedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
-                .withName("volt-failed-subs")
-                .withSerializer(Serializer.using(serializer))
-                .withApplicationId(appId)
-                .build();
-
-        KryoNamespace macSerializer = KryoNamespace.newBuilder()
-                .register(KryoNamespaces.API)
-                .register(SubscriberFlowInfo.class)
-                .register(AccessDevicePort.class)
-                .register(AccessDevicePort.Type.class)
-                .register(UniTagInformation.class)
-                .build();
-
-        waitingMacSubscribers = storageService.<ConnectPoint, SubscriberFlowInfo>consistentMultimapBuilder()
-                .withName("volt-waiting-mac-subs")
-                .withSerializer(Serializer.using(macSerializer))
-                .withApplicationId(appId)
-                .build();
-        //TODO possibly use consistent multimap with compute on key and element to avoid
-        // lock on all objects of the map, while instead obtaining and releasing the lock
-        // on a per subscriber basis.
-        pendingSubscribersForDevice = new DefaultConsistentMap<>(
-                storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
-                .withName("volt-pending-subs")
-                .withSerializer(Serializer.using(serializer))
-                .withApplicationId(appId)
-                .buildAsyncMap(), PENDING_SUBS_MAP_TIMEOUT_MILLIS).asJavaMap();
-        eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
-
-        if (sadisService != null) {
-            subsService = sadisService.getSubscriberInfoService();
-            bpService = sadisService.getBandwidthProfileService();
-        } else {
-            log.warn(SADIS_NOT_RUNNING);
-        }
-
-        List<NodeId> readyNodes = clusterService.getNodes().stream()
-                .filter(c -> clusterService.getState(c.id()) == ControllerNode.State.READY)
-                .map(ControllerNode::id)
-                .collect(toList());
-        hasher = new ConsistentHasher(readyNodes, HASH_WEIGHT);
-        clusterService.addListener(clusterListener);
-
-        // look for all provisioned devices in Sadis and create EAPOL flows for the
-        // UNI ports
-        Iterable<Device> devices = deviceService.getDevices();
-        for (Device d : devices) {
-            if (isLocalLeader(d.id())) {
-                checkAndCreateDeviceFlows(d);
-            }
-        }
+                .build().asJavaMap();
 
         deviceService.addListener(deviceListener);
-        hostService.addListener(hostListener);
-        log.info("Started with Application ID {}", appId.id());
+
+        discoveredSubscriberExecutor.execute(this::processDiscoveredSubscribers);
+        eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
+        log.info("Started");
+
+        deviceListener.handleExistingPorts();
     }
 
     @Deactivate
-    public void deactivate() {
-        componentConfigService.unregisterProperties(getClass(), false);
-        clusterService.removeListener(clusterListener);
-        deviceService.removeListener(deviceListener);
-        hostService.removeListener(hostListener);
-        eventDispatcher.removeSink(AccessDeviceEvent.class);
-        eventExecutor.shutdown();
-        hostEventExecutor.shutdown();
-        retryExecutor.shutdown();
-        provisionExecutor.shutdown();
+    protected void deactivate(ComponentContext context) {
+        cfgService.unregisterProperties(getClass(), false);
+        discoveredSubscriberExecutor.shutdown();
+        flowsExecutor.shutdown();
+        subscriberExecutor.shutdown();
+        deviceListener.deactivate();
         log.info("Stopped");
     }
 
     @Modified
     public void modified(ComponentContext context) {
         Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
-
-        try {
+        if (context != null) {
             String bpId = get(properties, DEFAULT_BP_ID);
             defaultBpId = isNullOrEmpty(bpId) ? defaultBpId : bpId;
 
             String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
             multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
 
-            String eapolDeleteRetryNew = get(properties, EAPOL_DELETE_RETRY_MAX_ATTEMPS);
-            eapolDeleteRetryMaxAttempts = isNullOrEmpty(eapolDeleteRetryNew) ? EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT :
-                    Integer.parseInt(eapolDeleteRetryNew.trim());
+            String flowThreads = get(properties, FLOW_PROCESSING_THREADS);
+            int oldFlowProcessingThreads = flowProcessingThreads;
+            flowProcessingThreads = isNullOrEmpty(flowThreads) ?
+                    oldFlowProcessingThreads : Integer.parseInt(flowThreads.trim());
 
-            log.debug("OLT properties: DefaultBpId: {}, MulticastServiceName: {}, EapolDeleteRetryMaxAttempts: {}",
-                      defaultBpId, multicastServiceName, eapolDeleteRetryMaxAttempts);
+            if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads) {
+                if (flowsExecutor != null) {
+                    flowsExecutor.shutdown();
+                }
+                flowsExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
+                        groupedThreads(ONOS_OLT_SERVICE,
+                                "flows-installer-%d"));
+            }
 
-        } catch (Exception e) {
-            log.error("Error while modifying the properties", e);
-            defaultBpId = DEFAULT_BP_ID_DEFAULT;
-            multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
+            String subscriberThreads = get(properties, SUBSCRIBER_PROCESSING_THREADS);
+            int oldSubscriberProcessingThreads = subscriberProcessingThreads;
+            subscriberProcessingThreads = isNullOrEmpty(subscriberThreads) ?
+                    oldSubscriberProcessingThreads : Integer.parseInt(subscriberThreads.trim());
+
+            if (subscriberExecutor == null || oldSubscriberProcessingThreads != subscriberProcessingThreads) {
+                if (subscriberExecutor != null) {
+                    subscriberExecutor.shutdown();
+                }
+                subscriberExecutor = Executors.newFixedThreadPool(subscriberProcessingThreads,
+                        groupedThreads(ONOS_OLT_SERVICE,
+                                "subscriber-installer-%d"));
+            }
+
+            String queueDelay = get(properties, REQUEUE_DELAY);
+            requeueDelay = isNullOrEmpty(queueDelay) ?
+                    REQUEUE_DELAY_DEFAULT : Integer.parseInt(queueDelay.trim());
         }
+        log.info("Modified. Values = {}: {}, {}: {}, " +
+                        "{}:{}, {}:{}, {}:{}",
+                DEFAULT_BP_ID, defaultBpId,
+                DEFAULT_MCAST_SERVICE_NAME, multicastServiceName,
+                FLOW_PROCESSING_THREADS, flowProcessingThreads,
+                SUBSCRIBER_PROCESSING_THREADS, subscriberProcessingThreads,
+                REQUEUE_DELAY, requeueDelay);
     }
 
-    protected void bindSadisService(SadisService service) {
-        sadisService = service;
-        bpService = sadisService.getBandwidthProfileService();
-        subsService = sadisService.getSubscriberInfoService();
-        log.info("Sadis-service binds to onos.");
-    }
-
-    protected void unbindSadisService(SadisService service) {
-        sadisService = null;
-        bpService = null;
-        subsService = null;
-        log.info("Sadis-service unbinds from onos.");
-    }
 
     @Override
-    public boolean provisionSubscriber(ConnectPoint connectPoint) {
-        log.info("Call to provision subscriber at {}", connectPoint);
-        DeviceId deviceId = connectPoint.deviceId();
-        Port subscriberPortOnos = deviceService.getPort(deviceId, connectPoint.port());
-        checkNotNull(subscriberPortOnos, "Invalid connect point:" + connectPoint);
-        AccessDevicePort subscriberPort = new AccessDevicePort(subscriberPortOnos, AccessDevicePort.Type.UNI);
+    public boolean provisionSubscriber(ConnectPoint cp) {
+        subscriberExecutor.submit(() -> {
+            Device device = deviceService.getDevice(cp.deviceId());
+            Port port = deviceService.getPort(device.id(), cp.port());
+            AccessDevicePort accessDevicePort = new AccessDevicePort(port);
 
-        if (isSubscriberInstalled(connectPoint)) {
-            log.warn("Subscriber at {} already provisioned or in the process .."
-                    + " not taking any more action", connectPoint);
-            return true;
-        }
-
-        // Find the subscriber config at this connect point
-        SubscriberAndDeviceInformation sub = getSubscriber(connectPoint);
-        if (sub == null) {
-            log.warn("No subscriber found for {}", connectPoint);
-            return false;
-        }
-
-        // Get the uplink port
-        AccessDevicePort uplinkPort = getUplinkPort(deviceService.getDevice(deviceId));
-        if (uplinkPort == null) {
-            log.warn(NO_UPLINK_PORT, deviceId);
-            return false;
-        }
-
-        // delete Eapol authentication flow with default bandwidth
-        // wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
-        // retry deletion if it fails/times-out
-        retryExecutor.execute(new DeleteEapolInstallSub(subscriberPort, uplinkPort, sub, 1));
-        return true;
-    }
-
-    // returns true if subscriber is programmed or in the process of being programmed
-    private boolean isSubscriberInstalled(ConnectPoint connectPoint) {
-        Collection<? extends UniTagInformation> uniTagInformationSet =
-                programmedSubs.get(connectPoint).value();
-        if (!uniTagInformationSet.isEmpty()) {
-            return true;
-        }
-        //Check if the subscriber is already getting provisioned
-        // so we do not provision twice
-        AtomicBoolean isPending = new AtomicBoolean(false);
-        pendingSubscribersForDevice.computeIfPresent(connectPoint.deviceId(), (id, infos) -> {
-            for (SubscriberFlowInfo fi : infos) {
-                if (fi.getUniPort().equals(connectPoint.port())) {
-                    log.debug("Subscriber is already pending, {}", fi);
-                    isPending.set(true);
-                    break;
-                }
+            if (oltDeviceService.isNniPort(device, port.number())) {
+                log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
+                return false;
             }
-            return infos;
+
+            log.info("Provisioning subscriber on {}", accessDevicePort);
+
+            if (oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
+                log.error("Subscriber on {} is already provisioned", accessDevicePort);
+                return false;
+            }
+
+            SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
+            if (si == null) {
+                log.error("Subscriber information not found in sadis for port {}", accessDevicePort);
+                return false;
+            }
+            DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
+                    DiscoveredSubscriber.Status.ADDED, true, si);
+
+            // NOTE we need to keep a list of the subscribers that are provisioned on a port,
+            // regardless of the flow status
+            si.uniTagList().forEach(uti -> {
+                ServiceKey sk = new ServiceKey(accessDevicePort, uti);
+                oltFlowService.updateProvisionedSubscriberStatus(sk, true);
+            });
+
+            addSubscriberToQueue(sub);
+            return true;
         });
-
-        return isPending.get();
-    }
-
-    private class DeleteEapolInstallSub implements Runnable {
-        AccessDevicePort subscriberPort;
-        AccessDevicePort uplinkPort;
-        SubscriberAndDeviceInformation sub;
-        private int attemptNumber;
-
-        DeleteEapolInstallSub(AccessDevicePort subscriberPort, AccessDevicePort uplinkPort,
-                              SubscriberAndDeviceInformation sub,
-                              int attemptNumber) {
-            this.subscriberPort = subscriberPort;
-            this.uplinkPort = uplinkPort;
-            this.sub = sub;
-            this.attemptNumber = attemptNumber;
-        }
-
-        @Override
-        public void run() {
-            CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
-            oltFlowService.processEapolFilteringObjectives(subscriberPort,
-                                                     defaultBpId, Optional.empty(), filterFuture,
-                                                     VlanId.vlanId(EAPOL_DEFAULT_VLAN),
-                                                     false);
-            filterFuture.thenAcceptAsync(filterStatus -> {
-                if (filterStatus == null) {
-                    log.info("Default eapol flow deleted in attempt {} of {}"
-                            + "... provisioning subscriber flows on {}",
-                            attemptNumber, eapolDeleteRetryMaxAttempts, subscriberPort);
-
-                    // FIXME this is needed to prevent that default EAPOL flow removal and
-                    // data plane flows install are received by the device at the same time
-                    provisionExecutor.schedule(
-                            () -> provisionUniTagList(subscriberPort, uplinkPort, sub),
-                            provisionDelay, TimeUnit.MILLISECONDS);
-                } else {
-                    if (attemptNumber <= eapolDeleteRetryMaxAttempts) {
-                        log.warn("The filtering future failed {} for subscriber on {}"
-                                + "... retrying {} of {} attempts",
-                                 filterStatus, subscriberPort, attemptNumber, eapolDeleteRetryMaxAttempts);
-                        retryExecutor.execute(
-                                         new DeleteEapolInstallSub(subscriberPort, uplinkPort, sub,
-                                                                   attemptNumber + 1));
-                    } else {
-                        log.error("The filtering future failed {} for subscriber on {}"
-                                + "after {} attempts. Subscriber provisioning failed",
-                                  filterStatus, subscriberPort, eapolDeleteRetryMaxAttempts);
-                        sub.uniTagList().forEach(ut ->
-                                failedSubs.put(
-                                        new ConnectPoint(subscriberPort.deviceId(), subscriberPort.number()), ut));
-                    }
-                }
-            });
-        }
-
-    }
-
-    @Override
-    public boolean removeSubscriber(ConnectPoint connectPoint) {
-        Port subscriberPort = deviceService.getPort(connectPoint);
-        if (subscriberPort == null) {
-            log.error("Subscriber port not found at: {}", connectPoint);
-            return false;
-        }
-        return removeSubscriber(new AccessDevicePort(subscriberPort, AccessDevicePort.Type.UNI));
-    }
-
-    private boolean removeSubscriber(AccessDevicePort subscriberPort) {
-        log.info("Call to un-provision subscriber at {}", subscriberPort);
-        //TODO we need to check if the subscriber is pending
-        // Get the subscriber connected to this port from the local cache
-        // If we don't know about the subscriber there's no need to remove it
-        DeviceId deviceId = subscriberPort.deviceId();
-
-        ConnectPoint connectPoint = new ConnectPoint(deviceId, subscriberPort.number());
-        Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(connectPoint).value();
-        if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
-            log.warn("Subscriber on connectionPoint {} was not previously programmed, " +
-                             "no need to remove it", connectPoint);
-            return true;
-        }
-
-        // Get the uplink port
-        AccessDevicePort uplinkPort = getUplinkPort(deviceService.getDevice(deviceId));
-        if (uplinkPort == null) {
-            log.warn(NO_UPLINK_PORT, deviceId);
-            return false;
-        }
-
-        for (UniTagInformation uniTag : uniTagInformationSet) {
-
-            if (multicastServiceName.equals(uniTag.getServiceName())) {
-                continue;
-            }
-
-            unprovisionVlans(uplinkPort, subscriberPort, uniTag);
-
-            // remove eapol with subscriber bandwidth profile
-            Optional<String> upstreamOltBw = uniTag.getUpstreamOltBandwidthProfile() == null ?
-                    Optional.empty() : Optional.of(uniTag.getUpstreamOltBandwidthProfile());
-            oltFlowService.processEapolFilteringObjectives(subscriberPort,
-                                                           uniTag.getUpstreamBandwidthProfile(),
-                                                           upstreamOltBw,
-                                                           null, uniTag.getPonCTag(), false);
-
-            if (subscriberPort.port() != null && subscriberPort.isEnabled()) {
-                // reinstall eapol with default bandwidth profile
-                oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId, Optional.empty(),
-                        null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
-            } else {
-                log.debug("Port {} is no longer enabled or it's unavailable. Not "
-                                  + "reprogramming default eapol flow", connectPoint);
-            }
-        }
+        //NOTE this only means we have taken the request in, nothing more.
         return true;
     }
 
-
     @Override
-    public boolean provisionSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag,
-                                       Optional<VlanId> cTag, Optional<Integer> tpId) {
+    public boolean removeSubscriber(ConnectPoint cp) {
+        subscriberExecutor.submit(() -> {
+            Device device = deviceService.getDevice(DeviceId.deviceId(cp.deviceId().toString()));
+            Port port = deviceService.getPort(device.id(), cp.port());
+            AccessDevicePort accessDevicePort = new AccessDevicePort(port);
 
-        log.info("Provisioning subscriber using subscriberId {}, sTag {}, cTag {}, tpId {}",
-                subscriberId, sTag, cTag, tpId);
-
-        // Check if we can find the connect point to which this subscriber is connected
-        ConnectPoint cp = findSubscriberConnectPoint(subscriberId.toString());
-        if (cp == null) {
-            log.warn("ConnectPoint for {} not found", subscriberId);
-            return false;
-        }
-        AccessDevicePort subscriberPort = new AccessDevicePort(deviceService.getPort(cp), AccessDevicePort.Type.UNI);
-
-        if (!sTag.isPresent() && !cTag.isPresent()) {
-            return provisionSubscriber(cp);
-        } else if (sTag.isPresent() && cTag.isPresent() && tpId.isPresent()) {
-            AccessDevicePort uplinkPort = getUplinkPort(deviceService.getDevice(cp.deviceId()));
-            if (uplinkPort == null) {
-                log.warn(NO_UPLINK_PORT, cp.deviceId());
+            if (oltDeviceService.isNniPort(device, port.number())) {
+                log.warn("will not un-provision a subscriber on the NNI {}",
+                        accessDevicePort);
                 return false;
             }
 
-            //delete Eapol authentication flow with default bandwidth
-            //wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
-            //install subscriber flows
-            CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
-            oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId, Optional.empty(),
-                    filterFuture, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
-            filterFuture.thenAcceptAsync(filterStatus -> {
-                if (filterStatus == null) {
-                    provisionUniTagInformation(uplinkPort, subscriberPort, cTag.get(), sTag.get(), tpId.get());
-                }
+            log.info("Un-provisioning subscriber on {}", accessDevicePort);
+
+            if (!oltFlowService.isSubscriberServiceProvisioned(accessDevicePort)) {
+                log.error("Subscriber on {} is not provisioned", accessDevicePort);
+                return false;
+            }
+
+            SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
+            if (si == null) {
+                log.error("Subscriber information not found in sadis for port {}",
+                        accessDevicePort);
+                // NOTE that we are returning true so that the subscriber is removed from the queue
+                // and we can move on provisioning others
+                return false;
+            }
+            DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
+                    DiscoveredSubscriber.Status.REMOVED, true, si);
+
+            // NOTE we need to keep a list of the subscribers that are provisioned on a port,
+            // regardless of the flow status
+            si.uniTagList().forEach(uti -> {
+                ServiceKey sk = new ServiceKey(accessDevicePort, uti);
+                oltFlowService.updateProvisionedSubscriberStatus(sk, false);
             });
+
+            addSubscriberToQueue(sub);
             return true;
-        } else {
-            log.warn("Provisioning failed for subscriber: {}", subscriberId);
+        });
+        //NOTE this only means we have taken the request in, nothing more.
+        return true;
+    }
+
+    @Override
+    public boolean provisionSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
+        log.debug("Provisioning subscriber on {}, with cTag {}, stag {}, tpId {}",
+                cp, cTag, sTag, tpId);
+        Device device = deviceService.getDevice(cp.deviceId());
+        Port port = deviceService.getPort(device.id(), cp.port());
+        AccessDevicePort accessDevicePort = new AccessDevicePort(port);
+
+        if (oltDeviceService.isNniPort(device, port.number())) {
+            log.warn("will not provision a subscriber on the NNI {}", accessDevicePort);
             return false;
         }
-    }
 
-    @Override
-    public boolean removeSubscriber(AccessSubscriberId subscriberId, Optional<VlanId> sTag,
-                                    Optional<VlanId> cTag, Optional<Integer> tpId) {
-        // Check if we can find the connect point to which this subscriber is connected
-        ConnectPoint cp = findSubscriberConnectPoint(subscriberId.toString());
-        if (cp == null) {
-            log.warn("ConnectPoint for {} not found", subscriberId);
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
+        if (specificService == null) {
+            log.error("Can't find Information for subscriber on {}, with cTag {}, " +
+                    "stag {}, tpId {}", cp, cTag, sTag, tpId);
             return false;
         }
-        AccessDevicePort subscriberPort = new AccessDevicePort(deviceService.getPort(cp), AccessDevicePort.Type.UNI);
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        uniTagInformationList.add(specificService);
+        si.setUniTagList(uniTagInformationList);
+        DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
+                DiscoveredSubscriber.Status.ADDED, true, si);
 
-        if (!sTag.isPresent() && !cTag.isPresent()) {
-            return removeSubscriber(cp);
-        } else if (sTag.isPresent() && cTag.isPresent() && tpId.isPresent()) {
-            // Get the uplink port
-            AccessDevicePort uplinkPort = getUplinkPort(deviceService.getDevice(cp.deviceId()));
-            if (uplinkPort == null) {
-                log.warn(NO_UPLINK_PORT, cp.deviceId());
-                return false;
-            }
-
-            Optional<UniTagInformation> tagInfo = getUniTagInformation(subscriberPort, cTag.get(),
-                    sTag.get(), tpId.get());
-            if (!tagInfo.isPresent()) {
-                log.warn("UniTagInformation does not exist for {}, cTag {}, sTag {}, tpId {}",
-                        subscriberPort, cTag, sTag, tpId);
-                return false;
-            }
-
-            unprovisionVlans(uplinkPort, subscriberPort, tagInfo.get());
-            return true;
-        } else {
-            log.warn("Removing subscriber is not possible - please check the provided information" +
-                             "for the subscriber: {}", subscriberId);
+        // NOTE we need to keep a list of the subscribers that are provisioned on a port,
+        // regardless of the flow status
+        ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
+        if (oltFlowService.isSubscriberServiceProvisioned(sk)) {
+            log.error("Subscriber on {} is already provisioned", sk);
             return false;
         }
+        oltFlowService.updateProvisionedSubscriberStatus(sk, true);
+
+        addSubscriberToQueue(sub);
+        return true;
     }
 
     @Override
-    public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getProgSubs() {
-        return programmedSubs.stream()
-                .collect(collectingAndThen(
-                        groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
-                        ImmutableMap::copyOf));
+    public boolean removeSubscriber(ConnectPoint cp, VlanId cTag, VlanId sTag, Integer tpId) {
+        log.debug("Un-provisioning subscriber on {} with cTag {}, stag {}, tpId {}",
+                cp, cTag, sTag, tpId);
+        Device device = deviceService.getDevice(cp.deviceId());
+        Port port = deviceService.getPort(device.id(), cp.port());
+        AccessDevicePort accessDevicePort = new AccessDevicePort(port);
+
+        if (oltDeviceService.isNniPort(device, port.number())) {
+            log.warn("will not un-provision a subscriber on the NNI {}",
+                    accessDevicePort);
+            return false;
+        }
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        UniTagInformation specificService = getUniTagInformation(getPortName(port), cTag, sTag, tpId);
+        if (specificService == null) {
+            log.error("Can't find Information for subscriber on {}, with cTag {}, " +
+                    "stag {}, tpId {}", cp, cTag, sTag, tpId);
+            return false;
+        }
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        uniTagInformationList.add(specificService);
+        si.setUniTagList(uniTagInformationList);
+        DiscoveredSubscriber sub = new DiscoveredSubscriber(device, port,
+                DiscoveredSubscriber.Status.ADDED, true, si);
+
+        // NOTE we need to keep a list of the subscribers that are provisioned on a port,
+        // regardless of the flow status
+        ServiceKey sk = new ServiceKey(accessDevicePort, specificService);
+        if (!oltFlowService.isSubscriberServiceProvisioned(sk)) {
+            log.error("Subscriber on {} is not provisioned", sk);
+            return false;
+        }
+        oltFlowService.updateProvisionedSubscriberStatus(sk, false);
+
+        addSubscriberToQueue(sub);
+        return true;
     }
 
     @Override
-    public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getFailedSubs() {
-        return failedSubs.stream()
-                .collect(collectingAndThen(
-                        groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
-                        ImmutableMap::copyOf));
-    }
-
-    @Override
-    public List<DeviceId> fetchOlts() {
-        // look through all the devices and find the ones that are OLTs as per Sadis
+    public List<DeviceId> getConnectedOlts() {
         List<DeviceId> olts = new ArrayList<>();
         Iterable<Device> devices = deviceService.getDevices();
         for (Device d : devices) {
-            if (getOltInfo(d) != null) {
+            if (oltDeviceService.isOlt(d)) {
                 // So this is indeed an OLT device
                 olts.add(d.id());
             }
@@ -648,19 +467,20 @@
     }
 
     /**
-     * Finds the connect point to which a subscriber is connected.
+     * Finds the connect-point to which a subscriber is connected.
      *
      * @param id The id of the subscriber, this is the same ID as in Sadis
      * @return Subscribers ConnectPoint if found else null
      */
-    private ConnectPoint findSubscriberConnectPoint(String id) {
+    @Override
+    public ConnectPoint findSubscriberConnectPoint(String id) {
 
         Iterable<Device> devices = deviceService.getDevices();
         for (Device d : devices) {
             for (Port p : deviceService.getPorts(d.id())) {
                 log.trace("Comparing {} with {}", p.annotations().value(AnnotationKeys.PORT_NAME), id);
                 if (p.annotations().value(AnnotationKeys.PORT_NAME).equals(id)) {
-                    log.debug("Found on device {} port {}", d.id(), p.number());
+                    log.debug("Found on {}", portWithName(p));
                     return new ConnectPoint(d.id(), p.number());
                 }
             }
@@ -668,584 +488,96 @@
         return null;
     }
 
-    /**
-     * Gets the context of the bandwidth profile information for the given parameter.
-     *
-     * @param bandwidthProfile the bandwidth profile id
-     * @return the context of the bandwidth profile information
-     */
-    private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
-        if (bpService == null) {
-            log.warn(SADIS_NOT_RUNNING);
-            return null;
-        }
-        if (bandwidthProfile == null) {
-            return null;
-        }
-        return bpService.get(bandwidthProfile);
-    }
+    protected void processDiscoveredSubscribers() {
+        log.info("Started processDiscoveredSubscribers loop");
+        while (true) {
+            Set<ConnectPoint> discoveredCps;
+            try {
+                queueReadLock.lock();
+                discoveredCps = eventsQueues.keySet();
+            } catch (Exception e) {
+                log.error("Cannot read keys from queue map", e);
+                return;
+            } finally {
+                queueReadLock.unlock();
+            }
 
-    /**
-     * Removes subscriber vlan flows.
-     *
-     * @param uplink         uplink port of the OLT
-     * @param subscriberPort uni port
-     * @param uniTag         uni tag information
-     */
-    private void unprovisionVlans(AccessDevicePort uplink, AccessDevicePort subscriberPort, UniTagInformation uniTag) {
-        log.info("Unprovisioning vlans for {} at {}", uniTag, subscriberPort);
-        DeviceId deviceId = subscriberPort.deviceId();
+            discoveredCps.forEach(cp -> {
+                LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
 
-        CompletableFuture<ObjectiveError> downFuture = new CompletableFuture<>();
-        CompletableFuture<ObjectiveError> upFuture = new CompletableFuture<>();
-
-        VlanId deviceVlan = uniTag.getPonSTag();
-        VlanId subscriberVlan = uniTag.getPonCTag();
-
-        MeterId upstreamMeterId = oltMeterService
-                .getMeterIdFromBpMapping(deviceId, uniTag.getUpstreamBandwidthProfile());
-        MeterId downstreamMeterId = oltMeterService
-                .getMeterIdFromBpMapping(deviceId, uniTag.getDownstreamBandwidthProfile());
-        MeterId upstreamOltMeterId = oltMeterService
-                .getMeterIdFromBpMapping(deviceId, uniTag.getUpstreamOltBandwidthProfile());
-        MeterId downstreamOltMeterId = oltMeterService
-                .getMeterIdFromBpMapping(deviceId, uniTag.getDownstreamOltBandwidthProfile());
-
-        Optional<SubscriberFlowInfo> waitingMacSubFlowInfo =
-                getAndRemoveWaitingMacSubFlowInfoForCTag(new ConnectPoint(deviceId, subscriberPort.number()),
-                        subscriberVlan);
-        if (waitingMacSubFlowInfo.isPresent()) {
-            // only dhcp objectives applied previously, so only dhcp uninstallation objective will be processed
-            log.debug("Waiting MAC service removed and dhcp uninstallation objective will be processed. " +
-                    "waitingMacSubFlowInfo:{}", waitingMacSubFlowInfo.get());
-            CompletableFuture<ObjectiveError> dhcpFuture = new CompletableFuture<>();
-            oltFlowService.processDhcpFilteringObjectives(subscriberPort,
-                    upstreamMeterId, upstreamOltMeterId, uniTag, false, true, Optional.of(dhcpFuture));
-            dhcpFuture.thenAcceptAsync(dhcpStatus -> {
-                AccessDeviceEvent.Type type;
-                if (dhcpStatus == null) {
-                    type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTERED;
-                    log.debug("Dhcp uninstallation objective was processed successfully for cTag {}, sTag {}, " +
-                                    "tpId {} and Device/Port:{}", uniTag.getPonCTag(), uniTag.getPonSTag(),
-                            uniTag.getTechnologyProfileId(), subscriberPort);
-                    updateProgrammedSubscriber(subscriberPort, uniTag, false);
-                } else {
-                    type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
-                    log.error("Dhcp uninstallation objective was failed for cTag {}, sTag {}, " +
-                                    "tpId {} and Device/Port:{} :{}", uniTag.getPonCTag(), uniTag.getPonSTag(),
-                            uniTag.getTechnologyProfileId(), subscriberPort, dhcpStatus);
+                try {
+                    queueReadLock.lock();
+                    eventsQueue = eventsQueues.get(cp);
+                } catch (Exception e) {
+                    log.error("Cannot get key from queue map", e);
+                    return;
+                } finally {
+                    queueReadLock.unlock();
                 }
-                post(new AccessDeviceEvent(type, deviceId, subscriberPort.port(),
-                        deviceVlan, subscriberVlan, uniTag.getTechnologyProfileId()));
-            });
-            return;
-        } else {
-            log.debug("There is no waiting MAC service for {} and subscriberVlan: {}", subscriberPort, subscriberVlan);
-        }
 
-        ForwardingObjective.Builder upFwd =
-                oltFlowService.createUpBuilder(uplink, subscriberPort, upstreamMeterId, upstreamOltMeterId, uniTag);
-
-        Optional<MacAddress> macAddress = getMacAddress(deviceId, subscriberPort, uniTag);
-        ForwardingObjective.Builder downFwd =
-                oltFlowService.createDownBuilder(uplink, subscriberPort, downstreamMeterId, downstreamOltMeterId,
-                        uniTag, macAddress);
-
-        oltFlowService.processIgmpFilteringObjectives(subscriberPort, upstreamMeterId, upstreamOltMeterId, uniTag,
-                false, true);
-        oltFlowService.processDhcpFilteringObjectives(subscriberPort,
-                upstreamMeterId, upstreamOltMeterId, uniTag, false, true, Optional.empty());
-        oltFlowService.processPPPoEDFilteringObjectives(subscriberPort, upstreamMeterId, upstreamOltMeterId, uniTag,
-                false, true);
-
-        flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
-            @Override
-            public void onSuccess(Objective objective) {
-                upFuture.complete(null);
-            }
-
-            @Override
-            public void onError(Objective objective, ObjectiveError error) {
-                upFuture.complete(error);
-            }
-        }));
-
-        flowObjectiveService.forward(deviceId, downFwd.remove(new ObjectiveContext() {
-            @Override
-            public void onSuccess(Objective objective) {
-                downFuture.complete(null);
-            }
-
-            @Override
-            public void onError(Objective objective, ObjectiveError error) {
-                downFuture.complete(error);
-            }
-        }));
-
-        upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
-            AccessDeviceEvent.Type type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTERED;
-            if (upStatus == null && downStatus == null) {
-                log.debug("Uni tag information is unregistered successfully for cTag {}, sTag {}, tpId {} on {}",
-                        uniTag.getPonCTag(), uniTag.getPonSTag(), uniTag.getTechnologyProfileId(), subscriberPort);
-                updateProgrammedSubscriber(subscriberPort, uniTag, false);
-            } else if (downStatus != null) {
-                log.error("Subscriber with vlan {} on {} failed downstream uninstallation: {}",
-                          subscriberVlan, subscriberPort, downStatus);
-                type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
-            } else if (upStatus != null) {
-                log.error("Subscriber with vlan {} on {} failed upstream uninstallation: {}",
-                          subscriberVlan, subscriberPort, upStatus);
-                type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
-            }
-            post(new AccessDeviceEvent(type, deviceId, subscriberPort.port(), deviceVlan, subscriberVlan,
-                                       uniTag.getTechnologyProfileId()));
-        }, oltInstallers);
-    }
-
-    private Optional<SubscriberFlowInfo> getAndRemoveWaitingMacSubFlowInfoForCTag(ConnectPoint cp, VlanId cTag) {
-        SubscriberFlowInfo returnSubFlowInfo = null;
-        Collection<? extends SubscriberFlowInfo> subFlowInfoSet = waitingMacSubscribers.get(cp).value();
-        for (SubscriberFlowInfo subFlowInfo : subFlowInfoSet) {
-            if (subFlowInfo.getTagInfo().getPonCTag().equals(cTag)) {
-                returnSubFlowInfo = subFlowInfo;
-                break;
-            }
-        }
-        if (returnSubFlowInfo != null) {
-            waitingMacSubscribers.remove(cp, returnSubFlowInfo);
-            return Optional.of(returnSubFlowInfo);
-        }
-        return Optional.empty();
-    }
-
-    /**
-     * Adds subscriber vlan flows, dhcp, eapol and igmp trap flows for the related uni port.
-     *
-     * @param subPort      the connection point of the subscriber
-     * @param uplinkPort   uplink port of the OLT (the nni port)
-     * @param sub          subscriber information that includes s, c tags, tech profile and bandwidth profile references
-     */
-    private void provisionUniTagList(AccessDevicePort subPort, AccessDevicePort uplinkPort,
-                                     SubscriberAndDeviceInformation sub) {
-
-        log.debug("Provisioning vlans for subscriber on {}", subPort);
-        if (log.isTraceEnabled()) {
-            log.trace("Subscriber informations {}", sub);
-        }
-
-        if (sub.uniTagList() == null || sub.uniTagList().isEmpty()) {
-            log.warn("Unitaglist doesn't exist for the subscriber {} on {}", sub.id(), subPort);
-            return;
-        }
-
-        for (UniTagInformation uniTag : sub.uniTagList()) {
-            handleSubscriberFlows(uplinkPort, subPort, uniTag);
-        }
-    }
-
-    /**
-     * Finds the uni tag information and provisions the found information.
-     * If the uni tag information is not found, returns
-     *
-     * @param uplinkPort     the nni port
-     * @param subscriberPort the uni port
-     * @param innerVlan      the pon c tag
-     * @param outerVlan      the pon s tag
-     * @param tpId           the technology profile id
-     */
-    private void provisionUniTagInformation(AccessDevicePort uplinkPort,
-                                            AccessDevicePort subscriberPort,
-                                            VlanId innerVlan,
-                                            VlanId outerVlan,
-                                            Integer tpId) {
-
-        Optional<UniTagInformation> gotTagInformation = getUniTagInformation(subscriberPort, innerVlan,
-                outerVlan, tpId);
-        if (!gotTagInformation.isPresent()) {
-            return;
-        }
-        UniTagInformation tagInformation = gotTagInformation.get();
-        handleSubscriberFlows(uplinkPort, subscriberPort, tagInformation);
-    }
-
-    private void updateProgrammedSubscriber(AccessDevicePort port, UniTagInformation tagInformation, boolean add) {
-        if (add) {
-            programmedSubs.put(new ConnectPoint(port.deviceId(), port.number()), tagInformation);
-        } else {
-            programmedSubs.remove(new ConnectPoint(port.deviceId(), port.number()), tagInformation);
-        }
-    }
-
-    /**
-     * Installs a uni tag information flow.
-     *
-     * @param uplinkPort     the nni port
-     * @param subscriberPort the uni port
-     * @param tagInfo        the uni tag information
-     */
-    private void handleSubscriberFlows(AccessDevicePort uplinkPort, AccessDevicePort subscriberPort,
-                                       UniTagInformation tagInfo) {
-        log.debug("Provisioning vlan-based flows for the uniTagInformation {} on {}", tagInfo, subscriberPort);
-        DeviceId deviceId = subscriberPort.deviceId();
-
-        if (multicastServiceName.equals(tagInfo.getServiceName())) {
-            // IGMP flows are taken care of along with VOD service
-            // Please note that for each service, Subscriber Registered event will be sent
-            post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTERED, deviceId,
-                    subscriberPort.port(), tagInfo.getPonSTag(), tagInfo.getPonCTag(),
-                    tagInfo.getTechnologyProfileId()));
-            return;
-        }
-
-        BandwidthProfileInformation upstreamBpInfo =
-                getBandwidthProfileInformation(tagInfo.getUpstreamBandwidthProfile());
-        BandwidthProfileInformation downstreamBpInfo =
-                getBandwidthProfileInformation(tagInfo.getDownstreamBandwidthProfile());
-        BandwidthProfileInformation upstreamOltBpInfo =
-                getBandwidthProfileInformation(tagInfo.getUpstreamOltBandwidthProfile());
-        BandwidthProfileInformation downstreamOltBpInfo =
-                getBandwidthProfileInformation(tagInfo.getDownstreamOltBandwidthProfile());
-        if (upstreamBpInfo == null) {
-            log.warn("No meter installed since no Upstream BW Profile definition found for "
-                             + "ctag {} stag {} tpId {} on {}",
-                     tagInfo.getPonCTag(), tagInfo.getPonSTag(), tagInfo.getTechnologyProfileId(), subscriberPort);
-            return;
-        }
-        if (downstreamBpInfo == null) {
-            log.warn("No meter installed since no Downstream BW Profile definition found for "
-                             + "ctag {} stag {} tpId {} on {}",
-                     tagInfo.getPonCTag(), tagInfo.getPonSTag(),
-                     tagInfo.getTechnologyProfileId(), subscriberPort);
-            return;
-        }
-        if ((upstreamOltBpInfo != null && downstreamOltBpInfo == null) ||
-                (upstreamOltBpInfo == null && downstreamOltBpInfo != null)) {
-            log.warn("No meter installed since only one olt BW Profile definition found for "
-                            + "ctag {} stag {} tpId {} and Device/port: {}:{}",
-                    tagInfo.getPonCTag(), tagInfo.getPonSTag(),
-                    tagInfo.getTechnologyProfileId(), deviceId,
-                    subscriberPort);
-            return;
-        }
-
-        MeterId upOltMeterId = null;
-        MeterId downOltMeterId = null;
-
-        // check for meterIds for the upstream and downstream bandwidth profiles
-        MeterId upMeterId = oltMeterService
-                .getMeterIdFromBpMapping(deviceId, upstreamBpInfo.id());
-        MeterId downMeterId = oltMeterService
-                .getMeterIdFromBpMapping(deviceId, downstreamBpInfo.id());
-
-        if (upstreamOltBpInfo != null) {
-            // Multi UNI service
-            upOltMeterId = oltMeterService
-                    .getMeterIdFromBpMapping(deviceId, upstreamOltBpInfo.id());
-            downOltMeterId = oltMeterService
-                    .getMeterIdFromBpMapping(deviceId, downstreamOltBpInfo.id());
-        } else {
-            // NOT Multi UNI service
-            log.debug("OLT bandwidth profiles fields are set to ONU bandwidth profiles");
-            upstreamOltBpInfo = upstreamBpInfo;
-            downstreamOltBpInfo = downstreamBpInfo;
-            upOltMeterId = upMeterId;
-            downOltMeterId = downMeterId;
-        }
-        SubscriberFlowInfo fi = new SubscriberFlowInfo(uplinkPort, subscriberPort,
-                                                       tagInfo, downMeterId, upMeterId, downOltMeterId, upOltMeterId,
-                                                       downstreamBpInfo.id(), upstreamBpInfo.id(),
-                                                       downstreamOltBpInfo.id(), upstreamOltBpInfo.id());
-
-        if (upMeterId != null && downMeterId != null && upOltMeterId != null && downOltMeterId != null) {
-            log.debug("Meters are existing for upstream {} and downstream {} on {}",
-                    upstreamBpInfo.id(), downstreamBpInfo.id(), subscriberPort);
-            handleSubFlowsWithMeters(fi);
-        } else {
-            log.debug("Adding {} on {} to pending subs", fi, subscriberPort);
-            // one or both meters are not ready. It's possible they are in the process of being
-            // created for other subscribers that share the same bandwidth profile.
-            pendingSubscribersForDevice.compute(deviceId, (id, queue) -> {
-                if (queue == null) {
-                    queue = new LinkedBlockingQueue<>();
-                }
-                queue.add(fi);
-                log.info("Added {} to pending subscribers on {}", fi, subscriberPort);
-                return queue;
-            });
-
-            List<BandwidthProfileInformation> bws = new ArrayList<>();
-            // queue up the meters to be created
-            if (upMeterId == null) {
-                log.debug("Missing meter for upstream {} on {}", upstreamBpInfo.id(), subscriberPort);
-                bws.add(upstreamBpInfo);
-            }
-            if (downMeterId == null) {
-                log.debug("Missing meter for downstream {} on {}", downstreamBpInfo.id(), subscriberPort);
-                bws.add(downstreamBpInfo);
-            }
-            if (upOltMeterId == null) {
-                log.debug("Missing meter for upstreamOlt {} on {}", upstreamOltBpInfo.id(), subscriberPort);
-                bws.add(upstreamOltBpInfo);
-            }
-            if (downOltMeterId == null) {
-                log.debug("Missing meter for downstreamOlt {} on {}", downstreamOltBpInfo.id(), subscriberPort);
-                bws.add(downstreamOltBpInfo);
-            }
-            bws.stream().distinct().forEach(bw -> checkAndCreateDevMeter(deviceId, bw));
-        }
-    }
-
-    private void checkAndCreateDevMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
-        log.debug("Checking and Creating Meter with {} on {}", bwpInfo, deviceId);
-        if (bwpInfo == null) {
-            log.error("Can't create meter. Bandwidth profile is null for device : {}", deviceId);
-            return;
-        }
-        //If false the meter is already being installed, skipping installation
-        if (!oltMeterService.checkAndAddPendingMeter(deviceId, bwpInfo)) {
-            log.debug("Meter is already being installed on {} for {}", deviceId, bwpInfo);
-            return;
-        }
-        createMeter(deviceId, bwpInfo);
-    }
-
-    private void createMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
-        log.info("Creating Meter with {} on {}", bwpInfo, deviceId);
-        CompletableFuture<Object> meterFuture = new CompletableFuture<>();
-
-        MeterId meterId = oltMeterService.createMeter(deviceId, bwpInfo,
-                                                      meterFuture);
-
-        meterFuture.thenAcceptAsync(result -> {
-            log.info("Meter Future for {} has completed", meterId);
-            pendingSubscribersForDevice.compute(deviceId, (id, queue) -> {
-                // iterate through the subscribers on hold
-                if (queue != null && !queue.isEmpty()) {
-                    while (true) {
-                        //TODO this might return the reference and not the actual object so
-                        // it can be actually swapped underneath us.
-                        SubscriberFlowInfo fi = queue.peek();
-                        if (fi == null) {
-                            log.info("No more subscribers pending on {}", deviceId);
-                            queue = new LinkedBlockingQueue<>();
-                            break;
-                        }
-                        if (result == null) {
-                            // meter install sent to device
-                            log.debug("Meter {} installed for bw {} on {}", meterId, bwpInfo, deviceId);
-
-                            MeterId upMeterId = oltMeterService
-                                    .getMeterIdFromBpMapping(deviceId, fi.getUpBpInfo());
-                            MeterId downMeterId = oltMeterService
-                                    .getMeterIdFromBpMapping(deviceId, fi.getDownBpInfo());
-                            MeterId upOltMeterId = oltMeterService
-                                    .getMeterIdFromBpMapping(deviceId, fi.getUpOltBpInfo());
-                            MeterId downOltMeterId = oltMeterService
-                                    .getMeterIdFromBpMapping(deviceId, fi.getDownOltBpInfo());
-                            if (upMeterId != null && downMeterId != null &&
-                                    upOltMeterId != null && downOltMeterId != null) {
-                                log.debug("Provisioning subscriber after meter {} " +
-                                                  "installation and all meters are present " +
-                                                  "upstream {} , downstream {} , oltUpstream {} " +
-                                                  "and oltDownstream {} on {}",
-                                          meterId, upMeterId, downMeterId, upOltMeterId,
-                                          downOltMeterId, fi.getUniPort());
-                                // put in the meterIds  because when fi was first
-                                // created there may or may not have been a meterId
-                                // depending on whether the meter was created or
-                                // not at that time.
-                                //TODO possibly spawn this in a separate thread.
-                                fi.setUpMeterId(upMeterId);
-                                fi.setDownMeterId(downMeterId);
-                                fi.setUpOltMeterId(upOltMeterId);
-                                fi.setDownOltMeterId(downOltMeterId);
-                                handleSubFlowsWithMeters(fi);
-                                queue.remove(fi);
-                            } else {
-                                log.debug("Not all meters for {} are yet installed up {}, " +
-                                                 "down {}, oltUp {}, oltDown {}", fi, upMeterId,
-                                         downMeterId, upOltMeterId, downOltMeterId);
-                            }
-                            oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
-                        } else {
-                            // meter install failed
-                            log.error("Addition of subscriber {} on {} failed due to meter " +
-                                              "{} with result {}", fi, fi.getUniPort(), meterId, result);
-                            oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
-                            queue.remove(fi);
-                        }
+                if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
+                    // if we're not local leader for this device, ignore this queue
+                    if (log.isTraceEnabled()) {
+                        log.trace("Ignoring queue on CP {} as not master of the device", cp);
                     }
-                } else {
-                    log.info("No pending subscribers on {}", deviceId);
-                    queue = new LinkedBlockingQueue<>();
-                }
-                return queue;
-            });
-        });
-
-    }
-
-    /**
-     * Add subscriber flows given meter information for both upstream and
-     * downstream directions.
-     *
-     * @param subscriberFlowInfo relevant information for subscriber
-     */
-    private void handleSubFlowsWithMeters(SubscriberFlowInfo subscriberFlowInfo) {
-        log.info("Provisioning subscriber flows based on {}", subscriberFlowInfo);
-        UniTagInformation tagInfo = subscriberFlowInfo.getTagInfo();
-        if (tagInfo.getIsDhcpRequired()) {
-            Optional<MacAddress> macAddress =
-                    getMacAddress(subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort(), tagInfo);
-            if (subscriberFlowInfo.getTagInfo().getEnableMacLearning()) {
-                ConnectPoint cp = new ConnectPoint(subscriberFlowInfo.getDevId(),
-                        subscriberFlowInfo.getUniPort().number());
-                if (macAddress.isPresent()) {
-                    log.debug("MAC Address {} obtained for {}", macAddress.get(), subscriberFlowInfo);
-                } else {
-                    waitingMacSubscribers.put(cp, subscriberFlowInfo);
-                    log.debug("Adding sub to waiting mac map: {}", subscriberFlowInfo);
+                    return;
                 }
 
-                CompletableFuture<ObjectiveError> dhcpFuture = new CompletableFuture<>();
-                oltFlowService.processDhcpFilteringObjectives(subscriberFlowInfo.getUniPort(),
-                        subscriberFlowInfo.getUpId(), subscriberFlowInfo.getUpOltId(),
-                        tagInfo, true, true, Optional.of(dhcpFuture));
-                dhcpFuture.thenAcceptAsync(dhcpStatus -> {
-                    if (dhcpStatus != null) {
-                        log.error("Dhcp Objective failed for {}: {}", subscriberFlowInfo, dhcpStatus);
-                        if (macAddress.isEmpty()) {
-                            waitingMacSubscribers.remove(cp, subscriberFlowInfo);
+                flowsExecutor.execute(() -> {
+                    if (!eventsQueue.isEmpty()) {
+                        // we do not remove the event from the queue until it has been processed
+                        // in that way we guarantee that events are processed in order
+                        DiscoveredSubscriber sub = eventsQueue.peek();
+                        if (sub == null) {
+                            // the queue is empty
+                            return;
                         }
-                        post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED,
-                                subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort().port(),
-                                tagInfo.getPonSTag(), tagInfo.getPonCTag(), tagInfo.getTechnologyProfileId()));
-                    } else {
-                        log.debug("Dhcp Objective success for: {}", subscriberFlowInfo);
-                        if (macAddress.isPresent()) {
-                            continueProvisioningSubs(subscriberFlowInfo, macAddress);
+
+                        if (log.isTraceEnabled()) {
+                            log.trace("Processing subscriber on port {} with status {}",
+                                    portWithName(sub.port), sub.status);
+                        }
+
+                        if (sub.hasSubscriber) {
+                            // this is a provision subscriber call
+                            if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
+                                if (log.isTraceEnabled()) {
+                                    log.trace("Provisioning of subscriber on {} completed",
+                                            portWithName(sub.port));
+                                }
+                                removeSubscriberFromQueue(sub);
+                            }
+                        } else {
+                            // this is a port event (ENABLED/DISABLED)
+                            // means no subscriber was provisioned on that port
+
+                            if (!deviceService.isAvailable(sub.device.id()) ||
+                                    deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
+                                // If the device is not connected or the port is not available do nothing
+                                // This can happen when we disable and then immediately delete the device,
+                                // the queue is populated but the meters and flows are already gone
+                                // thus there is nothing left to do
+                                return;
+                            }
+
+                            if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
+                                if (log.isTraceEnabled()) {
+                                    log.trace("Processing of port {} completed",
+                                            portWithName(sub.port));
+                                }
+                                removeSubscriberFromQueue(sub);
+                            }
                         }
                     }
                 });
-            } else {
-                log.debug("Dynamic MAC Learning disabled, so will not learn for: {}", subscriberFlowInfo);
-                // dhcp flows will handle after data plane flows
-                continueProvisioningSubs(subscriberFlowInfo, macAddress);
-            }
-        } else {
-            // dhcp not required for this service
-            continueProvisioningSubs(subscriberFlowInfo, Optional.empty());
-        }
-    }
+            });
 
-    private void continueProvisioningSubs(SubscriberFlowInfo subscriberFlowInfo, Optional<MacAddress> macAddress) {
-        AccessDevicePort uniPort = subscriberFlowInfo.getUniPort();
-        log.debug("Provisioning subscriber flows on {} based on {}", uniPort, subscriberFlowInfo);
-        UniTagInformation tagInfo = subscriberFlowInfo.getTagInfo();
-        CompletableFuture<ObjectiveError> upFuture = new CompletableFuture<>();
-        CompletableFuture<ObjectiveError> downFuture = new CompletableFuture<>();
-
-        ForwardingObjective.Builder upFwd =
-                oltFlowService.createUpBuilder(subscriberFlowInfo.getNniPort(), uniPort, subscriberFlowInfo.getUpId(),
-                        subscriberFlowInfo.getUpOltId(), subscriberFlowInfo.getTagInfo());
-        flowObjectiveService.forward(subscriberFlowInfo.getDevId(), upFwd.add(new ObjectiveContext() {
-            @Override
-            public void onSuccess(Objective objective) {
-                log.debug("Upstream HSIA flow {} installed successfully on {}", subscriberFlowInfo, uniPort);
-                upFuture.complete(null);
-            }
-
-            @Override
-            public void onError(Objective objective, ObjectiveError error) {
-                upFuture.complete(error);
-            }
-        }));
-
-        ForwardingObjective.Builder downFwd =
-                oltFlowService.createDownBuilder(subscriberFlowInfo.getNniPort(), uniPort,
-                        subscriberFlowInfo.getDownId(), subscriberFlowInfo.getDownOltId(),
-                        subscriberFlowInfo.getTagInfo(), macAddress);
-        flowObjectiveService.forward(subscriberFlowInfo.getDevId(), downFwd.add(new ObjectiveContext() {
-            @Override
-            public void onSuccess(Objective objective) {
-                log.debug("Downstream HSIA flow {} installed successfully on {}", subscriberFlowInfo, uniPort);
-                downFuture.complete(null);
-            }
-
-            @Override
-            public void onError(Objective objective, ObjectiveError error) {
-                downFuture.complete(error);
-            }
-        }));
-
-        upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
-            AccessDeviceEvent.Type type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTERED;
-            if (downStatus != null) {
-                log.error("Flow with innervlan {} and outerVlan {} on {} failed downstream installation: {}",
-                        tagInfo.getPonCTag(), tagInfo.getPonSTag(), uniPort, downStatus);
-                type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
-            } else if (upStatus != null) {
-                log.error("Flow with innervlan {} and outerVlan {} on {} failed upstream installation: {}",
-                        tagInfo.getPonCTag(), tagInfo.getPonSTag(), uniPort, upStatus);
-                type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
-            } else {
-                log.debug("Upstream and downstream data plane flows are installed successfully on {}", uniPort);
-                Optional<String> upstreamOltBw = tagInfo.getUpstreamOltBandwidthProfile() == null ?
-                        Optional.empty() : Optional.of(tagInfo.getUpstreamOltBandwidthProfile());
-                oltFlowService.processEapolFilteringObjectives(uniPort, tagInfo.getUpstreamBandwidthProfile(),
-                                                               upstreamOltBw, null,
-                        tagInfo.getPonCTag(), true);
-
-                if (!tagInfo.getEnableMacLearning()) {
-                    oltFlowService.processDhcpFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
-                            subscriberFlowInfo.getUpOltId(), tagInfo, true, true, Optional.empty());
-                }
-
-                oltFlowService.processIgmpFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
-                        subscriberFlowInfo.getUpOltId(), tagInfo, true, true);
-
-                oltFlowService.processPPPoEDFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
-                        subscriberFlowInfo.getUpOltId(), tagInfo, true, true);
-
-                updateProgrammedSubscriber(uniPort, tagInfo, true);
-            }
-            post(new AccessDeviceEvent(type, subscriberFlowInfo.getDevId(), uniPort.port(),
-                                       tagInfo.getPonSTag(), tagInfo.getPonCTag(),
-                                       tagInfo.getTechnologyProfileId()));
-        }, oltInstallers);
-    }
-
-    /**
-     * Gets mac address from tag info if present, else checks the host service.
-     *
-     * @param deviceId device ID
-     * @param port uni port
-     * @param tagInformation tag info
-     * @return MAC Address of subscriber
-     */
-    private Optional<MacAddress> getMacAddress(DeviceId deviceId, AccessDevicePort port,
-                                               UniTagInformation tagInformation) {
-        if (isMacAddressValid(tagInformation)) {
-            log.debug("Got MAC Address {} from the uniTagInformation for {} and cTag {}",
-                    tagInformation.getConfiguredMacAddress(), port, tagInformation.getPonCTag());
-            return Optional.of(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
-        } else if (tagInformation.getEnableMacLearning()) {
-            Optional<Host> optHost = hostService.getConnectedHosts(new ConnectPoint(deviceId, port.number()))
-                    .stream().filter(host -> host.vlan().equals(tagInformation.getPonCTag())).findFirst();
-            if (optHost.isPresent()) {
-                log.debug("Got MAC Address {} from the hostService for {} and cTag {}",
-                        optHost.get().mac(), port, tagInformation.getPonCTag());
-                return Optional.of(optHost.get().mac());
+            try {
+                TimeUnit.MILLISECONDS.sleep(requeueDelay);
+            } catch (InterruptedException e) {
+                continue;
             }
         }
-        log.debug("Could not obtain MAC Address for {} and cTag {}", port, tagInformation.getPonCTag());
-        return Optional.empty();
-    }
-
-    private boolean isMacAddressValid(UniTagInformation tagInformation) {
-        return tagInformation.getConfiguredMacAddress() != null &&
-                !tagInformation.getConfiguredMacAddress().trim().equals("") &&
-                !MacAddress.NONE.equals(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
     }
 
     /**
@@ -1253,26 +585,26 @@
      * using the pon c tag, pon s tag and the technology profile id
      * May return Optional<null>
      *
-     * @param port        port of the subscriber
+     * @param portName  port of the subscriber
      * @param innerVlan pon c tag
      * @param outerVlan pon s tag
      * @param tpId      the technology profile id
      * @return the found uni tag information
      */
-    private Optional<UniTagInformation> getUniTagInformation(AccessDevicePort port, VlanId innerVlan,
-                                                             VlanId outerVlan, int tpId) {
+    private UniTagInformation getUniTagInformation(String portName, VlanId innerVlan,
+                                                   VlanId outerVlan, int tpId) {
         log.debug("Getting uni tag information for {}, innerVlan: {}, outerVlan: {}, tpId: {}",
-                port, innerVlan, outerVlan, tpId);
-        SubscriberAndDeviceInformation subInfo = getSubscriber(new ConnectPoint(port.deviceId(), port.number()));
+                portName, innerVlan, outerVlan, tpId);
+        SubscriberAndDeviceInformation subInfo = subsService.get(portName);
         if (subInfo == null) {
-            log.warn("Subscriber information doesn't exist for {}", port);
-            return Optional.empty();
+            log.warn("Subscriber information doesn't exist for {}", portName);
+            return null;
         }
 
         List<UniTagInformation> uniTagList = subInfo.uniTagList();
         if (uniTagList == null) {
-            log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), port);
-            return Optional.empty();
+            log.warn("Uni tag list is not found for the subscriber {} on {}", subInfo.id(), portName);
+            return null;
         }
 
         UniTagInformation service = null;
@@ -1286,438 +618,298 @@
 
         if (service == null) {
             log.warn("SADIS doesn't include the service with ponCtag {} ponStag {} and tpId {} on {}",
-                     innerVlan, outerVlan, tpId, port);
-            return Optional.empty();
-        }
-
-        return Optional.of(service);
-    }
-
-    /**
-     * Creates trap flows for device, including DHCP and LLDP trap on NNI and
-     * EAPOL trap on the UNIs, if device is present in Sadis config.
-     *
-     * @param dev Device to look for
-     */
-    private void checkAndCreateDeviceFlows(Device dev) {
-        // check if this device is provisioned in Sadis
-        SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
-        log.info("checkAndCreateDeviceFlows: deviceInfo {}", deviceInfo);
-
-        if (deviceInfo != null) {
-            log.debug("Driver for device {} is {}", dev.id(),
-                     driverService.getDriver(dev.id()));
-            for (Port p : deviceService.getPorts(dev.id())) {
-                if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
-                    continue;
-                }
-                if (isUniPort(dev, p)) {
-                    AccessDevicePort port = new AccessDevicePort(p, AccessDevicePort.Type.UNI);
-                    if (!programmedSubs.containsKey(new ConnectPoint(dev.id(), p.number()))) {
-                        log.info("Creating Eapol on {}", port);
-                        oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
-                                null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
-                    } else {
-                        log.debug("Subscriber Eapol on {} is already provisioned, not installing default", port);
-                    }
-                } else {
-                    AccessDevicePort port = new AccessDevicePort(p, AccessDevicePort.Type.NNI);
-                    oltFlowService.processNniFilteringObjectives(port, true);
-                }
-            }
-        }
-    }
-
-
-    /**
-     * Get the uplink for of the OLT device.
-     * <p>
-     * This assumes that the OLT has a single uplink port. When more uplink ports need to be supported
-     * this logic needs to be changed
-     *
-     * @param dev Device to look for
-     * @return The uplink Port of the OLT
-     */
-    private AccessDevicePort getUplinkPort(Device dev) {
-        // check if this device is provisioned in Sadis
-        SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
-        log.trace("getUplinkPort: deviceInfo {}", deviceInfo);
-        if (deviceInfo == null) {
-            log.warn("Device {} is not configured in SADIS .. cannot fetch device"
-                             + " info", dev.id());
+                    innerVlan, outerVlan, tpId, portName);
             return null;
         }
-        // Return the port that has been configured as the uplink port of this OLT in Sadis
-        Optional<Port> optionalPort = deviceService.getPorts(dev.id()).stream()
-                .filter(port -> isNniPort(port) ||
-                        (port.number().toLong() == deviceInfo.uplinkPort()))
-                .findFirst();
-        if (optionalPort.isPresent()) {
-            log.trace("getUplinkPort: Found port {}", optionalPort.get());
-            return new AccessDevicePort(optionalPort.get(), AccessDevicePort.Type.NNI);
-        }
 
-        log.warn("getUplinkPort: " + NO_UPLINK_PORT, dev.id());
-        return null;
+        return service;
     }
 
-    /**
-     * Return the subscriber on a port.
-     *
-     * @param cp ConnectPoint on which to find the subscriber
-     * @return subscriber if found else null
-     */
-    protected SubscriberAndDeviceInformation getSubscriber(ConnectPoint cp) {
-        if (subsService == null) {
-            log.warn(SADIS_NOT_RUNNING);
-            return null;
-        }
-        Port port = deviceService.getPort(cp);
-        checkNotNull(port, "Invalid connect point");
-        String portName = port.annotations().value(AnnotationKeys.PORT_NAME);
-        return subsService.get(portName);
+    protected void bindSadisService(SadisService service) {
+        sadisService = service;
+        subsService = sadisService.getSubscriberInfoService();
+
+        log.info("Sadis-service binds to onos.");
     }
 
-    /**
-     * Checks whether the given port of the device is a uni port or not.
-     *
-     * @param d the access device
-     * @param p the port of the device
-     * @return true if the given port is a uni port
-     */
-    private boolean isUniPort(Device d, Port p) {
-        AccessDevicePort ulPort = getUplinkPort(d);
-        if (ulPort != null) {
-            return (ulPort.number().toLong() != p.number().toLong());
-        }
-        //handles a special case where NNI port is misconfigured in SADIS and getUplinkPort(d) returns null
-        //checks whether the port name starts with nni- which is the signature of an NNI Port
-        if (p.annotations().value(AnnotationKeys.PORT_NAME) != null &&
-                p.annotations().value(AnnotationKeys.PORT_NAME).startsWith(NNI)) {
-            log.error("NNI port number {} is not matching with configured value", p.number().toLong());
-            return false;
-        }
-        return true;
+    protected void unbindSadisService(SadisService service) {
+        deviceService.removeListener(deviceListener);
+        deviceListener = null;
+        sadisService = null;
+        subsService = null;
+        log.info("Sadis-service unbinds from onos.");
     }
 
-    /**
-     * Gets the given device details from SADIS.
-     * If the device is not found, returns null
-     *
-     * @param dev the access device
-     * @return the olt information
-     */
-    private SubscriberAndDeviceInformation getOltInfo(Device dev) {
-        if (subsService == null) {
-            log.warn(SADIS_NOT_RUNNING);
-            return null;
+    protected void addSubscriberToQueue(DiscoveredSubscriber sub) {
+        ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
+        LinkedBlockingQueue<DiscoveredSubscriber> q = null;
+        try {
+            queueReadLock.lock();
+            q = eventsQueues.getOrDefault(cp, new LinkedBlockingQueue<>());
+        } finally {
+            queueReadLock.unlock();
         }
-        String devSerialNo = dev.serialNumber();
-        return subsService.get(devSerialNo);
-    }
-
-    /**
-     * Checks for mastership or falls back to leadership on deviceId.
-     * If the device is available use mastership,
-     * otherwise fallback on leadership.
-     * Leadership on the device topic is needed because the master can be NONE
-     * in case the device went away, we still need to handle events
-     * consistently
-     */
-    private boolean isLocalLeader(DeviceId deviceId) {
-        if (deviceService.isAvailable(deviceId)) {
-            return mastershipService.isLocalMaster(deviceId);
+        if (!q.contains(sub)) {
+            log.info("Adding subscriber to queue: {} with status {} and subscriber {}",
+                    portWithName(sub.port), sub.status, sub.hasSubscriber);
+            q.add(sub);
         } else {
-            // Fallback with Leadership service - device id is used as topic
-            NodeId leader = leadershipService.runForLeadership(
-                    deviceId.toString()).leaderNodeId();
-            // Verify if this node is the leader
-            return clusterService.getLocalNode().id().equals(leader);
+            log.debug("Not adding subscriber to queue as already present: {} with status {}",
+                    portWithName(sub.port), sub.status);
+            // no need to update the queue in the map if nothing has changed
+            return;
+        }
+        try {
+            queueWriteLock.lock();
+            eventsQueues.put(cp, q);
+        } catch (UnsupportedOperationException | ClassCastException |
+                NullPointerException | IllegalArgumentException e) {
+            log.error("Cannot add subscriber to queue: {}", e.getMessage());
+        } finally {
+            queueWriteLock.unlock();
         }
     }
 
-    private boolean isNniPort(Port port) {
-        if (port.annotations().keys().contains(AnnotationKeys.PORT_NAME)) {
-            return port.annotations().value(AnnotationKeys.PORT_NAME).contains(NNI);
+    protected void removeSubscriberFromQueue(DiscoveredSubscriber sub) {
+        ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
+        LinkedBlockingQueue<DiscoveredSubscriber> q = null;
+        if (log.isTraceEnabled()) {
+            log.trace("removing subscriber {} from queue", sub);
         }
-        return false;
-    }
-
-    private class InternalHostListener implements HostListener {
-        @Override
-        public void event(HostEvent event) {
-            hostEventExecutor.execute(() -> {
-                Host host = event.subject();
-                switch (event.type()) {
-                    case HOST_ADDED:
-                        ConnectPoint cp = new ConnectPoint(host.location().deviceId(), host.location().port());
-                        Optional<SubscriberFlowInfo> optSubFlowInfo =
-                                getAndRemoveWaitingMacSubFlowInfoForCTag(cp, host.vlan());
-                        if (optSubFlowInfo.isPresent()) {
-                            log.debug("Continuing provisioning for waiting mac service. event: {}", event);
-                            continueProvisioningSubs(optSubFlowInfo.get(), Optional.of(host.mac()));
-                        } else {
-                            log.debug("There is no waiting mac sub. event: {}", event);
-                        }
-                        break;
-                    case HOST_UPDATED:
-                        if (event.prevSubject() != null && !event.prevSubject().mac().equals(event.subject().mac())) {
-                            log.debug("Subscriber's MAC address changed from {} to {}. " +
-                                            "devId/portNumber: {}/{} vlan: {}", event.prevSubject().mac(),
-                                    event.subject().mac(), host.location().deviceId(), host.location().port(),
-                                    host.vlan());
-                            // TODO handle subscriber MAC Address changed
-                        } else {
-                            log.debug("Unhandled HOST_UPDATED event: {}", event);
-                        }
-                        break;
-                    default:
-                        log.debug("Unhandled host event received. event: {}", event);
-                }
-            });
+        try {
+            queueReadLock.lock();
+            q = eventsQueues.get(cp);
+        } finally {
+            queueReadLock.unlock();
+        }
+        if (q == null) {
+            log.warn("Cannot find queue for connectPoint {}", cp);
+            return;
+        }
+        boolean removed = q.remove(sub);
+        if (!removed) {
+            log.warn("Subscriber {} has not been removed from queue, is it still there? {}", sub, q);
+            return;
+        } else {
+            log.debug("Subscriber {} has been removed from the queue", sub);
         }
 
-        @Override
-        public boolean isRelevant(HostEvent event) {
-            return isLocalLeader(event.subject().location().deviceId());
+        try {
+            queueWriteLock.lock();
+            eventsQueues.remove(cp); // am I needed??
+            eventsQueues.put(cp, q);
+        } catch (UnsupportedOperationException | ClassCastException |
+                NullPointerException | IllegalArgumentException e) {
+            log.error("Cannot remove subscriber {} from queue: {}", sub, e.getMessage());
+        } finally {
+            queueWriteLock.unlock();
         }
     }
 
-    private class InternalDeviceListener implements DeviceListener {
-        private Set<DeviceId> programmedDevices = Sets.newConcurrentHashSet();
+    protected class OltDeviceListener
+            implements DeviceListener {
+
+        private final Logger log = LoggerFactory.getLogger(getClass());
+        protected ExecutorService eventExecutor;
+
+        /**
+         * Builds the listener with all the proper services and information needed.
+         */
+        public OltDeviceListener() {
+            this.eventExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
+                    groupedThreads("onos/olt-device-listener-event", "event-%d", log));
+        }
+
+        public void deactivate() {
+            this.eventExecutor.shutdown();
+        }
 
         @Override
         public void event(DeviceEvent event) {
             eventExecutor.execute(() -> {
-                DeviceId devId = event.subject().id();
-                Device dev = event.subject();
-                Port p = event.port();
-                DeviceEvent.Type eventType = event.type();
-
-                if (DeviceEvent.Type.PORT_STATS_UPDATED.equals(eventType) ||
-                        DeviceEvent.Type.DEVICE_SUSPENDED.equals(eventType) ||
-                        DeviceEvent.Type.DEVICE_UPDATED.equals(eventType)) {
-                    return;
-                }
-
-                boolean isLocalLeader = isLocalLeader(devId);
-                // Only handle the event if the device belongs to us
-                if (!isLocalLeader && event.type().equals(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED)
-                        && !deviceService.isAvailable(devId) && deviceService.getPorts(devId).isEmpty()) {
-                    log.info("Cleaning local state for non master instance upon " +
-                                     "device disconnection {}", devId);
-                    // Since no mastership of the device is present upon disconnection
-                    // the method in the FlowRuleManager only empties the local copy
-                    // of the DeviceFlowTable thus this method needs to get called
-                    // on every instance, see how it's done in the InternalDeviceListener
-                    // in FlowRuleManager: no mastership check for purgeOnDisconnection
-                    handleDeviceDisconnection(dev, false, false);
-                    return;
-                } else if (!isLocalLeader) {
-                    log.debug("Not handling event because instance is not leader for {}", devId);
-                    return;
-                }
-
-                log.debug("OLT got {} event for {}/{}", eventType, event.subject(), event.port());
-
-                if (getOltInfo(dev) == null) {
-                    // it's possible that we got an event for a previously
-                    // programmed OLT that is no longer available in SADIS
-                    // we let such events go through
-                    if (!programmedDevices.contains(devId)) {
-                        log.warn("No device info found for {}, this is either "
-                                         + "not an OLT or not known to sadis", dev);
-                        return;
-                    }
-                }
-                AccessDevicePort port = null;
-                if (p != null) {
-                    if (isUniPort(dev, p)) {
-                        port = new AccessDevicePort(p, AccessDevicePort.Type.UNI);
-                    } else {
-                        port = new AccessDevicePort(p, AccessDevicePort.Type.NNI);
-                    }
-                }
-
+                boolean isOlt = oltDeviceService.isOlt(event.subject());
+                DeviceId deviceId = event.subject().id();
                 switch (event.type()) {
-                    //TODO: Port handling and bookkeeping should be improved once
-                    // olt firmware handles correct behaviour.
-                    case PORT_ADDED:
-                        if (!deviceService.isAvailable(devId)) {
-                            log.warn("Received {} for disconnected device {}, ignoring", event, devId);
-                            return;
-                        }
-                        if (port.type().equals(AccessDevicePort.Type.UNI)) {
-                            post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, port.port()));
-
-                            if (port.isEnabled() && !port.number().equals(PortNumber.LOCAL)) {
-                                log.info("eapol will be sent for port added {}", port);
-                                oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
-                                                                               null,
-                                                                               VlanId.vlanId(EAPOL_DEFAULT_VLAN),
-                                                                               true);
-                            }
-                        } else {
-                            SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
-                            if (deviceInfo != null) {
-                                oltFlowService.processNniFilteringObjectives(port, true);
-                            }
-                        }
-                        break;
-                    case PORT_REMOVED:
-                        if (port.type().equals(AccessDevicePort.Type.UNI)) {
-                            // if no subscriber is provisioned we need to remove the default EAPOL
-                            // if a subscriber was provisioned the default EAPOL will not be there and we can skip.
-                            // The EAPOL with subscriber tag will be removed by removeSubscriber call.
-                            Collection<? extends UniTagInformation> uniTagInformationSet =
-                                    programmedSubs.get(new ConnectPoint(port.deviceId(), port.number())).value();
-                            if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
-                                log.info("No subscriber provisioned on port {} in PORT_REMOVED event, " +
-                                                 "removing default EAPOL flow", port);
-                                oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
-                                        null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
-                            } else {
-                                removeSubscriber(port);
-                            }
-
-                            post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, port.port()));
-                        }
-                        break;
-                    case PORT_UPDATED:
-                        if (!deviceService.isAvailable(devId)) {
-                            log.warn("Received {} for disconnected device {}, ignoring", event, devId);
-                            return;
-                        }
-                        if (port.type().equals(AccessDevicePort.Type.NNI)) {
-                            SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
-                            if (deviceInfo != null && port.isEnabled()) {
-                                log.debug("NNI {} enabled", port);
-                                oltFlowService.processNniFilteringObjectives(port, true);
-                            }
-                            return;
-                        }
-                        ConnectPoint cp = new ConnectPoint(devId, port.number());
-                        Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(cp).value();
-                        if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
-                            if (!port.number().equals(PortNumber.LOCAL)) {
-                                log.info("eapol will be {} updated for {} with default vlan {}",
-                                         (port.isEnabled()) ? "added" : "removed", port, EAPOL_DEFAULT_VLAN);
-                                oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
-                                        null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), port.isEnabled());
-                            }
-                        } else {
-                            log.info("eapol will be {} updated for {}", (port.isEnabled()) ? "added" : "removed",
-                                     port);
-                            for (UniTagInformation uniTag : uniTagInformationSet) {
-                                oltFlowService.processEapolFilteringObjectives(port,
-                                        uniTag.getUpstreamBandwidthProfile(),
-                                        Optional.of(uniTag.getUpstreamOltBandwidthProfile()),
-                                        null, uniTag.getPonCTag(), port.isEnabled());
-                            }
-                        }
-                        if (port.isEnabled()) {
-                            post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, port.port()));
-                        } else {
-                            post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, port.port()));
-                        }
-                        break;
+                    case PORT_STATS_UPDATED:
                     case DEVICE_ADDED:
-                        handleDeviceConnection(dev, true);
-                        break;
-                    case DEVICE_REMOVED:
-                        handleDeviceDisconnection(dev, true, true);
-                        break;
-                    case DEVICE_AVAILABILITY_CHANGED:
-                        if (deviceService.isAvailable(devId)) {
-                            log.info("Handling available device: {}", dev.id());
-                            handleDeviceConnection(dev, false);
-                        } else {
-                            if (deviceService.getPorts(devId).isEmpty()) {
-                                log.info("Handling controlled device disconnection .. "
-                                                 + "flushing all state for dev:{}", devId);
-                                handleDeviceDisconnection(dev, true, false);
-                            } else {
-                                log.info("Disconnected device has available ports .. "
-                                                 + "assuming temporary disconnection, "
-                                                 + "retaining state for device {}", devId);
-                            }
-                        }
-                        break;
-                    default:
-                        log.debug("Not handling event {}", event);
                         return;
+                    case PORT_ADDED:
+                    case PORT_UPDATED:
+                    case PORT_REMOVED:
+                        if (!isOlt) {
+                            log.trace("Ignoring event {}, this is not an OLT device", deviceId);
+                            return;
+                        }
+                        if (!oltDeviceService.isLocalLeader(deviceId)) {
+                            log.trace("Device {} is not local to this node", deviceId);
+                            return;
+                        }
+                        // port added, updated and removed are treated in the same way as we only care whether the port
+                        // is enabled or not
+                        handleOltPort(event.type(), event.subject(), event.port());
+                        return;
+                    case DEVICE_AVAILABILITY_CHANGED:
+                        if (!isOlt) {
+                            log.trace("Ignoring event {}, this is not an OLT device", deviceId);
+                            return;
+                        }
+                        if (deviceService.isAvailable(deviceId)) {
+                            if (!oltDeviceService.isLocalLeader(deviceId)) {
+                                if (log.isTraceEnabled()) {
+                                    log.trace("Device {} is not local to this node, not handling available device",
+                                            deviceId);
+                                }
+                            } else {
+                                log.info("Handling available device: {}", deviceId);
+                                handleExistingPorts();
+                            }
+                        } else if (!deviceService.isAvailable(deviceId) && deviceService.getPorts(deviceId).isEmpty()) {
+                            // NOTE that upon disconnection there is no mastership on the device,
+                            // and we should anyway clear the local cache of the flows/meters across instances.
+                            // We're only clearing the device if there are no available ports,
+                            // otherwise we assume it's a temporary disconnection
+                            log.info("Device {} availability changed to false and ports are empty, " +
+                                    "purging meters and flows", deviceId);
+                            //NOTE all the instances will call these methods
+                            oltFlowService.purgeDeviceFlows(deviceId);
+                            oltMeterService.purgeDeviceMeters(deviceId);
+                            clearQueueForDevice(deviceId);
+                        } else {
+                            log.info("Device {} availability changed to false, but ports are still available, " +
+                                            "assuming temporary disconnection. Ports: {}",
+                                    deviceId, deviceService.getPorts(deviceId));
+                        }
+                        return;
+                    case DEVICE_REMOVED:
+                        if (!isOlt) {
+                            log.trace("Ignoring event {}, this is not an OLT device", deviceId);
+                            return;
+                        }
+                        log.info("Device {} Removed, purging meters and flows", deviceId);
+                        oltFlowService.purgeDeviceFlows(deviceId);
+                        oltMeterService.purgeDeviceMeters(deviceId);
+                        clearQueueForDevice(deviceId);
+                        return;
+                    default:
+                        if (log.isTraceEnabled()) {
+                            log.trace("Not handling event: {}, ", event);
+                        }
                 }
             });
         }
 
-        private void sendUniEvent(Device device, AccessDeviceEvent.Type eventType) {
-            deviceService.getPorts(device.id()).stream()
-                    .filter(p -> !PortNumber.LOCAL.equals(p.number()))
-                    .filter(p -> isUniPort(device, p))
-                    .forEach(p -> post(new AccessDeviceEvent(eventType, device.id(), p)));
-        }
-
-        private void handleDeviceDisconnection(Device device, boolean sendDisconnectedEvent, boolean sendUniEvent) {
-            programmedDevices.remove(device.id());
-            removeAllSubscribers(device.id());
-            removeWaitingMacSubs(device.id());
-            //Handle case where OLT disconnects during subscriber provisioning
-            pendingSubscribersForDevice.remove(device.id());
-            oltFlowService.clearDeviceState(device.id());
-
-            //Complete meter and flow purge
-            flowRuleService.purgeFlowRules(device.id());
-            oltMeterService.clearMeters(device.id());
-            if (sendDisconnectedEvent) {
-                post(new AccessDeviceEvent(
-                        AccessDeviceEvent.Type.DEVICE_DISCONNECTED, device.id(),
-                        null, null, null));
-            }
-            if (sendUniEvent) {
-                sendUniEvent(device, AccessDeviceEvent.Type.UNI_REMOVED);
+        protected void clearQueueForDevice(DeviceId devId) {
+            try {
+                queueWriteLock.lock();
+                Iterator<Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>> iter =
+                        eventsQueues.entrySet().iterator();
+                while (iter.hasNext()) {
+                    Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> entry = iter.next();
+                    if (entry.getKey().deviceId().equals(devId)) {
+                        eventsQueues.remove(entry.getKey());
+                    }
+                }
+            } finally {
+                queueWriteLock.unlock();
             }
         }
 
-        private void handleDeviceConnection(Device dev, boolean sendUniEvent) {
-            post(new AccessDeviceEvent(
-                    AccessDeviceEvent.Type.DEVICE_CONNECTED, dev.id(),
-                    null, null, null));
-            programmedDevices.add(dev.id());
-            checkAndCreateDeviceFlows(dev);
-            if (sendUniEvent) {
-                sendUniEvent(dev, AccessDeviceEvent.Type.UNI_ADDED);
+        protected void handleOltPort(DeviceEvent.Type type, Device device, Port port) {
+            log.info("OltDeviceListener receives event {} for port {} with status {} on device {}", type,
+                    portWithName(port), port.isEnabled() ? "ENABLED" : "DISABLED", device.id());
+            if (port.isEnabled()) {
+                if (oltDeviceService.isNniPort(device, port.number())) {
+                    // NOTE in the NNI case we receive a PORT_REMOVED event with status ENABLED, thus we need to
+                    // pass the floeAction to the handleNniFlows method
+                    OltFlowService.FlowOperation action = OltFlowService.FlowOperation.ADD;
+                    if (type == DeviceEvent.Type.PORT_REMOVED) {
+                        action = OltFlowService.FlowOperation.REMOVE;
+                    }
+                    oltFlowService.handleNniFlows(device, port, action);
+                } else {
+                    post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, device.id(), port));
+                    // NOTE if the subscriber was previously provisioned,
+                    // then add it back to the queue to be re-provisioned
+                    boolean provisionSubscriber = oltFlowService.
+                            isSubscriberServiceProvisioned(new AccessDevicePort(port));
+                    SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
+                    if (si == null) {
+                        //NOTE this should not happen given that the subscriber was provisioned before
+                        log.error("Subscriber information not found in sadis for port {}",
+                                portWithName(port));
+                        return;
+                    }
+
+                    DiscoveredSubscriber.Status status = DiscoveredSubscriber.Status.ADDED;
+                    if (type == DeviceEvent.Type.PORT_REMOVED) {
+                        status = DiscoveredSubscriber.Status.REMOVED;
+                    }
+
+                    DiscoveredSubscriber sub =
+                            new DiscoveredSubscriber(device, port,
+                                    status, provisionSubscriber, si);
+                    addSubscriberToQueue(sub);
+                }
+            } else {
+                if (oltDeviceService.isNniPort(device, port.number())) {
+                    // NOTE this may need to be handled on DEVICE_REMOVE as we don't disable the NNI
+                    oltFlowService.handleNniFlows(device, port, OltFlowService.FlowOperation.REMOVE);
+                } else {
+                    post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, device.id(), port));
+                    // NOTE we are assuming that if a subscriber has default eapol
+                    // it does not have subscriber flows
+                    if (oltFlowService.hasDefaultEapol(port)) {
+                        SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
+                        if (si == null) {
+                            //NOTE this should not happen given that the subscriber was provisioned before
+                            log.error("Subscriber information not found in sadis for port {}",
+                                    portWithName(port));
+                            return;
+                        }
+                        DiscoveredSubscriber sub =
+                                new DiscoveredSubscriber(device, port,
+                                        DiscoveredSubscriber.Status.REMOVED, false, si);
+
+                        addSubscriberToQueue(sub);
+
+                    } else if (oltFlowService.isSubscriberServiceProvisioned(new AccessDevicePort(port))) {
+                        SubscriberAndDeviceInformation si = subsService.get(getPortName(port));
+                        if (si == null) {
+                            //NOTE this should not happen given that the subscriber was provisioned before
+                            log.error("Subscriber information not found in sadis for port {}",
+                                    portWithName(port));
+                            return;
+                        }
+                        DiscoveredSubscriber sub =
+                                new DiscoveredSubscriber(device, port,
+                                        DiscoveredSubscriber.Status.REMOVED, true, si);
+                        addSubscriberToQueue(sub);
+                    }
+                }
             }
         }
 
-        private void removeAllSubscribers(DeviceId deviceId) {
-            List<Map.Entry<ConnectPoint, UniTagInformation>> subs = programmedSubs.stream()
-                    .filter(e -> e.getKey().deviceId().equals(deviceId))
-                    .collect(toList());
-
-            subs.forEach(e -> programmedSubs.remove(e.getKey(), e.getValue()));
-        }
-
-        private void removeWaitingMacSubs(DeviceId deviceId) {
-            List<ConnectPoint> waitingMacKeys = waitingMacSubscribers.stream()
-                    .filter(cp -> cp.getKey().deviceId().equals(deviceId))
-                    .map(Map.Entry::getKey)
-                    .collect(toList());
-            waitingMacKeys.forEach(cp -> waitingMacSubscribers.removeAll(cp));
-        }
-
-    }
-
-    private class InternalClusterListener implements ClusterEventListener {
-
-        @Override
-        public void event(ClusterEvent event) {
-            if (event.type() == ClusterEvent.Type.INSTANCE_READY) {
-                hasher.addServer(event.subject().id());
-            }
-            if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED) {
-                hasher.removeServer(event.subject().id());
+        /**
+         * This method is invoked on app activation in order to deal
+         * with devices and ports that are already existing in the system
+         * and thus won't trigger an event.
+         * It is also needed on instance reboot and device reconnect
+         */
+        protected void handleExistingPorts() {
+            Iterable<DeviceId> devices = getConnectedOlts();
+            for (DeviceId deviceId : devices) {
+                log.info("Handling existing OLT Ports for device {}", deviceId);
+                if (oltDeviceService.isLocalLeader(deviceId)) {
+                    List<Port> ports = deviceService.getPorts(deviceId);
+                    for (Port p : ports) {
+                        if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
+                            continue;
+                        }
+                        Device device = deviceService.getDevice(deviceId);
+                        deviceListener.handleOltPort(DeviceEvent.Type.PORT_UPDATED, device, p);
+                    }
+                }
             }
         }
     }
-
 }
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltDeviceService.java b/impl/src/main/java/org/opencord/olt/impl/OltDeviceService.java
new file mode 100644
index 0000000..b336c28
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OltDeviceService.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * The implementation of the OltDeviceService.
+ */
+@Component(immediate = true)
+public class OltDeviceService implements OltDeviceServiceInterface {
+
+    protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
+            bind = "bindSadisService",
+            unbind = "unbindSadisService",
+            policy = ReferencePolicy.DYNAMIC)
+    protected volatile SadisService sadisService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Activate
+    public void activate() {
+        log.info("Activated");
+    }
+
+    /**
+     * Returns true if the device is an OLT.
+     *
+     * @param device the Device to be checked
+     * @return boolean
+     */
+    public boolean isOlt(Device device) {
+        return getOltInfo(device) != null;
+    }
+
+    private SubscriberAndDeviceInformation getOltInfo(Device dev) {
+        if (subsService == null) {
+            return null;
+        }
+        String devSerialNo = dev.serialNumber();
+        return subsService.get(devSerialNo);
+    }
+
+
+    /**
+     * Returns true if the port is an NNI Port on the OLT.
+     * NOTE: We can check if a port is a NNI based on the SADIS config, specifically the uplinkPort section
+     *
+     * @param dev  the Device this port belongs to
+     * @param portNumber the PortNumber to be checked
+     * @return boolean
+     */
+    @Override
+    public boolean isNniPort(Device dev, PortNumber portNumber) {
+        SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
+        return deviceInfo != null && portNumber.toLong() == deviceInfo.uplinkPort();
+    }
+
+    @Override
+    public Optional<Port> getNniPort(Device device) {
+        SubscriberAndDeviceInformation deviceInfo = getOltInfo(device);
+        if (deviceInfo == null) {
+            return Optional.empty();
+        }
+        List<Port> ports = deviceService.getPorts(device.id());
+        if (log.isTraceEnabled()) {
+            log.trace("Get NNI Port looks for NNI in: {}", ports);
+        }
+        return ports.stream()
+                .filter(p -> p.number().toLong() == deviceInfo.uplinkPort())
+                .findFirst();
+    }
+
+    protected void bindSadisService(SadisService service) {
+        this.subsService = service.getSubscriberInfoService();
+        log.info("Sadis service is loaded");
+    }
+
+    protected void unbindSadisService(SadisService service) {
+        this.subsService = null;
+        log.info("Sadis service is unloaded");
+    }
+
+    /**
+     * Checks for mastership or falls back to leadership on deviceId.
+     * If the device is available use mastership,
+     * otherwise fallback on leadership.
+     * Leadership on the device topic is needed because the master can be NONE
+     * in case the device went away, we still need to handle events
+     * consistently
+     *
+     * @param deviceId The device ID to check.
+     * @return boolean (true if the current instance is managing the device)
+     */
+    @Override
+    public boolean isLocalLeader(DeviceId deviceId) {
+        if (deviceService.isAvailable(deviceId)) {
+            return mastershipService.isLocalMaster(deviceId);
+        } else {
+            // Fallback with Leadership service - device id is used as topic
+            NodeId leader = leadershipService.runForLeadership(
+                    deviceId.toString()).leaderNodeId();
+            // Verify if this node is the leader
+            return clusterService.getLocalNode().id().equals(leader);
+        }
+    }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltDeviceServiceInterface.java b/impl/src/main/java/org/opencord/olt/impl/OltDeviceServiceInterface.java
new file mode 100644
index 0000000..e8fcccb
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OltDeviceServiceInterface.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+
+import java.util.Optional;
+
+/**
+ * Service for olt device handling.
+ */
+public interface OltDeviceServiceInterface {
+    /**
+     * Returns true if the device is a known OLT to sadis/config.
+     * @param device the device
+     * @return true if a configured olt
+     */
+    boolean isOlt(Device device);
+
+    /**
+     * Returns true if the port is an NNI port of the OLT device.
+     * @param device the device
+     * @param port the port
+     * @return true if an NNI port of that OLT
+     */
+    boolean isNniPort(Device device, PortNumber port);
+
+    /**
+     * Returns the NNi port fo the OLT device if present.
+     * @param device the device
+     * @return the nni Port, if present
+     */
+    Optional<Port> getNniPort(Device device);
+
+    /**
+     * Returns true if the instance is leader for the OLT device.
+     * @param deviceId the device
+     * @return true if master, false otherwise.
+     */
+    boolean isLocalLeader(DeviceId deviceId);
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 1868421..ee184a5 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-present Open Networking Foundation
+ * Copyright 2021-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,8 +13,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.opencord.olt.impl;
 
+import com.google.common.collect.ImmutableMap;
 import org.onlab.packet.EthType;
 import org.onlab.packet.IPv4;
 import org.onlab.packet.IPv6;
@@ -26,18 +28,31 @@
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
-import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.Annotations;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
+import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.criteria.EthTypeCriterion;
+import org.onosproject.net.flow.criteria.IPProtocolCriterion;
+import org.onosproject.net.flow.criteria.PortCriterion;
+import org.onosproject.net.flow.criteria.UdpPortCriterion;
+import org.onosproject.net.flow.criteria.VlanIdCriterion;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
 import org.onosproject.net.flowobjective.DefaultFilteringObjective;
 import org.onosproject.net.flowobjective.DefaultForwardingObjective;
 import org.onosproject.net.flowobjective.FilteringObjective;
@@ -46,13 +61,11 @@
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.host.HostService;
 import org.onosproject.net.meter.MeterId;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
-import org.opencord.olt.AccessDevicePort;
-import org.opencord.olt.internalapi.AccessDeviceFlowService;
-import org.opencord.olt.internalapi.AccessDeviceMeterService;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
@@ -67,26 +80,46 @@
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.osgi.service.component.annotations.ReferencePolicy;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
 import java.util.Dictionary;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.onlab.util.Tools.get;
-import static org.opencord.olt.impl.OsgiPropertyConstants.*;
-import static org.slf4j.LoggerFactory.getLogger;
+import static org.onosproject.net.flow.instructions.Instruction.Type.L2MODIFICATION;
+import static org.opencord.olt.impl.OltUtils.completeFlowOpToString;
+import static org.opencord.olt.impl.OltUtils.flowOpToString;
+import static org.opencord.olt.impl.OltUtils.getPortName;
+import static org.opencord.olt.impl.OltUtils.portWithName;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_OLT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_ONU;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE;
+import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_OLT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_ONU;
+import static org.opencord.olt.impl.OsgiPropertyConstants.WAIT_FOR_REMOVAL;
+import static org.opencord.olt.impl.OsgiPropertyConstants.WAIT_FOR_REMOVAL_DEFAULT;
 
-/**
- * Provisions flow rules on access devices.
- */
 @Component(immediate = true, property = {
         ENABLE_DHCP_ON_NNI + ":Boolean=" + ENABLE_DHCP_ON_NNI_DEFAULT,
         ENABLE_DHCP_V4 + ":Boolean=" + ENABLE_DHCP_V4_DEFAULT,
@@ -94,32 +127,20 @@
         ENABLE_IGMP_ON_NNI + ":Boolean=" + ENABLE_IGMP_ON_NNI_DEFAULT,
         ENABLE_EAPOL + ":Boolean=" + ENABLE_EAPOL_DEFAULT,
         ENABLE_PPPOE + ":Boolean=" + ENABLE_PPPOE_DEFAULT,
-        DEFAULT_TP_ID + ":Integer=" + DEFAULT_TP_ID_DEFAULT
+        DEFAULT_TP_ID + ":Integer=" + DEFAULT_TP_ID_DEFAULT,
+        // FIXME remove this option as potentially dangerous in production
+        WAIT_FOR_REMOVAL + ":Boolean=" + WAIT_FOR_REMOVAL_DEFAULT
 })
-public class OltFlowService implements AccessDeviceFlowService {
-    private static final String SADIS_NOT_RUNNING = "Sadis is not running.";
-    private static final String APP_NAME = "org.opencord.olt";
-    private static final int NONE_TP_ID = -1;
-    private static final int NO_PCP = -1;
-    private static final Integer MAX_PRIORITY = 10000;
-    private static final Integer MIN_PRIORITY = 1000;
-    private static final String INSTALLED = "installed";
-    private static final String REMOVED = "removed";
-    private static final String INSTALLATION = "installation";
-    private static final String REMOVAL = "removal";
-    private static final String V4 = "V4";
-    private static final String V6 = "V6";
-
-    private final Logger log = getLogger(getClass());
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected FlowObjectiveService flowObjectiveService;
+public class OltFlowService implements OltFlowServiceInterface {
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected MastershipService mastershipService;
+    protected ComponentConfigService cfgService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected FlowObjectiveService flowObjectiveService;
 
     @Reference(cardinality = ReferenceCardinality.OPTIONAL,
             bind = "bindSadisService",
@@ -128,17 +149,65 @@
     protected volatile SadisService sadisService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected OltMeterServiceInterface oltMeterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected OltDeviceServiceInterface oltDeviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected FlowRuleService flowRuleService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected HostService hostService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected AccessDeviceMeterService oltMeterService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ComponentConfigService componentConfigService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected StorageService storageService;
 
+    protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+    protected BaseInformationService<BandwidthProfileInformation> bpService;
+
+    private static final String APP_NAME = "org.opencord.olt";
+    protected ApplicationId appId;
+    private static final Integer MAX_PRIORITY = 10000;
+    private static final Integer MIN_PRIORITY = 1000;
+    private static final short EAPOL_DEFAULT_VLAN = 4091;
+    private static final int NONE_TP_ID = -1;
+    private static final String V4 = "V4";
+    private static final String V6 = "V6";
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    protected UniTagInformation defaultEapolUniTag = new UniTagInformation.Builder()
+            .setServiceName("defaultEapol").build();
+    protected UniTagInformation nniUniTag = new UniTagInformation.Builder()
+            .setServiceName("nni")
+            .setTechnologyProfileId(NONE_TP_ID)
+            .setPonCTag(VlanId.NONE)
+            .setUniTagMatch(VlanId.ANY)
+            .setUsPonCTagPriority(-1)
+            .build();
+
+    /**
+     * Connect Point status map.
+     * Used to keep track of which cp has flows that needs to be removed when the status changes.
+     */
+    protected Map<ServiceKey, OltPortStatus> cpStatus;
+    private final ReentrantReadWriteLock cpStatusLock = new ReentrantReadWriteLock();
+    private final Lock cpStatusWriteLock = cpStatusLock.writeLock();
+    private final Lock cpStatusReadLock = cpStatusLock.readLock();
+
+    /**
+     * This map contains the subscriber that have been provisioned by the operator.
+     * They may or may not have flows, depending on the port status.
+     * The map is used to define whether flows need to be provisioned when a port comes up.
+     */
+    protected Map<ServiceKey, Boolean> provisionedSubscribers;
+    private final ReentrantReadWriteLock provisionedSubscribersLock = new ReentrantReadWriteLock();
+    private final Lock provisionedSubscribersWriteLock = provisionedSubscribersLock.writeLock();
+    private final Lock provisionedSubscribersReadLock = provisionedSubscribersLock.readLock();
+
     /**
      * Create DHCP trap flow on NNI port(s).
      */
@@ -174,42 +243,76 @@
      **/
     protected int defaultTechProfileId = DEFAULT_TP_ID_DEFAULT;
 
-    protected ApplicationId appId;
-    protected BaseInformationService<BandwidthProfileInformation> bpService;
-    protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
-    protected Map<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingEapolForDevice;
+    protected boolean waitForRemoval = WAIT_FOR_REMOVAL_DEFAULT;
+
+    public enum FlowOperation {
+        ADD,
+        REMOVE;
+
+
+        @Override
+        public String toString() {
+            return super.toString().toLowerCase();
+        }
+    }
+
+    public enum FlowDirection {
+        UPSTREAM,
+        DOWNSTREAM,
+    }
+
+    public enum OltFlowsStatus {
+        NONE,
+        PENDING_ADD,
+        ADDED,
+        PENDING_REMOVE,
+        REMOVED,
+        ERROR
+    }
+
+    protected InternalFlowListener internalFlowListener;
 
     @Activate
     public void activate(ComponentContext context) {
-        if (sadisService != null) {
-            bpService = sadisService.getBandwidthProfileService();
-            subsService = sadisService.getSubscriberInfoService();
-        } else {
-            log.warn(SADIS_NOT_RUNNING);
-        }
-        componentConfigService.registerProperties(getClass());
-        appId = coreService.getAppId(APP_NAME);
+        cfgService.registerProperties(getClass());
+        appId = coreService.registerApplication(APP_NAME);
+        internalFlowListener = new InternalFlowListener();
+
+        modified(context);
+
         KryoNamespace serializer = KryoNamespace.newBuilder()
                 .register(KryoNamespaces.API)
-                .register(UniTagInformation.class)
-                .register(SubscriberFlowInfo.class)
+                .register(OltFlowsStatus.class)
+                .register(FlowDirection.class)
+                .register(OltPortStatus.class)
+                .register(OltFlowsStatus.class)
                 .register(AccessDevicePort.class)
-                .register(AccessDevicePort.Type.class)
-                .register(LinkedBlockingQueue.class)
+                .register(new ServiceKeySerializer(), ServiceKey.class)
+                .register(UniTagInformation.class)
                 .build();
-        pendingEapolForDevice = storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
-                .withName("volt-pending-eapol")
-                .withSerializer(Serializer.using(serializer))
-                .withApplicationId(appId)
-                .build().asJavaMap();
-        log.info("started");
-    }
 
+        cpStatus = storageService.<ServiceKey, OltPortStatus>consistentMapBuilder()
+                .withName("volt-cp-status")
+                .withApplicationId(appId)
+                .withSerializer(Serializer.using(serializer))
+                .build().asJavaMap();
+
+        provisionedSubscribers = storageService.<ServiceKey, Boolean>consistentMapBuilder()
+                .withName("volt-provisioned-subscriber")
+                .withApplicationId(appId)
+                .withSerializer(Serializer.using(serializer))
+                .build().asJavaMap();
+
+        flowRuleService.addListener(internalFlowListener);
+
+        log.info("Started");
+    }
 
     @Deactivate
     public void deactivate(ComponentContext context) {
-        componentConfigService.unregisterProperties(getClass(), false);
-        log.info("stopped");
+        cfgService.unregisterProperties(getClass(), false);
+        flowRuleService.removeListener(internalFlowListener);
+        log.info("Stopped");
     }
 
     @Modified
@@ -247,106 +350,712 @@
             enablePppoe = pppoe;
         }
 
+        Boolean wait = Tools.isPropertyEnabled(properties, WAIT_FOR_REMOVAL);
+        if (wait != null) {
+            waitForRemoval = wait;
+        }
+
         String tpId = get(properties, DEFAULT_TP_ID);
         defaultTechProfileId = isNullOrEmpty(tpId) ? DEFAULT_TP_ID_DEFAULT : Integer.parseInt(tpId.trim());
 
-        log.info("modified. Values = enableDhcpOnNni: {}, enableDhcpV4: {}, " +
-                         "enableDhcpV6:{}, enableIgmpOnNni:{}, " +
-                         "enableEapol:{}, enablePppoe:{}, defaultTechProfileId:{}",
-                 enableDhcpOnNni, enableDhcpV4, enableDhcpV6,
-                 enableIgmpOnNni, enableEapol,  enablePppoe,
-                 defaultTechProfileId);
+        log.info("Modified. Values = enableDhcpOnNni: {}, enableDhcpV4: {}, " +
+                        "enableDhcpV6:{}, enableIgmpOnNni:{}, " +
+                        "enableEapol:{}, enablePppoe:{}, defaultTechProfileId:{}," +
+                        "waitForRemoval:{}",
+                enableDhcpOnNni, enableDhcpV4, enableDhcpV6,
+                enableIgmpOnNni, enableEapol, enablePppoe,
+                defaultTechProfileId, waitForRemoval);
 
     }
 
-    protected void bindSadisService(SadisService service) {
-        sadisService = service;
-        bpService = sadisService.getBandwidthProfileService();
-        subsService = sadisService.getSubscriberInfoService();
-        log.info("Sadis-service binds to onos.");
-    }
-
-    protected void unbindSadisService(SadisService service) {
-        sadisService = null;
-        bpService = null;
-        subsService = null;
-        log.info("Sadis-service unbinds from onos.");
-    }
-
     @Override
-    public void processDhcpFilteringObjectives(AccessDevicePort port,
-                                               MeterId upstreamMeterId,
-                                               MeterId upstreamOltMeterId,
-                                               UniTagInformation tagInformation,
-                                               boolean install,
-                                               boolean upstream,
-                                               Optional<CompletableFuture<ObjectiveError>> dhcpFuture) {
-        if (upstream) {
-            // for UNI ports
-            if (tagInformation != null && !tagInformation.getIsDhcpRequired()) {
-                log.debug("Dhcp provisioning is disabled for UNI port {} for service {}",
-                        port, tagInformation.getServiceName());
-                dhcpFuture.ifPresent(f -> f.complete(null));
-                return;
-            }
-        } else {
-            // for NNI ports
-            if (!enableDhcpOnNni) {
-                log.debug("Dhcp provisioning is disabled for NNI port {}", port);
-                dhcpFuture.ifPresent(f -> f.complete(null));
-                return;
-            }
-        }
-        int techProfileId = tagInformation != null ? tagInformation.getTechnologyProfileId() : NONE_TP_ID;
-        VlanId cTag = tagInformation != null ? tagInformation.getPonCTag() : VlanId.NONE;
-        VlanId unitagMatch = tagInformation != null ? tagInformation.getUniTagMatch() : VlanId.ANY;
-        Byte vlanPcp = tagInformation != null && tagInformation.getUsPonCTagPriority() != NO_PCP
-                ? (byte) tagInformation.getUsPonCTagPriority() : null;
-
-
-        if (enableDhcpV4) {
-            int udpSrc = (upstream) ? 68 : 67;
-            int udpDst = (upstream) ? 67 : 68;
-
-            EthType ethType = EthType.EtherType.IPV4.ethType();
-            byte protocol = IPv4.PROTOCOL_UDP;
-
-            addDhcpFilteringObjectives(port, udpSrc, udpDst, ethType,
-                    upstreamMeterId, upstreamOltMeterId, techProfileId, protocol, cTag, unitagMatch,
-                    vlanPcp, upstream, install, dhcpFuture);
-        }
-
-        if (enableDhcpV6) {
-            int udpSrc = (upstream) ? 547 : 546;
-            int udpDst = (upstream) ? 546 : 547;
-
-            EthType ethType = EthType.EtherType.IPV6.ethType();
-            byte protocol = IPv6.PROTOCOL_UDP;
-
-            addDhcpFilteringObjectives(port, udpSrc, udpDst, ethType,
-                    upstreamMeterId, upstreamOltMeterId, techProfileId, protocol, cTag, unitagMatch,
-                    vlanPcp, upstream, install, dhcpFuture);
+    public ImmutableMap<ServiceKey, OltPortStatus> getConnectPointStatus() {
+        try {
+            cpStatusReadLock.lock();
+            return ImmutableMap.copyOf(cpStatus);
+        } finally {
+            cpStatusReadLock.unlock();
         }
     }
 
-    private void addDhcpFilteringObjectives(AccessDevicePort port, int udpSrc, int udpDst,
-                                            EthType ethType, MeterId upstreamMeterId, MeterId upstreamOltMeterId,
-                                            int techProfileId, byte protocol, VlanId cTag, VlanId unitagMatch,
-                                            Byte vlanPcp, boolean upstream,
-                                            boolean install, Optional<CompletableFuture<ObjectiveError>> dhcpFuture) {
+    @Override
+    public ImmutableMap<ServiceKey, UniTagInformation> getProgrammedSubscribers() {
+        // NOTE we might want to remove this conversion and directly use cpStatus as it contains
+        // all the required information about suscribers
+        Map<ServiceKey, UniTagInformation> subscribers =
+                new HashMap<>();
+        try {
+            cpStatusReadLock.lock();
+
+            cpStatus.forEach((sk, status) -> {
+                if (
+                    // not NNI Port
+                        !oltDeviceService.isNniPort(deviceService.getDevice(sk.getPort().connectPoint().deviceId()),
+                                sk.getPort().connectPoint().port()) &&
+                                // not EAPOL flow
+                                !sk.getService().equals(defaultEapolUniTag)
+                ) {
+                    subscribers.put(sk, sk.getService());
+                }
+            });
+
+            return ImmutableMap.copyOf(subscribers);
+        } finally {
+            cpStatusReadLock.unlock();
+        }
+    }
+
+    @Override
+    public Map<ServiceKey, Boolean> getRequestedSubscribers() {
+        try {
+            provisionedSubscribersReadLock.lock();
+            return ImmutableMap.copyOf(provisionedSubscribers);
+        } finally {
+            provisionedSubscribersReadLock.unlock();
+        }
+    }
+
+    @Override
+    public void handleNniFlows(Device device, Port port, FlowOperation action) {
+
+        // always handle the LLDP flow
+        processLldpFilteringObjective(device.id(), port, action);
+
+        if (enableDhcpOnNni) {
+            if (enableDhcpV4) {
+                log.debug("Installing DHCPv4 trap flow on NNI {} for device {}", portWithName(port), device.id());
+                processDhcpFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+                        67, 68, EthType.EtherType.IPV4.ethType(), IPv4.PROTOCOL_UDP,
+                        null, null, nniUniTag);
+            }
+            if (enableDhcpV6) {
+                log.debug("Installing DHCPv6 trap flow on NNI {} for device {}", portWithName(port), device.id());
+                processDhcpFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+                        546, 547, EthType.EtherType.IPV6.ethType(), IPv6.PROTOCOL_UDP,
+                        null, null, nniUniTag);
+            }
+        } else {
+            log.info("DHCP is not required on NNI {} for device {}", portWithName(port), device.id());
+        }
+
+        if (enableIgmpOnNni) {
+            log.debug("Installing IGMP flow on NNI {} for device {}", portWithName(port), device.id());
+            processIgmpFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+                    null, null, NONE_TP_ID, VlanId.NONE, VlanId.ANY, -1);
+        }
+
+        if (enablePppoe) {
+            log.debug("Installing PPPoE flow on NNI {} for device {}", port.number(), device.id());
+            processPPPoEDFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
+                    null, null, NONE_TP_ID, VlanId.NONE, VlanId.ANY, null);
+        }
+    }
+
+    @Override
+    public boolean handleBasicPortFlows(DiscoveredSubscriber sub, String bandwidthProfileId,
+                                        String oltBandwidthProfileId) {
+
+        // we only need to something if EAPOL is enabled
+        if (!enableEapol) {
+            return true;
+        }
+
+        if (sub.status == DiscoveredSubscriber.Status.ADDED) {
+            return addDefaultFlows(sub, bandwidthProfileId, oltBandwidthProfileId);
+        } else if (sub.status == DiscoveredSubscriber.Status.REMOVED) {
+            return removeDefaultFlows(sub, bandwidthProfileId, oltBandwidthProfileId);
+        } else {
+            log.error("Unknown Status {} on DiscoveredSubscriber {}", sub.status, sub);
+            return false;
+        }
+
+    }
+
+    private boolean addDefaultFlows(DiscoveredSubscriber sub, String bandwidthProfileId, String oltBandwidthProfileId) {
+
+        if (!oltMeterService.createMeter(sub.device.id(), bandwidthProfileId)) {
+            if (log.isTraceEnabled()) {
+                log.trace("waiting on meter for bp {} and sub {}", bandwidthProfileId, sub);
+            }
+            return false;
+        }
+        if (hasDefaultEapol(sub.port)) {
+            return true;
+        }
+        return handleEapolFlow(sub, bandwidthProfileId,
+                oltBandwidthProfileId, FlowOperation.ADD, VlanId.vlanId(EAPOL_DEFAULT_VLAN));
+
+    }
+
+    private boolean removeDefaultFlows(DiscoveredSubscriber sub, String bandwidthProfile, String oltBandwidthProfile) {
+        // NOTE that we are not checking for meters as they must have been created to install the flow in first place
+        return handleEapolFlow(sub, bandwidthProfile, oltBandwidthProfile,
+                FlowOperation.REMOVE, VlanId.vlanId(EAPOL_DEFAULT_VLAN));
+    }
+
+    @Override
+    public boolean handleSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwidthProfile,
+                                         String multicastServiceName) {
+        // NOTE that we are taking defaultBandwithProfile as a parameter as that can be configured in the Olt component
+        if (sub.status == DiscoveredSubscriber.Status.ADDED) {
+            return addSubscriberFlows(sub, defaultBandwidthProfile, multicastServiceName);
+        } else if (sub.status == DiscoveredSubscriber.Status.REMOVED) {
+            return removeSubscriberFlows(sub, defaultBandwidthProfile, multicastServiceName);
+        } else {
+            log.error("don't know how to handle {}", sub);
+            return false;
+        }
+    }
+
+    private boolean addSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwithProfile,
+                                       String multicastServiceName) {
+        if (log.isTraceEnabled()) {
+            log.trace("Provisioning of subscriber on {} started", portWithName(sub.port));
+        }
+        if (enableEapol) {
+            if (hasDefaultEapol(sub.port)) {
+                // remove EAPOL flow and throw exception so that we'll retry later
+                if (!isDefaultEapolPendingRemoval(sub.port)) {
+                    removeDefaultFlows(sub, defaultBandwithProfile, defaultBandwithProfile);
+                }
+
+                if (waitForRemoval) {
+                    // NOTE wait for removal is a flag only needed to make sure VOLTHA
+                    // does not explode with the flows remove/add in the same batch
+                    log.debug("Awaiting for default flows removal for {}", portWithName(sub.port));
+                    return false;
+                } else {
+                    log.warn("continuing provisioning on {}", portWithName(sub.port));
+                }
+            }
+
+        }
+
+        // NOTE createMeters will return if the meters are not installed
+        if (!oltMeterService.createMeters(sub.device.id(),
+                sub.subscriberAndDeviceInformation)) {
+            return false;
+        }
+
+        // NOTE we need to add the DHCP flow regardless so that the host can be discovered and the MacAddress added
+        handleSubscriberDhcpFlows(sub.device.id(), sub.port, FlowOperation.ADD,
+                sub.subscriberAndDeviceInformation);
+
+        if (isMacLearningEnabled(sub.subscriberAndDeviceInformation)
+                && !isMacAddressAvailable(sub.device.id(), sub.port,
+                sub.subscriberAndDeviceInformation)) {
+            log.debug("Awaiting for macAddress on {}", portWithName(sub.port));
+            return false;
+        }
+
+        handleSubscriberDataFlows(sub.device, sub.port, FlowOperation.ADD,
+                sub.subscriberAndDeviceInformation, multicastServiceName);
+
+        handleSubscriberEapolFlows(sub, FlowOperation.ADD, sub.subscriberAndDeviceInformation);
+
+        handleSubscriberIgmpFlows(sub, FlowOperation.ADD);
+
+        log.info("Provisioning of subscriber on {} completed", portWithName(sub.port));
+        return true;
+    }
+
+    private boolean removeSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwithProfile,
+                                          String multicastServiceName) {
+
+        if (log.isTraceEnabled()) {
+            log.trace("Removal of subscriber on {} started",
+                    portWithName(sub.port));
+        }
+        SubscriberAndDeviceInformation si = subsService.get(sub.portName());
+        if (si == null) {
+            log.error("Subscriber information not found in sadis for port {} during subscriber removal",
+                    portWithName(sub.port));
+            // NOTE that we are returning true so that the subscriber is removed from the queue
+            // and we can move on provisioning others
+            return true;
+        }
+
+        handleSubscriberDhcpFlows(sub.device.id(), sub.port, FlowOperation.REMOVE, si);
+
+        if (enableEapol) {
+            // remove the tagged eapol
+            handleSubscriberEapolFlows(sub, FlowOperation.REMOVE, si);
+
+            // and add the default one back
+            if (sub.port.isEnabled()) {
+                // NOTE we remove the subscriber when the port goes down
+                // but in that case we don't need to add default eapol
+                handleEapolFlow(sub, defaultBandwithProfile, defaultBandwithProfile,
+                        FlowOperation.ADD, VlanId.vlanId(EAPOL_DEFAULT_VLAN));
+            }
+        }
+        handleSubscriberDataFlows(sub.device, sub.port, FlowOperation.REMOVE, si, multicastServiceName);
+
+        handleSubscriberIgmpFlows(sub, FlowOperation.REMOVE);
+
+        // FIXME check the return status of the flow and return accordingly
+        log.info("Removal of subscriber on {} completed", portWithName(sub.port));
+        return true;
+    }
+
+    @Override
+    public boolean hasDefaultEapol(Port port) {
+        OltPortStatus status = getOltPortStatus(port, defaultEapolUniTag);
+        // NOTE we consider ERROR as a present EAPOL flow as ONOS
+        // will keep trying to add it
+        return status != null && (status.defaultEapolStatus == OltFlowsStatus.ADDED ||
+                status.defaultEapolStatus == OltFlowsStatus.PENDING_ADD ||
+                status.defaultEapolStatus == OltFlowsStatus.ERROR);
+    }
+
+    private OltPortStatus getOltPortStatus(Port port, UniTagInformation uniTagInformation) {
+        try {
+            cpStatusReadLock.lock();
+            ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uniTagInformation);
+            OltPortStatus status = cpStatus.get(sk);
+            return status;
+        } finally {
+            cpStatusReadLock.unlock();
+        }
+    }
+
+    public boolean isDefaultEapolPendingRemoval(Port port) {
+        OltPortStatus status = getOltPortStatus(port, defaultEapolUniTag);
+        if (log.isTraceEnabled()) {
+            log.trace("Status during EAPOL flow check {} for port {} and UniTagInformation {}",
+                    status, portWithName(port), defaultEapolUniTag);
+        }
+        return status != null && status.defaultEapolStatus == OltFlowsStatus.PENDING_REMOVE;
+    }
+
+    @Override
+    public boolean hasDhcpFlows(Port port, UniTagInformation uti) {
+        OltPortStatus status = getOltPortStatus(port, uti);
+        if (log.isTraceEnabled()) {
+            log.trace("Status during DHCP flow check {} for port {} and service {}",
+                    status, portWithName(port), uti.getServiceName());
+        }
+        return status != null &&
+                (status.dhcpStatus == OltFlowsStatus.ADDED ||
+                        status.dhcpStatus == OltFlowsStatus.PENDING_ADD);
+    }
+
+    @Override
+    public boolean hasSubscriberFlows(Port port, UniTagInformation uti) {
+
+        OltPortStatus status = getOltPortStatus(port, uti);
+        if (log.isTraceEnabled()) {
+            log.trace("Status during subscriber flow check {} for port {} and service {}",
+                    status, portWithName(port), uti.getServiceName());
+        }
+        return status != null && (status.subscriberFlowsStatus == OltFlowsStatus.ADDED ||
+                status.subscriberFlowsStatus == OltFlowsStatus.PENDING_ADD);
+    }
+
+    @Override
+    public void purgeDeviceFlows(DeviceId deviceId) {
+        log.debug("Purging flows on device {}", deviceId);
+        flowRuleService.purgeFlowRules(deviceId);
+
+        // removing the status from the cpStatus map
+        try {
+            cpStatusWriteLock.lock();
+            Iterator<Map.Entry<ServiceKey, OltPortStatus>> iter = cpStatus.entrySet().iterator();
+            while (iter.hasNext()) {
+                Map.Entry<ServiceKey, OltPortStatus> entry = iter.next();
+                if (entry.getKey().getPort().connectPoint().deviceId().equals(deviceId)) {
+                    cpStatus.remove(entry.getKey());
+                }
+            }
+        } finally {
+            cpStatusWriteLock.unlock();
+        }
+
+        // removing subscribers from the provisioned map
+        try {
+            provisionedSubscribersWriteLock.lock();
+            Iterator<Map.Entry<ServiceKey, Boolean>> iter = provisionedSubscribers.entrySet().iterator();
+            while (iter.hasNext()) {
+                Map.Entry<ServiceKey, Boolean> entry = iter.next();
+                if (entry.getKey().getPort().connectPoint().deviceId().equals(deviceId)) {
+                    provisionedSubscribers.remove(entry.getKey());
+                }
+            }
+        } finally {
+            provisionedSubscribersWriteLock.unlock();
+        }
+    }
+
+    @Override
+    public boolean isSubscriberServiceProvisioned(AccessDevicePort cp) {
+        // if any service is programmed on this port, returns true
+        AtomicBoolean provisioned = new AtomicBoolean(false);
+        try {
+            provisionedSubscribersReadLock.lock();
+            for (Map.Entry<ServiceKey, Boolean> entry : provisionedSubscribers.entrySet()) {
+                if (entry.getKey().getPort().equals(cp) && entry.getValue()) {
+                    provisioned.set(true);
+                    break;
+                }
+            }
+        } finally {
+            provisionedSubscribersReadLock.unlock();
+        }
+        return provisioned.get();
+    }
+
+    @Override
+    public boolean isSubscriberServiceProvisioned(ServiceKey sk) {
+        try {
+            provisionedSubscribersReadLock.lock();
+            Boolean provisioned = provisionedSubscribers.get(sk);
+            if (provisioned == null || !provisioned) {
+                return false;
+            }
+        } finally {
+            provisionedSubscribersReadLock.unlock();
+        }
+        return true;
+    }
+
+    @Override
+    public void updateProvisionedSubscriberStatus(ServiceKey sk, Boolean status) {
+        try {
+            provisionedSubscribersWriteLock.lock();
+            provisionedSubscribers.put(sk, status);
+        } finally {
+            provisionedSubscribersWriteLock.unlock();
+        }
+    }
+
+    private boolean handleEapolFlow(DiscoveredSubscriber sub, String bandwidthProfile,
+                                    String oltBandwidthProfile, FlowOperation action, VlanId vlanId) {
+
+        // create a subscriberKey for the EAPOL flow
+        ServiceKey sk = new ServiceKey(new AccessDevicePort(sub.port), defaultEapolUniTag);
+
+        // NOTE we only need to keep track of the default EAPOL flow in the
+        // connectpoint status map
+        if (vlanId.id().equals(EAPOL_DEFAULT_VLAN)) {
+            OltFlowsStatus status = action == FlowOperation.ADD ?
+                    OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
+            updateConnectPointStatus(sk, status, OltFlowsStatus.NONE, OltFlowsStatus.NONE);
+
+        }
+
+        DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
+        TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+
+        int techProfileId = getDefaultTechProfileId(sub.port);
+        MeterId meterId = oltMeterService.getMeterIdForBandwidthProfile(sub.device.id(), bandwidthProfile);
+
+        // in the delete case the meter should still be there as we remove
+        // the meters only if no flows are pointing to them
+        if (meterId == null) {
+            log.debug("MeterId is null for BandwidthProfile {} on device {}",
+                    bandwidthProfile, sub.device.id());
+            return false;
+        }
+
+        MeterId oltMeterId = oltMeterService.getMeterIdForBandwidthProfile(sub.device.id(), oltBandwidthProfile);
+        if (oltMeterId == null) {
+            log.debug("MeterId is null for OltBandwidthProfile {} on device {}",
+                    oltBandwidthProfile, sub.device.id());
+            return false;
+        }
+
+        log.info("{} EAPOL flow for {} with vlanId {} and BandwidthProfile {} (meterId {})",
+                flowOpToString(action), portWithName(sub.port), vlanId, bandwidthProfile, meterId);
+
+        FilteringObjective.Builder eapolAction;
+
+        if (action == FlowOperation.ADD) {
+            eapolAction = filterBuilder.permit();
+        } else if (action == FlowOperation.REMOVE) {
+            eapolAction = filterBuilder.deny();
+        } else {
+            log.error("Operation {} not supported", action);
+            return false;
+        }
+
+        FilteringObjective.Builder baseEapol = eapolAction
+                .withKey(Criteria.matchInPort(sub.port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()));
+
+        // NOTE we only need to add the treatment to install the flow,
+        // we can remove it based in the match
+        FilteringObjective.Builder eapol;
+
+        TrafficTreatment treatment = treatmentBuilder
+                .meter(meterId)
+                .writeMetadata(createTechProfValueForWriteMetadata(
+                        vlanId,
+                        techProfileId, oltMeterId), 0)
+                .setOutput(PortNumber.CONTROLLER)
+                .pushVlan()
+                .setVlanId(vlanId)
+                .build();
+        eapol = baseEapol
+                .withMeta(treatment);
+
+        FilteringObjective eapolObjective = eapol
+                .fromApp(appId)
+                .withPriority(MAX_PRIORITY)
+                .add(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) {
+                        log.info("EAPOL flow objective {} for {}",
+                                completeFlowOpToString(action), portWithName(sub.port));
+                        if (log.isTraceEnabled()) {
+                            log.trace("EAPOL flow details for port {}: {}", portWithName(sub.port), objective);
+                        }
+                    }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        log.error("Cannot {} eapol flow for {} : {}", action,
+                                portWithName(sub.port), error);
+
+                        if (vlanId.id().equals(EAPOL_DEFAULT_VLAN)) {
+                            updateConnectPointStatus(sk,
+                                    OltFlowsStatus.ERROR, null, null);
+                        }
+                    }
+                });
+
+        flowObjectiveService.filter(sub.device.id(), eapolObjective);
+
+        log.info("{} EAPOL filter for {}", completeFlowOpToString(action), portWithName(sub.port));
+        return true;
+    }
+
+    // FIXME it's confusing that si is not necessarily inside the DiscoveredSubscriber
+    private boolean handleSubscriberEapolFlows(DiscoveredSubscriber sub, FlowOperation action,
+                                               SubscriberAndDeviceInformation si) {
+        if (!enableEapol) {
+            return true;
+        }
+        // TODO verify we need an EAPOL flow for EACH service
+        AtomicBoolean success = new AtomicBoolean(true);
+        si.uniTagList().forEach(u -> {
+            // we assume that if subscriber flows are installed, tagged EAPOL is installed as well
+            boolean hasFlows = hasSubscriberFlows(sub.port, u);
+
+            // if the subscriber flows are present the EAPOL flow is present thus we don't need to install it,
+            // if the subscriber does not have flows the EAPOL flow is not present thus we don't need to remove it
+            if (action == FlowOperation.ADD && hasFlows ||
+                    action == FlowOperation.REMOVE && !hasFlows) {
+                log.debug("Not {} EAPOL on {}({}) as subscriber flow status is {}", flowOpToString(action),
+                        portWithName(sub.port), u.getServiceName(), hasFlows);
+                return;
+            }
+            log.info("{} EAPOL flows for subscriber {} on {} and service {}",
+                    flowOpToString(action), si.id(), portWithName(sub.port), u.getServiceName());
+            if (!handleEapolFlow(sub, u.getUpstreamBandwidthProfile(),
+                    u.getUpstreamOltBandwidthProfile(),
+                    action, u.getPonCTag())) {
+                //
+                log.error("Failed to {} EAPOL with suscriber tags", action);
+                //TODO this sets it for all services, maybe some services succeeded.
+                success.set(false);
+            }
+        });
+        return success.get();
+    }
+
+    private void handleSubscriberIgmpFlows(DiscoveredSubscriber sub, FlowOperation action) {
+        sub.subscriberAndDeviceInformation.uniTagList().forEach(uti -> {
+            if (uti.getIsIgmpRequired()) {
+                DeviceId deviceId = sub.device.id();
+                // if we reached here a meter already exists
+                MeterId meterId = oltMeterService
+                        .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamBandwidthProfile());
+                MeterId oltMeterId = oltMeterService
+                        .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamOltBandwidthProfile());
+
+                processIgmpFilteringObjectives(deviceId, sub.port,
+                        action, FlowDirection.UPSTREAM, meterId, oltMeterId, uti.getTechnologyProfileId(),
+                        uti.getPonCTag(), uti.getUniTagMatch(), uti.getUsPonCTagPriority());
+            }
+        });
+    }
+
+    private boolean checkSadisRunning() {
+        if (bpService == null) {
+            log.warn("Sadis is not running");
+            return false;
+        }
+        return true;
+    }
+
+    private int getDefaultTechProfileId(Port port) {
+        if (!checkSadisRunning()) {
+            return defaultTechProfileId;
+        }
+        if (port != null) {
+            SubscriberAndDeviceInformation info = subsService.get(getPortName(port));
+            if (info != null && info.uniTagList().size() == 1) {
+                return info.uniTagList().get(0).getTechnologyProfileId();
+            }
+        }
+        return defaultTechProfileId;
+    }
+
+    protected Long createTechProfValueForWriteMetadata(VlanId cVlan, int techProfileId, MeterId upstreamOltMeterId) {
+        Long writeMetadata;
+
+        if (cVlan == null || VlanId.NONE.equals(cVlan)) {
+            writeMetadata = (long) techProfileId << 32;
+        } else {
+            writeMetadata = ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
+        }
+        if (upstreamOltMeterId == null) {
+            return writeMetadata;
+        } else {
+            return writeMetadata | upstreamOltMeterId.id();
+        }
+    }
+
+    private void processLldpFilteringObjective(DeviceId deviceId, Port port, FlowOperation action) {
+        DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
+
+        FilteringObjective lldp = (action == FlowOperation.ADD ? builder.permit() : builder.deny())
+                .withKey(Criteria.matchInPort(port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.LLDP.ethType()))
+                .withMeta(DefaultTrafficTreatment.builder()
+                        .setOutput(PortNumber.CONTROLLER).build())
+                .fromApp(appId)
+                .withPriority(MAX_PRIORITY)
+                .add(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) {
+                        log.info("{} LLDP filter for {}.", completeFlowOpToString(action), portWithName(port));
+                    }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        log.error("Falied to {} LLDP filter on {} because {}", action, portWithName(port),
+                                error);
+                    }
+                });
+
+        flowObjectiveService.filter(deviceId, lldp);
+    }
+
+    protected void handleSubscriberDhcpFlows(DeviceId deviceId, Port port,
+                                             FlowOperation action,
+                                             SubscriberAndDeviceInformation si) {
+        si.uniTagList().forEach(uti -> {
+
+            if (!uti.getIsDhcpRequired()) {
+                return;
+            }
+
+            // if it's an ADD skip if flows are there,
+            // if it's a DELETE skip if flows are not there
+            boolean hasFlows = hasDhcpFlows(port, uti);
+            if (action == FlowOperation.ADD && hasFlows ||
+                    action == FlowOperation.REMOVE && !hasFlows) {
+                log.debug("Not dealing with DHCP {} on {} as DHCP flow status is {}", action,
+                        uti.getServiceName(), hasFlows);
+                return;
+            }
+
+            log.info("{} DHCP flows for subscriber on {} and service {}",
+                    flowOpToString(action), portWithName(port), uti.getServiceName());
+
+            // if we reached here a meter already exists
+            MeterId meterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamBandwidthProfile());
+            MeterId oltMeterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(deviceId, uti.getUpstreamOltBandwidthProfile());
+
+            if (enableDhcpV4) {
+                processDhcpFilteringObjectives(deviceId, port, action, FlowDirection.UPSTREAM, 68, 67,
+                        EthType.EtherType.IPV4.ethType(), IPv4.PROTOCOL_UDP, meterId, oltMeterId,
+                        uti);
+            }
+            if (enableDhcpV6) {
+                log.error("DHCP V6 not supported for subscribers");
+            }
+        });
+    }
+
+    // FIXME return boolean, if this fails we need to retry
+    protected void handleSubscriberDataFlows(Device device, Port port,
+                                             FlowOperation action,
+                                             SubscriberAndDeviceInformation si, String multicastServiceName) {
+
+        Optional<Port> nniPort = oltDeviceService.getNniPort(device);
+        if (nniPort.isEmpty()) {
+            log.error("Cannot configure DP flows as upstream port is not configured for subscriber {} on {}",
+                    si.id(), portWithName(port));
+            return;
+        }
+        si.uniTagList().forEach(uti -> {
+
+            boolean hasFlows = hasSubscriberFlows(port, uti);
+            if (action == FlowOperation.ADD && hasFlows ||
+                    action == FlowOperation.REMOVE && !hasFlows) {
+                log.debug("Not dealing with DP flows {} on {} as subscriber flow status is {}", action,
+                        uti.getServiceName(), hasFlows);
+                return;
+            }
+
+            if (multicastServiceName.equals(uti.getServiceName())) {
+                log.debug("This is the multicast service ({}) for subscriber {} on {}, " +
+                                "dataplane flows are not needed",
+                        uti.getServiceName(), si.id(), portWithName(port));
+                return;
+            }
+
+            log.info("{} Data plane flows for subscriber {} on {} and service {}",
+                    flowOpToString(action), si.id(), portWithName(port), uti.getServiceName());
+
+            // upstream flows
+            MeterId usMeterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(device.id(), uti.getUpstreamBandwidthProfile());
+            MeterId oltUsMeterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(device.id(), uti.getUpstreamOltBandwidthProfile());
+            processUpstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, usMeterId,
+                    oltUsMeterId, uti);
+
+            // downstream flows
+            MeterId dsMeterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(device.id(), uti.getDownstreamBandwidthProfile());
+            MeterId oltDsMeterId = oltMeterService
+                    .getMeterIdForBandwidthProfile(device.id(), uti.getDownstreamOltBandwidthProfile());
+            processDownstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, dsMeterId,
+                    oltDsMeterId, uti, getMacAddress(device.id(), port, uti));
+        });
+    }
+
+    private void processDhcpFilteringObjectives(DeviceId deviceId, Port port,
+                                                FlowOperation action, FlowDirection direction,
+                                                int udpSrc, int udpDst, EthType ethType, byte protocol,
+                                                MeterId meterId, MeterId oltMeterId, UniTagInformation uti) {
+        ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
+        log.debug("{} DHCP filtering objectives on {}", flowOpToString(action), sk);
+
+
+        OltFlowsStatus status = action.equals(FlowOperation.ADD) ?
+                OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
+        updateConnectPointStatus(sk, null, null, status);
 
         DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
         TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
 
-        if (upstreamMeterId != null) {
-            treatmentBuilder.meter(upstreamMeterId);
+        if (meterId != null) {
+            treatmentBuilder.meter(meterId);
         }
 
-        if (techProfileId != NONE_TP_ID) {
-            treatmentBuilder.writeMetadata(createTechProfValueForWm(unitagMatch, techProfileId, upstreamOltMeterId), 0);
+        if (uti.getTechnologyProfileId() != NONE_TP_ID) {
+            treatmentBuilder.writeMetadata(
+                    createTechProfValueForWriteMetadata(uti.getUniTagMatch(),
+                            uti.getTechnologyProfileId(), oltMeterId), 0);
         }
 
-        FilteringObjective.Builder dhcpUpstreamBuilder = (install ? builder.permit() : builder.deny())
+        FilteringObjective.Builder dhcpBuilder = (action == FlowOperation.ADD ? builder.permit() : builder.deny())
                 .withKey(Criteria.matchInPort(port.number()))
                 .addCondition(Criteria.matchEthType(ethType))
                 .addCondition(Criteria.matchIPProtocol(protocol))
@@ -356,87 +1065,124 @@
                 .withPriority(MAX_PRIORITY);
 
         //VLAN changes and PCP matching need to happen only in the upstream directions
-        if (upstream) {
-            treatmentBuilder.setVlanId(cTag);
-            if (!VlanId.vlanId(VlanId.NO_VID).equals(unitagMatch)) {
-                dhcpUpstreamBuilder.addCondition(Criteria.matchVlanId(unitagMatch));
+        if (direction == FlowDirection.UPSTREAM) {
+            treatmentBuilder.setVlanId(uti.getPonCTag());
+            if (!VlanId.vlanId(VlanId.NO_VID).equals(uti.getUniTagMatch())) {
+                dhcpBuilder.addCondition(Criteria.matchVlanId(uti.getUniTagMatch()));
             }
-            if (vlanPcp != null) {
-                treatmentBuilder.setVlanPcp(vlanPcp);
+            if (uti.getUsPonCTagPriority() != -1) {
+                treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
             }
         }
 
-        dhcpUpstreamBuilder.withMeta(treatmentBuilder
-                                  .setOutput(PortNumber.CONTROLLER).build());
+        dhcpBuilder.withMeta(treatmentBuilder
+                .setOutput(PortNumber.CONTROLLER).build());
 
 
-        FilteringObjective dhcpUpstream = dhcpUpstreamBuilder.add(new ObjectiveContext() {
+        FilteringObjective dhcpUpstream = dhcpBuilder.add(new ObjectiveContext() {
             @Override
             public void onSuccess(Objective objective) {
-                log.info("DHCP {} filter for {} {}.",
-                        (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6, port,
-                        (install) ? INSTALLED : REMOVED);
-                dhcpFuture.ifPresent(f -> f.complete(null));
+                log.info("{} DHCP {} filter for {}.",
+                        completeFlowOpToString(action), (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6,
+                        portWithName(port));
             }
 
             @Override
             public void onError(Objective objective, ObjectiveError error) {
                 log.error("DHCP {} filter for {} failed {} because {}",
-                        (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6, port,
-                        (install) ? INSTALLATION : REMOVAL,
+                        (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6,
+                        portWithName(port),
+                        action,
                         error);
-                dhcpFuture.ifPresent(f -> f.complete(error));
+                updateConnectPointStatus(sk, null, null, OltFlowsStatus.ERROR);
             }
         });
-        flowObjectiveService.filter(port.deviceId(), dhcpUpstream);
+        flowObjectiveService.filter(deviceId, dhcpUpstream);
+    }
+
+    private void processIgmpFilteringObjectives(DeviceId deviceId, Port port,
+                                                FlowOperation action, FlowDirection direction,
+                                                MeterId meterId, MeterId oltMeterId, int techProfileId,
+                                                VlanId cTag, VlanId unitagMatch, int vlanPcp) {
+
+        DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
+        TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+        if (direction == FlowDirection.UPSTREAM) {
+
+            if (techProfileId != NONE_TP_ID) {
+                treatmentBuilder.writeMetadata(createTechProfValueForWriteMetadata(null,
+                        techProfileId, oltMeterId), 0);
+            }
+
+
+            if (meterId != null) {
+                treatmentBuilder.meter(meterId);
+            }
+
+            if (!VlanId.vlanId(VlanId.NO_VID).equals(unitagMatch)) {
+                filterBuilder.addCondition(Criteria.matchVlanId(unitagMatch));
+            }
+
+            if (!VlanId.vlanId(VlanId.NO_VID).equals(cTag)) {
+                treatmentBuilder.setVlanId(cTag);
+            }
+
+            if (vlanPcp != -1) {
+                treatmentBuilder.setVlanPcp((byte) vlanPcp);
+            }
+        }
+
+        filterBuilder = (action == FlowOperation.ADD) ? filterBuilder.permit() : filterBuilder.deny();
+
+        FilteringObjective igmp = filterBuilder
+                .withKey(Criteria.matchInPort(port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
+                .withMeta(treatmentBuilder
+                        .setOutput(PortNumber.CONTROLLER).build())
+                .fromApp(appId)
+                .withPriority(MAX_PRIORITY)
+                .add(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) {
+                        log.info("Igmp filter for {} {}.", portWithName(port), action);
+                    }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        log.error("Igmp filter for {} failed {} because {}.", portWithName(port), action,
+                                error);
+                    }
+                });
+
+        flowObjectiveService.filter(deviceId, igmp);
 
     }
 
-    @Override
-    public void processPPPoEDFilteringObjectives(AccessDevicePort port,
-                                                 MeterId upstreamMeterId,
-                                                 MeterId upstreamOltMeterId,
-                                                 UniTagInformation tagInformation,
-                                                 boolean install,
-                                                 boolean upstream) {
-        if (!enablePppoe) {
-            log.debug("PPPoED filtering is disabled. Ignoring request.");
-            return;
-        }
-
-        int techProfileId = NONE_TP_ID;
-        VlanId cTag = VlanId.NONE;
-        VlanId unitagMatch = VlanId.ANY;
-        Byte vlanPcp = null;
-
-        if (tagInformation != null) {
-            techProfileId = tagInformation.getTechnologyProfileId();
-            cTag = tagInformation.getPonCTag();
-            unitagMatch = tagInformation.getUniTagMatch();
-            if (tagInformation.getUsPonCTagPriority() != NO_PCP) {
-                vlanPcp = (byte) tagInformation.getUsPonCTagPriority();
-            }
-        }
+    private void processPPPoEDFilteringObjectives(DeviceId deviceId, Port port,
+                                                  FlowOperation action, FlowDirection direction,
+                                                  MeterId meterId, MeterId oltMeterId, int techProfileId,
+                                                  VlanId cTag, VlanId unitagMatch, Byte vlanPcp) {
 
         DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
         TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
-        CompletableFuture<Object> meterFuture = new CompletableFuture<>();
 
-        if (upstreamMeterId != null) {
-            treatmentBuilder.meter(upstreamMeterId);
+        if (meterId != null) {
+            treatmentBuilder.meter(meterId);
         }
 
         if (techProfileId != NONE_TP_ID) {
-            treatmentBuilder.writeMetadata(createTechProfValueForWm(cTag, techProfileId, upstreamOltMeterId), 0);
+            treatmentBuilder.writeMetadata(createTechProfValueForWriteMetadata(cTag, techProfileId, oltMeterId), 0);
         }
 
-        DefaultFilteringObjective.Builder pppoedBuilder = (install ? builder.permit() : builder.deny())
+        DefaultFilteringObjective.Builder pppoedBuilder = ((action == FlowOperation.ADD)
+                ? builder.permit() : builder.deny())
                 .withKey(Criteria.matchInPort(port.number()))
                 .addCondition(Criteria.matchEthType(EthType.EtherType.PPPoED.ethType()))
                 .fromApp(appId)
                 .withPriority(10000);
 
-        if (upstream) {
+        if (direction == FlowDirection.UPSTREAM) {
             treatmentBuilder.setVlanId(cTag);
             if (!VlanId.vlanId(VlanId.NO_VID).equals(unitagMatch)) {
                 pppoedBuilder.addCondition(Criteria.matchVlanId(unitagMatch));
@@ -451,398 +1197,50 @@
                 .add(new ObjectiveContext() {
                     @Override
                     public void onSuccess(Objective objective) {
-                        log.info("PPPoED filter for {} {}.", port, (install) ? INSTALLED : REMOVED);
+                        log.info("PPPoED filter for {} {}.", portWithName(port), action);
                     }
 
                     @Override
                     public void onError(Objective objective, ObjectiveError error) {
-                        log.info("PPPoED filter for {} failed {} because {}", port,
-                                (install) ? INSTALLATION : REMOVAL, error);
+                        log.info("PPPoED filter for {} failed {} because {}", portWithName(port),
+                                action, error);
                     }
                 });
-        flowObjectiveService.filter(port.deviceId(), pppoed);
+        flowObjectiveService.filter(deviceId, pppoed);
     }
 
-    @Override
-    public void processIgmpFilteringObjectives(AccessDevicePort port,
-                                               MeterId upstreamMeterId,
-                                               MeterId upstreamOltMeterId,
-                                               UniTagInformation tagInformation,
-                                               boolean install,
-                                               boolean upstream) {
-        if (upstream) {
-            // for UNI ports
-            if (tagInformation != null && !tagInformation.getIsIgmpRequired()) {
-                log.debug("Igmp provisioning is disabled for UNI port {} for service {}",
-                        port, tagInformation.getServiceName());
-                return;
-            }
-        } else {
-            // for NNI ports
-            if (!enableIgmpOnNni) {
-                log.debug("Igmp provisioning is disabled for NNI port {}", port);
-                return;
-            }
-        }
-
-        log.debug("{} IGMP flows on {}", (install) ? "Installing" : "Removing", port);
-        DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
-        TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
-        if (upstream) {
-
-            if (tagInformation.getTechnologyProfileId() != NONE_TP_ID) {
-                treatmentBuilder.writeMetadata(createTechProfValueForWm(null,
-                        tagInformation.getTechnologyProfileId(), upstreamOltMeterId), 0);
-            }
-
-
-            if (upstreamMeterId != null) {
-                treatmentBuilder.meter(upstreamMeterId);
-            }
-
-            if (!VlanId.vlanId(VlanId.NO_VID).equals(tagInformation.getUniTagMatch())) {
-                filterBuilder.addCondition(Criteria.matchVlanId(tagInformation.getUniTagMatch()));
-            }
-
-            if (!VlanId.vlanId(VlanId.NO_VID).equals(tagInformation.getPonCTag())) {
-                treatmentBuilder.setVlanId(tagInformation.getPonCTag());
-            }
-
-            if (tagInformation.getUsPonCTagPriority() != NO_PCP) {
-                treatmentBuilder.setVlanPcp((byte) tagInformation.getUsPonCTagPriority());
-            }
-        }
-
-        filterBuilder = install ? filterBuilder.permit() : filterBuilder.deny();
-
-        FilteringObjective igmp = filterBuilder
-                .withKey(Criteria.matchInPort(port.number()))
-                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
-                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
-                .withMeta(treatmentBuilder
-                        .setOutput(PortNumber.CONTROLLER).build())
-                .fromApp(appId)
-                .withPriority(MAX_PRIORITY)
-                .add(new ObjectiveContext() {
-                    @Override
-                    public void onSuccess(Objective objective) {
-                        log.info("Igmp filter for {} {}.", port, (install) ? INSTALLED : REMOVED);
-                    }
-
-                    @Override
-                    public void onError(Objective objective, ObjectiveError error) {
-                        log.error("Igmp filter for {} failed {} because {}.", port, (install) ? INSTALLATION : REMOVAL,
-                                error);
-                    }
-                });
-
-        flowObjectiveService.filter(port.deviceId(), igmp);
-    }
-
-    @Override
-    public void processEapolFilteringObjectives(AccessDevicePort port, String bpId, Optional<String> oltBpId,
-                                                CompletableFuture<ObjectiveError> filterFuture,
-                                                VlanId vlanId, boolean install) {
-
-        if (!enableEapol) {
-            log.debug("Eapol filtering is disabled. Completing filteringFuture immediately for the device {}",
-                    port.deviceId());
-            if (filterFuture != null) {
-                filterFuture.complete(null);
-            }
-            return;
-        }
-        log.info("Processing EAPOL with Bandwidth profile {} on {}", bpId, port);
-        BandwidthProfileInformation bpInfo = getBandwidthProfileInformation(bpId);
-        BandwidthProfileInformation oltBpInfo;
-        if (oltBpId.isPresent()) {
-            oltBpInfo = getBandwidthProfileInformation(oltBpId.get());
-        } else {
-            oltBpInfo = bpInfo;
-        }
-        if (bpInfo == null) {
-            log.warn("Bandwidth profile {} is not found. Authentication flow"
-                    + " will not be installed on {}", bpId, port);
-            if (filterFuture != null) {
-                filterFuture.complete(ObjectiveError.BADPARAMS);
-            }
-            return;
-        }
-
-        ConnectPoint cp = new ConnectPoint(port.deviceId(), port.number());
-        DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
-        TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
-        // check if meter exists and create it only for an install
-        final MeterId meterId = oltMeterService.getMeterIdFromBpMapping(port.deviceId(), bpInfo.id());
-        MeterId oltMeterId = null;
-        if (oltBpId.isPresent()) {
-            oltMeterId = oltBpId.map(id -> oltMeterService.getMeterIdFromBpMapping(port.deviceId(), id)).orElse(null);
-        }
-        log.info("Meter id {} for Bandwidth profile {} and OLT meter id {} for OLT Bandwidth profile {} " +
-                        "associated to EAPOL on {}", meterId, bpInfo.id(), oltMeterId, oltBpId, port.deviceId());
-        if (meterId == null || (oltBpId.isPresent() && oltMeterId == null)) {
-            if (install) {
-                log.debug("Need to install meter for EAPOL with bwp {} on {}", bpInfo.id(), port);
-                SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
-                                                            new UniTagInformation.Builder()
-                                                                    .setPonCTag(vlanId).build(),
-                                                        null, meterId, null, oltMeterId,
-                                                    null, bpInfo.id(), null, oltBpInfo.id());
-                pendingEapolForDevice.compute(port.deviceId(), (id, queue) -> {
-                    if (queue == null) {
-                        queue = new LinkedBlockingQueue<>();
-                    }
-                    queue.add(fi);
-                    return queue;
-                });
-
-                //If false the meter is already being installed, skipping installation
-                if (!oltMeterService.checkAndAddPendingMeter(port.deviceId(), bpInfo) &&
-                        !oltMeterService.checkAndAddPendingMeter(port.deviceId(), oltBpInfo)) {
-                    return;
-                }
-                List<BandwidthProfileInformation> bwpList = Arrays.asList(bpInfo, oltBpInfo);
-                bwpList.stream().distinct().filter(Objects::nonNull)
-                        .forEach(bwp -> createMeterAndProceedEapol(port, bwp, filterFuture, install,
-                        cp, filterBuilder, treatmentBuilder));
-            } else {
-                // this case should not happen as the request to remove an eapol
-                // flow should mean that the flow points to a meter that exists.
-                // Nevertheless we can still delete the flow as we only need the
-                // correct 'match' to do so.
-                log.warn("Unknown meter id for bp {}, still proceeding with "
-                        + "delete of eapol flow on {}", bpInfo.id(), port);
-                SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
-                        new UniTagInformation.Builder().setPonCTag(vlanId).build(),
-                        null, meterId, null, oltMeterId, null,
-                        bpInfo.id(), null, oltBpInfo.id());
-                handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId, oltMeterId);
-            }
-        } else {
-            log.debug("Meter {} was previously created for bp {} on {}", meterId, bpInfo.id(), port);
-            SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
-                    new UniTagInformation.Builder().setPonCTag(vlanId).build(),
-                    null, meterId, null, oltMeterId, null,
-                    bpInfo.id(), null, oltBpInfo.id());
-            handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId, oltMeterId);
-            //No need for the future, meter is present.
-            return;
-        }
-    }
-
-    private void createMeterAndProceedEapol(AccessDevicePort port, BandwidthProfileInformation bwpInfo,
-                                            CompletableFuture<ObjectiveError> filterFuture,
-                                            boolean install, ConnectPoint cp,
-                                            DefaultFilteringObjective.Builder filterBuilder,
-                                            TrafficTreatment.Builder treatmentBuilder) {
-        CompletableFuture<Object> meterFuture = new CompletableFuture<>();
-        MeterId meterId = oltMeterService.createMeter(port.deviceId(), bwpInfo, meterFuture);
-        DeviceId deviceId = port.deviceId();
-        meterFuture.thenAccept(result -> {
-            //for each pending eapol flow we check if the meter is there.
-            pendingEapolForDevice.compute(deviceId, (id, queue) -> {
-                if (queue != null && !queue.isEmpty()) {
-                    while (true) {
-                        //TODO this might return the reference and not the actual object
-                        // so it can be actually swapped underneath us.
-                        SubscriberFlowInfo fi = queue.peek();
-                        if (fi == null) {
-                            log.debug("No more subscribers eapol flows on {}", deviceId);
-                            queue = new LinkedBlockingQueue<>();
-                            break;
-                        }
-                        log.debug("handing pending eapol on {} for {}", fi.getUniPort(), fi);
-                        if (result == null) {
-                            MeterId mId = oltMeterService
-                                    .getMeterIdFromBpMapping(port.deviceId(), fi.getUpBpInfo());
-                            MeterId oltMeterId = oltMeterService
-                                    .getMeterIdFromBpMapping(port.deviceId(), fi.getUpOltBpInfo());
-                            if (mId != null && oltMeterId != null) {
-                                log.debug("Meter installation completed for subscriber on {}, " +
-                                                  "handling EAPOL trap flow", port);
-                                fi.setUpMeterId(mId);
-                                fi.setUpOltMeterId(oltMeterId);
-                                handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi,
-                                            mId, oltMeterId);
-                                queue.remove(fi);
-                            } else {
-                                log.debug("Not all meters for {} are yet installed for EAPOL meterID {}, " +
-                                                  "oltMeterId {}", fi, meterId, oltMeterId);
-                            }
-                        } else {
-                            log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
-                                             "Result {} and MeterId {}", port, result, meterId);
-                            queue.remove(fi);
-                        }
-                        oltMeterService.removeFromPendingMeters(port.deviceId(), bwpInfo);
-                    }
-                } else {
-                    log.info("No pending EAPOLs on {}", port.deviceId());
-                    queue = new LinkedBlockingQueue<>();
-                }
-                return queue;
-            });
-        });
-    }
-
-    private void handleEapol(CompletableFuture<ObjectiveError> filterFuture,
-                             boolean install, ConnectPoint cp,
-                             DefaultFilteringObjective.Builder filterBuilder,
-                             TrafficTreatment.Builder treatmentBuilder,
-                             SubscriberFlowInfo fi, MeterId mId, MeterId oltMeterId) {
-        log.info("Meter {} for {} on {} exists. {} EAPOL trap flow",
-                 mId, fi.getUpBpInfo(), fi.getUniPort(),
-                 (install) ? "Installing" : "Removing");
-        int techProfileId = getDefaultTechProfileId(fi.getUniPort());
-        // can happen in case of removal
-        if (mId != null) {
-            treatmentBuilder.meter(mId);
-        }
-        //Authentication trap flow uses only tech profile id as write metadata value
-        FilteringObjective eapol = (install ? filterBuilder.permit() : filterBuilder.deny())
-                .withKey(Criteria.matchInPort(fi.getUniPort().number()))
-                .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
-                .withMeta(treatmentBuilder
-                                  .writeMetadata(createTechProfValueForWm(
-                                          fi.getTagInfo().getPonCTag(),
-                                          techProfileId, oltMeterId), 0)
-                                  .setOutput(PortNumber.CONTROLLER)
-                                  .pushVlan()
-                                  .setVlanId(fi.getTagInfo().getPonCTag())
-                                  .build())
-                .fromApp(appId)
-                .withPriority(MAX_PRIORITY)
-                .add(new ObjectiveContext() {
-                    @Override
-                    public void onSuccess(Objective objective) {
-                        log.info("Eapol filter {} for {} on {} with meter {}.",
-                                 objective.id(), (install) ? INSTALLED : REMOVED, fi.getUniPort(), mId);
-                        if (filterFuture != null) {
-                            filterFuture.complete(null);
-                        }
-                    }
-
-                    @Override
-                    public void onError(Objective objective, ObjectiveError error) {
-                        log.error("Eapol filter {} for {} with meter {} " +
-                                         "failed {} because {}", objective.id(), fi.getUniPort(), mId,
-                                 (install) ? INSTALLATION : REMOVAL,
-                                 error);
-                        if (filterFuture != null) {
-                            filterFuture.complete(error);
-                        }
-                    }
-                });
-        flowObjectiveService.filter(fi.getDevId(), eapol);
-    }
-
-    /**
-     * Installs trap filtering objectives for particular traffic types on an
-     * NNI port.
-     *
-     * @param nniPort    NNI port
-     * @param install true to install, false to remove
-     */
-    @Override
-    public void processNniFilteringObjectives(AccessDevicePort nniPort, boolean install) {
-        log.info("{} flows for NNI port {}",
-                 install ? "Adding" : "Removing", nniPort);
-        processLldpFilteringObjective(nniPort, install);
-        processDhcpFilteringObjectives(nniPort, null, null, null, install, false, Optional.empty());
-        processIgmpFilteringObjectives(nniPort, null, null, null, install, false);
-        processPPPoEDFilteringObjectives(nniPort, null, null, null, install, false);
-    }
-
-
-    @Override
-    public void processLldpFilteringObjective(AccessDevicePort port, boolean install) {
-        DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
-
-        FilteringObjective lldp = (install ? builder.permit() : builder.deny())
-                .withKey(Criteria.matchInPort(port.number()))
-                .addCondition(Criteria.matchEthType(EthType.EtherType.LLDP.ethType()))
-                .withMeta(DefaultTrafficTreatment.builder()
-                        .setOutput(PortNumber.CONTROLLER).build())
-                .fromApp(appId)
-                .withPriority(MAX_PRIORITY)
-                .add(new ObjectiveContext() {
-                    @Override
-                    public void onSuccess(Objective objective) {
-                        log.info("LLDP filter for {} {}.", port, (install) ? INSTALLED : REMOVED);
-                    }
-
-                    @Override
-                    public void onError(Objective objective, ObjectiveError error) {
-                        log.error("LLDP filter for {} failed {} because {}", port, (install) ? INSTALLATION : REMOVAL,
-                                error);
-                    }
-                });
-
-        flowObjectiveService.filter(port.deviceId(), lldp);
-    }
-
-    @Override
-    public ForwardingObjective.Builder createTransparentBuilder(AccessDevicePort uplinkPort,
-                                                                AccessDevicePort subscriberPort,
-                                                                MeterId meterId,
-                                                                UniTagInformation tagInfo,
-                                                                boolean upstream) {
-
+    private void processUpstreamDataFilteringObjects(DeviceId deviceId, Port port, Port nniPort,
+                                                     FlowOperation action,
+                                                     MeterId upstreamMeterId,
+                                                     MeterId upstreamOltMeterId,
+                                                     UniTagInformation uti) {
+        ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
         TrafficSelector selector = DefaultTrafficSelector.builder()
-                .matchVlanId(tagInfo.getPonSTag())
-                .matchInPort(upstream ? subscriberPort.number() : uplinkPort.number())
-                .matchInnerVlanId(tagInfo.getPonCTag())
-                .build();
-
-        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
-        if (meterId != null) {
-            tBuilder.meter(meterId);
-        }
-
-        TrafficTreatment treatment = tBuilder
-                .setOutput(upstream ? uplinkPort.number() : subscriberPort.number())
-                .writeMetadata(createMetadata(upstream ? tagInfo.getPonSTag() : tagInfo.getPonCTag(),
-                        tagInfo.getTechnologyProfileId(),
-                        upstream ? uplinkPort.number() : subscriberPort.number()), 0)
-                .build();
-
-        return createForwardingObjectiveBuilder(selector, treatment, MIN_PRIORITY,
-                DefaultAnnotations.builder().build());
-    }
-
-    @Override
-    public ForwardingObjective.Builder createUpBuilder(AccessDevicePort uplinkPort,
-                                                       AccessDevicePort subscriberPort,
-                                                       MeterId upstreamMeterId,
-                                                       MeterId upstreamOltMeterId,
-                                                       UniTagInformation uniTagInformation) {
-
-        TrafficSelector selector = DefaultTrafficSelector.builder()
-                .matchInPort(subscriberPort.number())
-                .matchVlanId(uniTagInformation.getUniTagMatch())
+                .matchInPort(port.number())
+                .matchVlanId(uti.getUniTagMatch())
                 .build();
 
         TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
         //if the subscriberVlan (cTag) is different than ANY it needs to set.
-        if (uniTagInformation.getPonCTag().toShort() != VlanId.ANY_VALUE) {
+        if (uti.getPonCTag().toShort() != VlanId.ANY_VALUE) {
             treatmentBuilder.pushVlan()
-                    .setVlanId(uniTagInformation.getPonCTag());
+                    .setVlanId(uti.getPonCTag());
         }
 
-        if (uniTagInformation.getUsPonCTagPriority() != NO_PCP) {
-            treatmentBuilder.setVlanPcp((byte) uniTagInformation.getUsPonCTagPriority());
+        if (uti.getUsPonCTagPriority() != -1) {
+            treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
         }
 
         treatmentBuilder.pushVlan()
-                .setVlanId(uniTagInformation.getPonSTag());
+                .setVlanId(uti.getPonSTag());
 
-        if (uniTagInformation.getUsPonSTagPriority() != NO_PCP) {
-            treatmentBuilder.setVlanPcp((byte) uniTagInformation.getUsPonSTagPriority());
+        if (uti.getUsPonSTagPriority() != -1) {
+            treatmentBuilder.setVlanPcp((byte) uti.getUsPonSTagPriority());
         }
 
-        treatmentBuilder.setOutput(uplinkPort.number())
-                .writeMetadata(createMetadata(uniTagInformation.getPonCTag(),
-                        uniTagInformation.getTechnologyProfileId(), uplinkPort.number()), 0L);
+        treatmentBuilder.setOutput(nniPort.number())
+                .writeMetadata(createMetadata(uti.getPonCTag(),
+                        uti.getTechnologyProfileId(), nniPort.number()), 0L);
 
         DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
 
@@ -855,55 +1253,84 @@
             annotationBuilder.set(UPSTREAM_OLT, upstreamOltMeterId.toString());
         }
 
-        return createForwardingObjectiveBuilder(selector, treatmentBuilder.build(), MIN_PRIORITY,
+        DefaultForwardingObjective.Builder flowBuilder = createForwardingObjectiveBuilder(selector,
+                treatmentBuilder.build(), MIN_PRIORITY,
                 annotationBuilder.build());
+
+        ObjectiveContext context = new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                log.info("{} Upstream Data plane filter for {}.",
+                        completeFlowOpToString(action), sk);
+            }
+
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                log.error("Upstream Data plane filter for {} failed {} because {}.",
+                        sk, action, error);
+                updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null);
+            }
+        };
+
+        ForwardingObjective flow = null;
+        if (action == FlowOperation.ADD) {
+            flow = flowBuilder.add(context);
+        } else if (action == FlowOperation.REMOVE) {
+            flow = flowBuilder.remove(context);
+        } else {
+            log.error("Flow action not supported: {}", action);
+        }
+
+        if (flow != null) {
+            flowObjectiveService.forward(deviceId, flow);
+        }
     }
 
-    @Override
-    public ForwardingObjective.Builder createDownBuilder(AccessDevicePort uplinkPort,
-                                                         AccessDevicePort subscriberPort,
-                                                         MeterId downstreamMeterId,
-                                                         MeterId downstreamOltMeterId,
-                                                         UniTagInformation tagInformation,
-                                                         Optional<MacAddress> macAddress) {
-
+    private void processDownstreamDataFilteringObjects(DeviceId deviceId, Port port, Port nniPort,
+                                                       FlowOperation action,
+                                                       MeterId downstreamMeterId,
+                                                       MeterId downstreamOltMeterId,
+                                                       UniTagInformation uti,
+                                                       MacAddress macAddress) {
+        ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
         //subscriberVlan can be any valid Vlan here including ANY to make sure the packet is tagged
         TrafficSelector.Builder selectorBuilder = DefaultTrafficSelector.builder()
-                .matchVlanId(tagInformation.getPonSTag())
-                .matchInPort(uplinkPort.number())
-                .matchInnerVlanId(tagInformation.getPonCTag());
+                .matchVlanId(uti.getPonSTag())
+                .matchInPort(nniPort.number())
+                .matchInnerVlanId(uti.getPonCTag());
 
-
-        if (tagInformation.getPonCTag().toShort() != VlanId.ANY_VALUE) {
-            selectorBuilder.matchMetadata(tagInformation.getPonCTag().toShort());
+        if (uti.getPonCTag().toShort() != VlanId.ANY_VALUE) {
+            selectorBuilder.matchMetadata(uti.getPonCTag().toShort());
         }
 
-        if (tagInformation.getDsPonSTagPriority() != NO_PCP) {
-            selectorBuilder.matchVlanPcp((byte) tagInformation.getDsPonSTagPriority());
+        if (uti.getDsPonCTagPriority() != -1) {
+            selectorBuilder.matchVlanPcp((byte) uti.getDsPonCTagPriority());
         }
 
-        macAddress.ifPresent(selectorBuilder::matchEthDst);
+        if (macAddress != null) {
+            selectorBuilder.matchEthDst(macAddress);
+        }
 
         TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder()
                 .popVlan()
-                .setOutput(subscriberPort.number());
+                .setOutput(port.number());
 
-        treatmentBuilder.writeMetadata(createMetadata(tagInformation.getPonCTag(),
-                                                      tagInformation.getTechnologyProfileId(),
-                                                      subscriberPort.number()), 0);
+        treatmentBuilder.writeMetadata(createMetadata(uti.getPonCTag(),
+                uti.getTechnologyProfileId(),
+                port.number()), 0);
 
         // Upstream pbit is used to remark inner vlan pbit.
         // Upstream is used to avoid trusting the BNG to send the packet with correct pbit.
         // this is done because ds mode 0 is used because ds mode 3 or 6 that allow for
         // all pbit acceptance are not widely supported by vendors even though present in
         // the OMCI spec.
-        if (tagInformation.getUsPonCTagPriority() != NO_PCP) {
-            treatmentBuilder.setVlanPcp((byte) tagInformation.getUsPonCTagPriority());
+        if (uti.getUsPonCTagPriority() != -1) {
+            treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
         }
 
-        if (!VlanId.NONE.equals(tagInformation.getUniTagMatch()) &&
-                tagInformation.getPonCTag().toShort() != VlanId.ANY_VALUE) {
-            treatmentBuilder.setVlanId(tagInformation.getUniTagMatch());
+        if (!VlanId.NONE.equals(uti.getUniTagMatch()) &&
+                uti.getPonCTag().toShort() != VlanId.ANY_VALUE) {
+            treatmentBuilder.setVlanId(uti.getUniTagMatch());
         }
 
         DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
@@ -918,13 +1345,39 @@
             annotationBuilder.set(DOWNSTREAM_OLT, downstreamOltMeterId.toString());
         }
 
-        return createForwardingObjectiveBuilder(selectorBuilder.build(), treatmentBuilder.build(), MIN_PRIORITY,
-                annotationBuilder.build());
-    }
+        DefaultForwardingObjective.Builder flowBuilder = createForwardingObjectiveBuilder(selectorBuilder.build(),
+                treatmentBuilder.build(), MIN_PRIORITY, annotationBuilder.build());
 
-    @Override
-    public void clearDeviceState(DeviceId deviceId) {
-        pendingEapolForDevice.remove(deviceId);
+        ObjectiveContext context = new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                log.info("{} Downstream Data plane filter for {}.",
+                        completeFlowOpToString(action), sk);
+            }
+
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                log.info("Downstream Data plane filter for {} failed {} because {}.",
+                        sk, action, error);
+                updateConnectPointStatus(sk, null, OltFlowsStatus.ERROR, null);
+            }
+        };
+
+        ForwardingObjective flow = null;
+        if (action == FlowOperation.ADD) {
+            flow = flowBuilder.add(context);
+        } else if (action == FlowOperation.REMOVE) {
+            flow = flowBuilder.remove(context);
+        } else {
+            log.error("Flow action not supported: {}", action);
+        }
+
+        if (flow != null) {
+            if (log.isTraceEnabled()) {
+                log.trace("Forwarding rule {}", flow);
+            }
+            flowObjectiveService.forward(deviceId, flow);
+        }
     }
 
     private DefaultForwardingObjective.Builder createForwardingObjectiveBuilder(TrafficSelector selector,
@@ -941,70 +1394,6 @@
                 .withTreatment(treatment);
     }
 
-    /**
-     * Returns the write metadata value including tech profile reference and innerVlan.
-     * For param cVlan, null can be sent
-     *
-     * @param cVlan                 c (customer) tag of one subscriber
-     * @param techProfileId         tech profile id of one subscriber
-     * @param upstreamOltMeterId    upstream meter id for OLT device.
-     * @return the write metadata value including tech profile reference and innerVlan
-     */
-    private Long createTechProfValueForWm(VlanId cVlan, int techProfileId, MeterId upstreamOltMeterId) {
-        Long writeMetadata;
-
-        if (cVlan == null || VlanId.NONE.equals(cVlan)) {
-            writeMetadata = (long) techProfileId << 32;
-        } else {
-            writeMetadata = ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
-        }
-        if (upstreamOltMeterId == null) {
-            return writeMetadata;
-        } else {
-            return writeMetadata | upstreamOltMeterId.id();
-        }
-    }
-
-    private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
-        if (bpService == null) {
-            log.warn(SADIS_NOT_RUNNING);
-            return null;
-        }
-        if (bandwidthProfile == null) {
-            return null;
-        }
-        return bpService.get(bandwidthProfile);
-    }
-
-    /**
-     * It will be used to support AT&T use case (for EAPOL flows).
-     * If multiple services are found in uniServiceList, returns default tech profile id
-     * If one service is found, returns the found one
-     *
-     * @param port uni port
-     * @return the default technology profile id
-     */
-    private int getDefaultTechProfileId(AccessDevicePort port) {
-        if (subsService == null) {
-            log.warn(SADIS_NOT_RUNNING);
-            return defaultTechProfileId;
-        }
-        if (port != null) {
-            SubscriberAndDeviceInformation info = subsService.get(port.name());
-            if (info != null && info.uniTagList().size() == 1) {
-                return info.uniTagList().get(0).getTechnologyProfileId();
-            }
-        }
-        return defaultTechProfileId;
-    }
-
-    /**
-     * Write metadata instruction value (metadata) is 8 bytes.
-     * <p>
-     * MS 2 bytes: C Tag
-     * Next 2 bytes: Technology Profile Id
-     * Next 4 bytes: Port number (uni or nni)
-     */
     private Long createMetadata(VlanId innerVlan, int techProfileId, PortNumber egressPort) {
         if (techProfileId == NONE_TP_ID) {
             techProfileId = DEFAULT_TP_ID_DEFAULT;
@@ -1013,5 +1402,318 @@
         return ((long) (innerVlan.id()) << 48 | (long) techProfileId << 32) | egressPort.toLong();
     }
 
+    private boolean isMacLearningEnabled(SubscriberAndDeviceInformation si) {
+        AtomicBoolean requiresMacLearning = new AtomicBoolean();
+        requiresMacLearning.set(false);
 
+        si.uniTagList().forEach(uniTagInfo -> {
+            if (uniTagInfo.getEnableMacLearning()) {
+                requiresMacLearning.set(true);
+            }
+        });
+
+        return requiresMacLearning.get();
+    }
+
+    /**
+     * Checks whether the subscriber has the MacAddress configured or discovered.
+     *
+     * @param deviceId DeviceId for this subscriber
+     * @param port     Port for this subscriber
+     * @param si       SubscriberAndDeviceInformation
+     * @return boolean
+     */
+    protected boolean isMacAddressAvailable(DeviceId deviceId, Port port, SubscriberAndDeviceInformation si) {
+        AtomicBoolean isConfigured = new AtomicBoolean();
+        isConfigured.set(true);
+
+        si.uniTagList().forEach(uniTagInfo -> {
+            boolean isMacLearningEnabled = uniTagInfo.getEnableMacLearning();
+            boolean configureMac = isMacAddressValid(uniTagInfo);
+            boolean discoveredMac = false;
+            Optional<Host> optHost = hostService.getConnectedHosts(new ConnectPoint(deviceId, port.number()))
+                    .stream().filter(host -> host.vlan().equals(uniTagInfo.getPonCTag())).findFirst();
+            if (optHost.isPresent() && optHost.get().mac() != null) {
+                discoveredMac = true;
+            }
+            if (isMacLearningEnabled && !configureMac && !discoveredMac) {
+                log.debug("Awaiting for macAddress on {} for service {}",
+                        portWithName(port), uniTagInfo.getServiceName());
+                isConfigured.set(false);
+            }
+        });
+
+        return isConfigured.get();
+    }
+
+    protected MacAddress getMacAddress(DeviceId deviceId, Port port, UniTagInformation uniTagInfo) {
+        boolean configuredMac = isMacAddressValid(uniTagInfo);
+        if (configuredMac) {
+            return MacAddress.valueOf(uniTagInfo.getConfiguredMacAddress());
+        } else if (uniTagInfo.getEnableMacLearning()) {
+            Optional<Host> optHost = hostService.getConnectedHosts(new ConnectPoint(deviceId, port.number()))
+                    .stream().filter(host -> host.vlan().equals(uniTagInfo.getPonCTag())).findFirst();
+            if (optHost.isPresent() && optHost.get().mac() != null) {
+                return optHost.get().mac();
+            }
+        }
+        return null;
+    }
+
+    private boolean isMacAddressValid(UniTagInformation tagInformation) {
+        return tagInformation.getConfiguredMacAddress() != null &&
+                !tagInformation.getConfiguredMacAddress().trim().equals("") &&
+                !MacAddress.NONE.equals(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
+    }
+
+    protected void updateConnectPointStatus(ServiceKey key, OltFlowsStatus eapolStatus,
+                                            OltFlowsStatus subscriberFlowsStatus, OltFlowsStatus dhcpStatus) {
+        try {
+            cpStatusWriteLock.lock();
+            OltPortStatus status = cpStatus.get(key);
+
+            if (status == null) {
+                status = new OltPortStatus(
+                        eapolStatus != null ? eapolStatus : OltFlowsStatus.NONE,
+                        subscriberFlowsStatus != null ? subscriberFlowsStatus : OltFlowsStatus.NONE,
+                        dhcpStatus != null ? dhcpStatus : OltFlowsStatus.NONE
+                );
+            } else {
+                if (eapolStatus != null) {
+                    status.defaultEapolStatus = eapolStatus;
+                }
+                if (subscriberFlowsStatus != null) {
+                    status.subscriberFlowsStatus = subscriberFlowsStatus;
+                }
+                if (dhcpStatus != null) {
+                    status.dhcpStatus = dhcpStatus;
+                }
+            }
+
+            cpStatus.put(key, status);
+        } finally {
+            cpStatusWriteLock.unlock();
+        }
+    }
+
+    protected class InternalFlowListener implements FlowRuleListener {
+        @Override
+        public void event(FlowRuleEvent event) {
+            if (appId.id() != (event.subject().appId())) {
+                return;
+            }
+
+            if (!oltDeviceService.isLocalLeader(event.subject().deviceId())) {
+                if (log.isTraceEnabled()) {
+                    log.trace("ignoring flow event {} " +
+                            "as not leader for {}", event, event.subject().deviceId());
+                }
+                return;
+            }
+
+            switch (event.type()) {
+                case RULE_ADDED:
+                case RULE_REMOVED:
+                    Port port = getCpFromFlowRule(event.subject());
+                    if (port == null) {
+                        log.error("Can't find port in flow {}", event.subject());
+                        return;
+                    }
+                    if (log.isTraceEnabled()) {
+                        log.trace("flow event {} on cp {}: {}", event.type(),
+                                portWithName(port), event.subject());
+                    }
+                    updateCpStatus(event.type(), port, event.subject());
+                    return;
+                case RULE_ADD_REQUESTED:
+                case RULE_REMOVE_REQUESTED:
+                    // NOTE that PENDING_ADD/REMOVE is set when we create the flowObjective
+                    return;
+                default:
+                    return;
+            }
+        }
+
+        protected void updateCpStatus(FlowRuleEvent.Type type, Port port, FlowRule flowRule) {
+            OltFlowsStatus status = flowRuleStatusToOltFlowStatus(type);
+            if (isDefaultEapolFlow(flowRule)) {
+                ServiceKey sk = new ServiceKey(new AccessDevicePort(port),
+                        defaultEapolUniTag);
+                if (log.isTraceEnabled()) {
+                    log.trace("update defaultEapolStatus {} on {}", status, sk);
+                }
+                updateConnectPointStatus(sk, status, null, null);
+            } else if (isDhcpFlow(flowRule)) {
+                ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+                if (sk == null) {
+                    return;
+                }
+                if (log.isTraceEnabled()) {
+                    log.trace("update dhcpStatus {} on {}", status, sk);
+                }
+                updateConnectPointStatus(sk, null, null, status);
+            } else if (isDataFlow(flowRule)) {
+
+                if (oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()),
+                        getCpFromFlowRule(flowRule).number())) {
+                    // the NNI has data-plane for every subscriber, doesn't make sense to track them
+                    return;
+                }
+
+                ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule);
+                if (sk == null) {
+                    return;
+                }
+                if (log.isTraceEnabled()) {
+                    log.trace("update dataplaneStatus {} on {}", status, sk);
+                }
+                updateConnectPointStatus(sk, null, status, null);
+            }
+        }
+
+        private boolean isDefaultEapolFlow(FlowRule flowRule) {
+            EthTypeCriterion c = (EthTypeCriterion) flowRule.selector().getCriterion(Criterion.Type.ETH_TYPE);
+            if (c == null) {
+                return false;
+            }
+            if (c.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
+                AtomicBoolean isDefault = new AtomicBoolean(false);
+                flowRule.treatment().allInstructions().forEach(instruction -> {
+                    if (instruction.type() == L2MODIFICATION) {
+                        L2ModificationInstruction modificationInstruction = (L2ModificationInstruction) instruction;
+                        if (modificationInstruction.subtype() == L2ModificationInstruction.L2SubType.VLAN_ID) {
+                            L2ModificationInstruction.ModVlanIdInstruction vlanInstruction =
+                                    (L2ModificationInstruction.ModVlanIdInstruction) modificationInstruction;
+                            if (vlanInstruction.vlanId().id().equals(EAPOL_DEFAULT_VLAN)) {
+                                isDefault.set(true);
+                                return;
+                            }
+                        }
+                    }
+                });
+                return isDefault.get();
+            }
+            return false;
+        }
+
+        /**
+         * Returns true if the flow is a DHCP flow.
+         * Matches both upstream and downstream flows.
+         *
+         * @param flowRule The FlowRule to evaluate
+         * @return boolean
+         */
+        private boolean isDhcpFlow(FlowRule flowRule) {
+            IPProtocolCriterion ipCriterion = (IPProtocolCriterion) flowRule.selector()
+                    .getCriterion(Criterion.Type.IP_PROTO);
+            if (ipCriterion == null) {
+                return false;
+            }
+
+            UdpPortCriterion src = (UdpPortCriterion) flowRule.selector().getCriterion(Criterion.Type.UDP_SRC);
+
+            if (src == null) {
+                return false;
+            }
+            return ipCriterion.protocol() == IPv4.PROTOCOL_UDP &&
+                    (src.udpPort().toInt() == 68 || src.udpPort().toInt() == 67);
+        }
+
+        private boolean isDataFlow(FlowRule flowRule) {
+            // we consider subscriber flows the one that matches on VLAN_VID
+            // method is valid only because it's the last check after EAPOL and DHCP.
+            // this matches mcast flows as well, if we want to avoid that we can
+            // filter out the elements that have groups in the treatment or
+            // mcastIp in the selector
+            // IPV4_DST:224.0.0.22/32
+            // treatment=[immediate=[GROUP:0x1]]
+
+            return flowRule.selector().getCriterion(Criterion.Type.VLAN_VID) != null;
+        }
+
+        private Port getCpFromFlowRule(FlowRule flowRule) {
+            DeviceId deviceId = flowRule.deviceId();
+            PortCriterion inPort = (PortCriterion) flowRule.selector().getCriterion(Criterion.Type.IN_PORT);
+            if (inPort != null) {
+                PortNumber port = inPort.port();
+                return deviceService.getPort(deviceId, port);
+            }
+            return null;
+        }
+
+        private ServiceKey getSubscriberKeyFromFlowRule(FlowRule flowRule) {
+            Port flowPort = getCpFromFlowRule(flowRule);
+            SubscriberAndDeviceInformation si = subsService.get(getPortName(flowPort));
+
+            Boolean isNni = oltDeviceService.isNniPort(deviceService.getDevice(flowRule.deviceId()), flowPort.number());
+            if (si == null && !isNni) {
+                log.error("Subscriber information not found in sadis for port {}", portWithName(flowPort));
+                return null;
+            }
+
+            if (isNni) {
+                return new ServiceKey(new AccessDevicePort(flowPort), nniUniTag);
+            }
+
+            Optional<UniTagInformation> found = Optional.empty();
+            VlanId flowVlan = null;
+            if (isDhcpFlow(flowRule)) {
+                // we need to make a special case for DHCP as in the ATT workflow DHCP flows don't match on tags
+                L2ModificationInstruction.ModVlanIdInstruction instruction =
+                        (L2ModificationInstruction.ModVlanIdInstruction) flowRule.treatment().immediate().get(1);
+                flowVlan = instruction.vlanId();
+            } else {
+                // for now we assume that if it's not DHCP it's dataplane (or at least tagged)
+                VlanIdCriterion vlanIdCriterion =
+                        (VlanIdCriterion) flowRule.selector().getCriterion(Criterion.Type.VLAN_VID);
+                if (vlanIdCriterion == null) {
+                    log.warn("cannot match the flow to a subscriber service as it does not carry vlans");
+                    return null;
+                }
+                flowVlan = vlanIdCriterion.vlanId();
+            }
+
+            VlanId finalFlowVlan = flowVlan;
+            found = si.uniTagList().stream().filter(uti ->
+                    uti.getPonCTag().equals(finalFlowVlan) ||
+                            uti.getPonSTag().equals(finalFlowVlan) ||
+                            uti.getUniTagMatch().equals(finalFlowVlan)
+            ).findFirst();
+
+
+            if (found.isEmpty()) {
+                log.warn("Cannot map flow rule {} to Service in {}", flowRule, si);
+            }
+
+            return found.isPresent() ? new ServiceKey(new AccessDevicePort(flowPort), found.get()) : null;
+
+        }
+
+        private OltFlowsStatus flowRuleStatusToOltFlowStatus(FlowRuleEvent.Type type) {
+            switch (type) {
+                case RULE_ADD_REQUESTED:
+                    return OltFlowsStatus.PENDING_ADD;
+                case RULE_ADDED:
+                    return OltFlowsStatus.ADDED;
+                case RULE_REMOVE_REQUESTED:
+                    return OltFlowsStatus.PENDING_REMOVE;
+                case RULE_REMOVED:
+                    return OltFlowsStatus.REMOVED;
+                default:
+                    return OltFlowsStatus.NONE;
+            }
+        }
+    }
+
+    protected void bindSadisService(SadisService service) {
+        this.subsService = service.getSubscriberInfoService();
+        this.bpService = service.getBandwidthProfileService();
+        log.info("Sadis service is loaded");
+    }
+
+    protected void unbindSadisService(SadisService service) {
+        this.subsService = null;
+        this.bpService = null;
+        log.info("Sadis service is unloaded");
+    }
 }
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowServiceInterface.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowServiceInterface.java
new file mode 100644
index 0000000..e55c789
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowServiceInterface.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.opencord.sadis.UniTagInformation;
+
+import java.util.Map;
+
+/**
+ * Interface for flow installation/removal methods for different types of traffic.
+ */
+public interface OltFlowServiceInterface {
+    /**
+     * Installs or removes default flows for the port to trap to controller.
+     * @param sub the information about the port
+     * @param defaultBpId the default bandwidth profile
+     * @param oltBandwidthProfile the olt bandwidth profile.
+     * @return true if successful
+     */
+    boolean handleBasicPortFlows(
+            DiscoveredSubscriber sub, String defaultBpId, String oltBandwidthProfile);
+
+    /**
+     * Installs or removes subscriber specific flows.
+     * @param sub the information about the subscriber
+     * @param defaultBpId the default bandwidth profile
+     * @param multicastServiceName the multicast service name.
+     * @return true if successful
+     */
+    boolean handleSubscriberFlows(DiscoveredSubscriber sub, String defaultBpId, String multicastServiceName);
+
+    /**
+     * Installs or removes flows on the NNI port.
+     * @param device the OLT
+     * @param port the NNI port
+     * @param action the operatio, ADD or REMOVE.
+     */
+    void handleNniFlows(Device device, Port port, OltFlowService.FlowOperation action);
+
+    /**
+     * Checks if the default eapol flow is already installed.
+     * @param port the port
+     * @return true if installed, false otherwise.
+     */
+    boolean hasDefaultEapol(Port port);
+
+    /**
+     * Checks if the dhcp flows are installed.
+     * @param port the port
+     * @param uti the UniTagInformation to check for
+     * @return true if installed, false otherwise.
+     */
+    boolean hasDhcpFlows(Port port, UniTagInformation uti);
+
+    /**
+     * Checks if the subscriber flows are installed.
+     * @param port the port
+     * @param uti the UniTagInformation to check for
+     * @return true if installed, false otherwise.
+     */
+    boolean hasSubscriberFlows(Port port, UniTagInformation uti);
+
+    /**
+     * Removes all device flows.
+     * @param deviceId the olt.
+     */
+    void purgeDeviceFlows(DeviceId deviceId);
+
+    /**
+     * Return the status of installation on the connect points.
+     * @return the status map
+     */
+    Map<ServiceKey, OltPortStatus> getConnectPointStatus();
+
+    /**
+     * Returns all the programmed subscribers.
+     * @return the subscribers
+     */
+    Map<ServiceKey, UniTagInformation> getProgrammedSubscribers();
+
+    /**
+     * Returns the list of requested subscribers to be installed with status.
+     * @return the list
+     */
+    Map<ServiceKey, Boolean> getRequestedSubscribers();
+
+    /**
+     * Returns if a subscriber on a port is provisioned or not.
+     * @param cp the port
+     * @return true if any service on that port is provisioned, false otherwise
+     */
+    boolean isSubscriberServiceProvisioned(AccessDevicePort cp);
+
+    /**
+     * Returns if a subscriber on a port is provisioned or not.
+     * @param sk the SubscriberKey
+     * @return true if provisioned, false otherwise
+     */
+    boolean isSubscriberServiceProvisioned(ServiceKey sk);
+
+    /**
+     * Updates the subscriber provisioning status.
+     * @param sk the SubscriberKey
+     * @param status the next status
+     */
+    void updateProvisionedSubscriberStatus(ServiceKey sk, Boolean status);
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
index 4b88344..be04449 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-present Open Networking Foundation
+ * Copyright 2021-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,47 +13,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.opencord.olt.impl;
 
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.stream.Collectors.collectingAndThen;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.mapping;
-import static java.util.stream.Collectors.toSet;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.opencord.olt.impl.OsgiPropertyConstants.*;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Dictionary;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
+import com.google.common.collect.ImmutableMap;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
-import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.meter.Band;
 import org.onosproject.net.meter.DefaultBand;
 import org.onosproject.net.meter.DefaultMeterRequest;
@@ -66,12 +35,14 @@
 import org.onosproject.net.meter.MeterListener;
 import org.onosproject.net.meter.MeterRequest;
 import org.onosproject.net.meter.MeterService;
+import org.onosproject.net.meter.MeterState;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMultimap;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
-import org.opencord.olt.internalapi.AccessDeviceMeterService;
 import org.opencord.sadis.BandwidthProfileInformation;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -79,219 +50,386 @@
 import org.osgi.service.component.annotations.Modified;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-/**
- * Provisions Meters on access devices.
- */
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.olt.impl.OsgiPropertyConstants.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
 @Component(immediate = true, property = {
         DELETE_METERS + ":Boolean=" + DELETE_METERS_DEFAULT,
         ZERO_REFERENCE_METER_COUNT + ":Integer=" + ZERO_REFERENCE_METER_COUNT_DEFAULT,
-        })
-public class OltMeterService implements AccessDeviceMeterService {
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected MeterService meterService;
+})
+public class OltMeterService implements OltMeterServiceInterface {
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ComponentConfigService componentConfigService;
+    protected ComponentConfigService cfgService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected StorageService storageService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected DeviceService deviceService;
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
+            bind = "bindSadisService",
+            unbind = "unbindSadisService",
+            policy = ReferencePolicy.DYNAMIC)
+    protected volatile SadisService sadisService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ClusterService clusterService;
+    protected MeterService meterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected MastershipService mastershipService;
+    protected OltDeviceServiceInterface oltDeviceService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected LeadershipService leadershipService;
+    private final Logger log = getLogger(getClass());
+    protected BaseInformationService<BandwidthProfileInformation> bpService;
+    private ApplicationId appId;
+    private static final String APP_NAME = "org.opencord.olt";
+    private final ReentrantReadWriteLock programmedMeterLock = new ReentrantReadWriteLock();
+    private final Lock programmedMeterWriteLock = programmedMeterLock.writeLock();
+    private final Lock programmedMeterReadLock = programmedMeterLock.readLock();
+
+    /**
+     * Programmed Meters status map.
+     * Keeps track of which meter is programmed on which device for which BandwidthProfile.
+     * The String key is the BandwidthProfile
+     */
+    protected Map<DeviceId, Map<String, MeterData>> programmedMeters;
+
+    private final MeterListener meterListener = new InternalMeterListener();
+    protected ExecutorService pendingRemovalMetersExecutor =
+            Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
+                                                           "pending-removal-meters-%d", log));
+
+    /**
+     * Map that contains a list of meters that needs to be removed.
+     * We wait to get 3 METER_REFERENCE_COUNT_ZERO events before removing the meter
+     * so that we're sure no flow is referencing it.
+     */
+    protected Map<DeviceId, Map<MeterKey, AtomicInteger>> pendingRemoveMeters;
+
+    /**
+     * Number of consecutive meter events with empty reference count
+     * after which a meter gets removed from the device.
+     */
+    protected int zeroReferenceMeterCount = 3;
 
     /**
      * Delete meters when reference count drops to zero.
      */
     protected boolean deleteMeters = DELETE_METERS_DEFAULT;
 
-    /**
-     * Number of Zero References received before deleting the meter.
-     */
-    protected int zeroReferenceMeterCount = ZERO_REFERENCE_METER_COUNT_DEFAULT;
-
-    private ApplicationId appId;
-    private static final String APP_NAME = "org.opencord.olt";
-
-    private final MeterListener meterListener = new InternalMeterListener();
-
-    private final Logger log = getLogger(getClass());
-
-    protected ExecutorService eventExecutor;
-
-    protected Map<DeviceId, Set<BandwidthProfileInformation>> pendingMeters;
-    protected Map<DeviceId, Map<MeterKey, AtomicInteger>> pendingRemoveMeters;
-    protected ConsistentMultimap<String, MeterKey> bpInfoToMeter;
-
     @Activate
     public void activate(ComponentContext context) {
-        eventExecutor = Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
-                "events-%d", log));
         appId = coreService.registerApplication(APP_NAME);
         modified(context);
-
         KryoNamespace serializer = KryoNamespace.newBuilder()
                 .register(KryoNamespaces.API)
+                .register(List.class)
+                .register(MeterData.class)
+                .register(MeterState.class)
                 .register(MeterKey.class)
-                .register(BandwidthProfileInformation.class)
                 .build();
 
-        bpInfoToMeter = storageService.<String, MeterKey>consistentMultimapBuilder()
-                .withName("volt-bp-info-to-meter")
-                .withSerializer(Serializer.using(serializer))
-                .withApplicationId(appId)
-                .build();
-
-        meterService.addListener(meterListener);
-        componentConfigService.registerProperties(getClass());
-        pendingMeters = storageService.<DeviceId, Set<BandwidthProfileInformation>>consistentMapBuilder()
-                .withName("volt-pending-meters")
+        programmedMeters = storageService.<DeviceId, Map<String, MeterData>>consistentMapBuilder()
+                .withName("volt-programmed-meters")
                 .withSerializer(Serializer.using(serializer))
                 .withApplicationId(appId)
                 .build().asJavaMap();
+
         pendingRemoveMeters = storageService.<DeviceId, Map<MeterKey, AtomicInteger>>consistentMapBuilder()
                 .withName("volt-pending-remove-meters")
                 .withSerializer(Serializer.using(serializer))
                 .withApplicationId(appId)
                 .build().asJavaMap();
-        log.info("Olt Meter service started");
-    }
 
-    @Deactivate
-    public void deactivate() {
-        meterService.removeListener(meterListener);
-    }
+        cfgService.registerProperties(getClass());
 
+        meterService.addListener(meterListener);
+
+        log.info("Started");
+    }
 
     @Modified
     public void modified(ComponentContext context) {
         Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
 
-        Boolean d = Tools.isPropertyEnabled(properties, "deleteMeters");
+        Boolean d = Tools.isPropertyEnabled(properties, DELETE_METERS);
         if (d != null) {
             deleteMeters = d;
         }
 
-        String zeroReferenceMeterCountNew = get(properties, ZERO_REFERENCE_METER_COUNT);
-        zeroReferenceMeterCount = isNullOrEmpty(zeroReferenceMeterCountNew) ? ZERO_REFERENCE_METER_COUNT_DEFAULT :
-                Integer.parseInt(zeroReferenceMeterCountNew.trim());
+        String zeroCount = get(properties, ZERO_REFERENCE_METER_COUNT);
+        int oldSubscriberProcessingThreads = zeroReferenceMeterCount;
+        zeroReferenceMeterCount = isNullOrEmpty(zeroCount) ?
+                oldSubscriberProcessingThreads : Integer.parseInt(zeroCount.trim());
 
+        log.info("Modified. Values = deleteMeters: {}, zeroReferenceMeterCount: {}",
+                 deleteMeters, zeroReferenceMeterCount);
+    }
+
+    @Deactivate
+    public void deactivate(ComponentContext context) {
+        cfgService.unregisterProperties(getClass(), false);
+        meterService.removeListener(meterListener);
+        log.info("Stopped");
     }
 
     @Override
-    public ImmutableMap<String, Collection<MeterKey>> getBpMeterMappings() {
-        return bpInfoToMeter.stream()
-                .collect(collectingAndThen(
-                        groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
-                        ImmutableMap::copyOf));
+    public Map<DeviceId, Map<String, MeterData>> getProgrammedMeters() {
+        try {
+            programmedMeterReadLock.lock();
+            return ImmutableMap.copyOf(programmedMeters);
+        } finally {
+            programmedMeterReadLock.unlock();
+        }
     }
 
-    boolean addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
-        log.debug("adding bp {} to meter {} mapping for device {}",
-                 bandwidthProfile, meterId, deviceId);
-        return bpInfoToMeter.put(bandwidthProfile, MeterKey.key(deviceId, meterId));
+    /**
+     * Will create a meter if needed and return true once available.
+     *
+     * @param deviceId         DeviceId
+     * @param bandwidthProfile Bandwidth Profile Id
+     * @return true
+     */
+    @Override
+    public synchronized boolean createMeter(DeviceId deviceId, String bandwidthProfile) {
+
+        // NOTE it is possible that hasMeterByBandwidthProfile returns false has the meter is in PENDING_ADD
+        // then a different thread changes the meter to ADDED
+        // and thus hasPendingMeterByBandwidthProfile return false as well and we install the meter a second time
+        // this causes an inconsistency between the existing meter and meterId stored in the map
+
+        if (!hasMeterByBandwidthProfile(deviceId, bandwidthProfile)) {
+            // NOTE this is at trace level as it's constantly called by the queue processor
+            if (log.isTraceEnabled()) {
+                log.trace("Missing meter for BandwidthProfile {} on device {}", bandwidthProfile, deviceId);
+            }
+
+            if (!hasPendingMeterByBandwidthProfile(deviceId, bandwidthProfile)) {
+                createMeterForBp(deviceId, bandwidthProfile);
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Meter is not yet available for {} on device {}",
+                          bandwidthProfile, deviceId);
+            }
+            return false;
+        }
+        log.debug("Meter found for BandwidthProfile {} on device {}", bandwidthProfile, deviceId);
+        return true;
     }
 
     @Override
-    public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
-        if (bandwidthProfile == null) {
-            log.warn("Bandwidth Profile requested is null");
-            return null;
+    public boolean createMeters(DeviceId deviceId, SubscriberAndDeviceInformation si) {
+        // Each UniTagInformation has up to 4 meters,
+        // check and/or create all of them
+        AtomicBoolean waitingOnMeter = new AtomicBoolean();
+        waitingOnMeter.set(false);
+        Map<String, List<String>> pendingMeters = new HashMap<>();
+        si.uniTagList().forEach(uniTagInfo -> {
+            String serviceName = uniTagInfo.getServiceName();
+            pendingMeters.put(serviceName, new LinkedList<>());
+            String usBp = uniTagInfo.getUpstreamBandwidthProfile();
+            String dsBp = uniTagInfo.getDownstreamBandwidthProfile();
+            String oltUBp = uniTagInfo.getDownstreamOltBandwidthProfile();
+            String oltDsBp = uniTagInfo.getUpstreamOltBandwidthProfile();
+            if (!createMeter(deviceId, usBp)) {
+                pendingMeters.get(serviceName).add(usBp);
+                waitingOnMeter.set(true);
+            }
+            if (!createMeter(deviceId, dsBp)) {
+                pendingMeters.get(serviceName).add(usBp);
+                waitingOnMeter.set(true);
+            }
+            if (!createMeter(deviceId, oltUBp)) {
+                pendingMeters.get(serviceName).add(usBp);
+                waitingOnMeter.set(true);
+            }
+            if (!createMeter(deviceId, oltDsBp)) {
+                pendingMeters.get(serviceName).add(usBp);
+                waitingOnMeter.set(true);
+            }
+        });
+        if (waitingOnMeter.get()) {
+            if (log.isTraceEnabled()) {
+                log.trace("Meters {} on device {} are not " +
+                                  "installed yet (requested by subscriber {})",
+                          pendingMeters, deviceId, si.id());
+            }
+            return false;
         }
-        if (bpInfoToMeter.get(bandwidthProfile) == null) {
-            log.warn("Bandwidth Profile '{}' is not present in the map",
-                     bandwidthProfile);
-            return null;
-        }
-        if (bpInfoToMeter.get(bandwidthProfile).value().isEmpty()) {
-            log.warn("Bandwidth Profile '{}' is not currently mapped to a meter",
-                    bandwidthProfile);
-            return null;
-        }
+        return true;
+    }
 
-        Optional<? extends MeterKey> meterKeyForDevice = bpInfoToMeter.get(bandwidthProfile).value()
-                .stream()
-                .filter(meterKey -> meterKey.deviceId().equals(deviceId))
-                .findFirst();
-        if (meterKeyForDevice.isPresent()) {
-            log.debug("Found meter {} for bandwidth profile {} on {}",
-                    meterKeyForDevice.get().meterId(), bandwidthProfile, deviceId);
-            return meterKeyForDevice.get().meterId();
-        } else {
-            log.warn("Bandwidth Profile '{}' is not currently mapped to a meter on {} , {}",
-                     bandwidthProfile, deviceId, bpInfoToMeter.get(bandwidthProfile).value());
-            return null;
+    /**
+     * Returns true if a meter is present in the programmed meters map, only if status is ADDED.
+     *
+     * @param deviceId         the DeviceId on which to look for the meter
+     * @param bandwidthProfile the Bandwidth profile associated with this meter
+     * @return true if the meter is found
+     */
+    public boolean hasMeterByBandwidthProfile(DeviceId deviceId, String bandwidthProfile) {
+        try {
+            programmedMeterReadLock.lock();
+            Map<String, MeterData> metersOnDevice = programmedMeters.get(deviceId);
+            if (metersOnDevice == null || metersOnDevice.isEmpty()) {
+                return false;
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("added metersOnDevice {}: {}", deviceId, metersOnDevice);
+            }
+            return metersOnDevice.get(bandwidthProfile) != null &&
+                    metersOnDevice.get(bandwidthProfile).getMeterStatus().equals(MeterState.ADDED);
+        } finally {
+            programmedMeterReadLock.unlock();
+        }
+    }
+
+    public boolean hasPendingMeterByBandwidthProfile(DeviceId deviceId, String bandwidthProfile) {
+        try {
+            programmedMeterReadLock.lock();
+            Map<String, MeterData> metersOnDevice = programmedMeters.get(deviceId);
+            if (metersOnDevice == null || metersOnDevice.isEmpty()) {
+                return false;
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("pending metersOnDevice {}: {}", deviceId, metersOnDevice);
+            }
+            // NOTE that we check in order if the meter was ADDED and if it wasn't we check for PENDING_ADD
+            // it is possible that a different thread move the meter state from PENDING_ADD
+            // to ADDED between these two checks
+            // to avoid creating the meter twice we return true event if the meter is already added
+            return metersOnDevice.get(bandwidthProfile) != null && (
+                    metersOnDevice.get(bandwidthProfile).getMeterStatus().equals(MeterState.ADDED) ||
+                            metersOnDevice.get(bandwidthProfile).getMeterStatus().equals(MeterState.PENDING_ADD)
+            );
+
+        } finally {
+            programmedMeterReadLock.unlock();
+        }
+    }
+
+    public MeterId getMeterIdForBandwidthProfile(DeviceId deviceId, String bandwidthProfile) {
+        try {
+            programmedMeterReadLock.lock();
+            Map<String, MeterData> metersOnDevice = programmedMeters.get(deviceId);
+            if (metersOnDevice == null || metersOnDevice.isEmpty()) {
+                return null;
+            }
+            MeterData meterData = metersOnDevice.get(bandwidthProfile);
+            if (meterData == null || meterData.getMeterStatus() != MeterState.ADDED) {
+                return null;
+            }
+            if (log.isTraceEnabled()) {
+                log.debug("Found meter {} on device {} for bandwidth profile {}",
+                        meterData.getMeterId(), deviceId, bandwidthProfile);
+            }
+            return meterData.getMeterId();
+        } finally {
+            programmedMeterReadLock.unlock();
         }
     }
 
     @Override
-    public ImmutableSet<MeterKey> getProgMeters() {
-        return bpInfoToMeter.stream()
-                .map(Map.Entry::getValue)
-                .collect(ImmutableSet.toImmutableSet());
+    public void purgeDeviceMeters(DeviceId deviceId) {
+        log.debug("Purging meters on device {}", deviceId);
+        meterService.purgeMeters(deviceId);
+
+        // after we purge the meters we also need to clear the map
+        try {
+            programmedMeterWriteLock.lock();
+            programmedMeters.remove(deviceId);
+        } finally {
+            programmedMeterWriteLock.unlock();
+        }
+
+        // and clear the event count
+        // NOTE do we need a lock?
+        pendingRemoveMeters.remove(deviceId);
     }
 
-    @Override
-    public MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo,
-                               CompletableFuture<Object> meterFuture) {
-        log.debug("Creating meter on {} for {}", deviceId, bpInfo);
+    /**
+     * Creates of a meter for a given Bandwidth Profile on a given device.
+     *
+     * @param deviceId         the DeviceId
+     * @param bandwidthProfile the BandwidthProfile ID
+     */
+    public void createMeterForBp(DeviceId deviceId, String bandwidthProfile) {
+        // adding meter in pending state to the programmedMeter map
+        try {
+            programmedMeterWriteLock.lock();
+            programmedMeters.compute(deviceId, (d, deviceMeters) -> {
+
+                if (deviceMeters == null) {
+                    deviceMeters = new HashMap<>();
+                }
+                // NOTE that this method is only called after verifying a
+                // meter for this BP does not already exist
+                MeterData meterData = new MeterData(
+                        null,
+                        MeterState.PENDING_ADD,
+                        bandwidthProfile
+                );
+                deviceMeters.put(bandwidthProfile, meterData);
+
+                return deviceMeters;
+            });
+        } finally {
+            programmedMeterWriteLock.unlock();
+        }
+
+        BandwidthProfileInformation bpInfo = getBandwidthProfileInformation(bandwidthProfile);
         if (bpInfo == null) {
-            log.warn("Requested bandwidth profile on {} information is NULL", deviceId);
-            meterFuture.complete(ObjectiveError.BADPARAMS);
-            return null;
+            log.error("BandwidthProfile {} information not found in sadis", bandwidthProfile);
+            return;
         }
 
-        MeterId meterId = getMeterIdFromBpMapping(deviceId, bpInfo.id());
-        if (meterId != null) {
-            log.debug("Meter {} was previously created for bp {}", meterId, bpInfo.id());
-            meterFuture.complete(null);
-            return meterId;
+        log.info("Creating meter for BandwidthProfile {} on device {}", bpInfo.id(), deviceId);
+
+        if (log.isTraceEnabled()) {
+            log.trace("BandwidthProfile: {}", bpInfo);
         }
 
         List<Band> meterBands = createMeterBands(bpInfo);
 
-        final AtomicReference<MeterId> meterIdRef = new AtomicReference<>();
+        CompletableFuture<Object> meterFuture = new CompletableFuture<>();
+
         MeterRequest meterRequest = DefaultMeterRequest.builder()
                 .withBands(meterBands)
                 .withUnit(Meter.Unit.KB_PER_SEC)
                 .withContext(new MeterContext() {
                     @Override
                     public void onSuccess(MeterRequest op) {
-                        log.debug("Meter {} for {} is installed on the device {}",
-                                  meterIdRef.get(), bpInfo.id(), deviceId);
-                        boolean added = addMeterIdToBpMapping(deviceId, meterIdRef.get(), bpInfo.id());
-                        if (added) {
-                            meterFuture.complete(null);
-                        } else {
-                            log.error("Failed to add Meter {} for {} on {} to the meter-bandwidth mapping",
-                                      meterIdRef.get(), bpInfo.id(), deviceId);
-                            meterFuture.complete(ObjectiveError.UNKNOWN);
-                        }
+                        log.info("Meter for BandwidthProfile {} is installed on the device {}",
+                                 bandwidthProfile, deviceId);
+                        meterFuture.complete(null);
                     }
 
                     @Override
                     public void onError(MeterRequest op, MeterFailReason reason) {
-                        log.error("Failed installing meter {} on {} for {}",
-                                  meterIdRef.get(), deviceId, bpInfo.id());
-                        bpInfoToMeter.remove(bpInfo.id(),
-                                             MeterKey.key(deviceId, meterIdRef.get()));
+                        log.error("Failed installing meter on {} for {}",
+                                  deviceId, bandwidthProfile);
                         meterFuture.complete(reason);
                     }
                 })
@@ -300,63 +438,34 @@
                 .burst()
                 .add();
 
+        // creating the meter
         Meter meter = meterService.submit(meterRequest);
-        meterIdRef.set(meter.id());
-        log.info("Meter {} created and sent for installation on {} for {}",
-                 meter.id(), deviceId, bpInfo);
-        return meter.id();
-    }
 
-    @Override
-    public void removeFromPendingMeters(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
-        if (deviceId == null) {
-            return;
-        }
-        pendingMeters.computeIfPresent(deviceId, (id, bwps) -> {
-            bwps.remove(bwpInfo);
-            return bwps;
-        });
-    }
-
-    @Override
-    public synchronized boolean checkAndAddPendingMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
-        if (bwpInfo == null) {
-            log.debug("Bandwidth profile is null for device: {}", deviceId);
-            return false;
-        }
-        if (pendingMeters.containsKey(deviceId)
-                && pendingMeters.get(deviceId).contains(bwpInfo)) {
-            log.debug("Meter is already pending on {} with bp {}",
-                      deviceId, bwpInfo);
-            return false;
-        }
-        log.debug("Adding bandwidth profile {} to pending on {}",
-                  bwpInfo, deviceId);
-        pendingMeters.compute(deviceId, (id, bwps) -> {
-            if (bwps == null) {
-                bwps = new HashSet<>();
+        // wait for the meter to be completed
+        meterFuture.thenAccept(error -> {
+            if (error != null) {
+                log.error("Cannot create meter, TODO address me");
             }
-            bwps.add(bwpInfo);
-            return bwps;
+
+            // then update the map with the MeterId
+            try {
+                programmedMeterWriteLock.lock();
+                programmedMeters.compute(deviceId, (d, entry) -> {
+                    if (entry != null) {
+                        entry.compute(bandwidthProfile, (bp, meterData) -> {
+                            if (meterData != null) {
+                                meterData.setMeterCellId(meter.meterCellId());
+                                meterData.setMeterStatus(MeterState.ADDED);
+                            }
+                            return meterData;
+                        });
+                    }
+                    return entry;
+                });
+            } finally {
+                programmedMeterWriteLock.unlock();
+            }
         });
-
-        return true;
-    }
-
-    @Override
-    public void clearMeters(DeviceId deviceId) {
-        log.debug("Removing all meters for device {}", deviceId);
-        clearDeviceState(deviceId);
-        meterService.purgeMeters(deviceId);
-    }
-
-    @Override
-    public void clearDeviceState(DeviceId deviceId) {
-        log.info("Clearing local device state for {}", deviceId);
-        pendingRemoveMeters.remove(deviceId);
-        removeMetersFromBpMapping(deviceId);
-        //Following call handles cornercase of OLT delete during meter provisioning
-        pendingMeters.remove(deviceId);
     }
 
     private List<Band> createMeterBands(BandwidthProfileInformation bpInfo) {
@@ -408,119 +517,137 @@
                 .build();
     }
 
-    private void removeMeterFromBpMapping(MeterKey meterKey) {
-        List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
-                .filter(e -> e.getValue().equals(meterKey))
-                .collect(Collectors.toList());
-
-        meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
-    }
-
-    private void removeMetersFromBpMapping(DeviceId deviceId) {
-        List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
-                .filter(e -> e.getValue().deviceId().equals(deviceId))
-                .collect(Collectors.toList());
-
-        meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
-    }
-
-    /**
-     * Checks for mastership or falls back to leadership on deviceId.
-     * If the device is available use mastership,
-     * otherwise fallback on leadership.
-     * Leadership on the device topic is needed because the master can be NONE
-     * in case the device went away, we still need to handle events
-     * consistently
-     */
-    private boolean isLocalLeader(DeviceId deviceId) {
-        if (deviceService.isAvailable(deviceId)) {
-            return mastershipService.isLocalMaster(deviceId);
-        } else {
-            // Fallback with Leadership service - device id is used as topic
-            NodeId leader = leadershipService.runForLeadership(
-                    deviceId.toString()).leaderNodeId();
-            // Verify if this node is the leader
-            return clusterService.getLocalNode().id().equals(leader);
+    private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
+        if (!checkSadisRunning()) {
+            return null;
         }
+        if (bandwidthProfile == null) {
+            return null;
+        }
+        return bpService.get(bandwidthProfile);
+    }
+
+    private boolean checkSadisRunning() {
+        if (bpService == null) {
+            log.warn("Sadis is not running");
+            return false;
+        }
+        return true;
     }
 
     private class InternalMeterListener implements MeterListener {
-
         @Override
         public void event(MeterEvent meterEvent) {
-            eventExecutor.execute(() -> {
+            pendingRemovalMetersExecutor.execute(() -> {
+
                 Meter meter = meterEvent.subject();
-                if (meter == null) {
-                    log.error("Meter in event {} is null", meterEvent);
+                if (!appId.equals(meter.appId())) {
                     return;
                 }
-                if (isLocalLeader(meter.deviceId())) {
-                    MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
-                    if (deleteMeters &&
-                            MeterEvent.Type.METER_REFERENCE_COUNT_ZERO.equals(meterEvent.type())) {
-                        log.info("Zero Count Meter Event is received. Meter is {} on {}",
-                                 meter.id(), meter.deviceId());
-                        incrementMeterCount(meter.deviceId(), key);
 
-                        if (appId.equals(meter.appId()) && pendingRemoveMeters.get(meter.deviceId())
-                                .get(key).get() == zeroReferenceMeterCount) {
-                            log.info("Deleting unreferenced, no longer programmed Meter {} on {}",
-                                     meter.id(), meter.deviceId());
+                if (log.isTraceEnabled()) {
+                    log.trace("Received meter event {}", meterEvent);
+                }
+                MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+                if (meterEvent.type().equals(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO)) {
+                    if (!oltDeviceService.isLocalLeader(meter.deviceId())) {
+                        if (log.isTraceEnabled()) {
+                            log.trace("ignoring meter event {} " +
+                                    "as not leader for {}", meterEvent, meter.deviceId());
+                        }
+                        return;
+                    }
+                    log.info("Zero Count Reference event is received for meter {} on {}, " +
+                                     "incrementing counter",
+                             meter.id(), meter.deviceId());
+                    incrementMeterCount(meter.deviceId(), key);
+                    if (pendingRemoveMeters.get(meter.deviceId())
+                            .get(key).get() == zeroReferenceMeterCount) {
+                        // only delete the meters if the app is configured to do so
+                        if (deleteMeters) {
+                            log.info("Meter {} on device {} is unused, removing it", meter.id(), meter.deviceId());
                             deleteMeter(meter.deviceId(), meter.id());
                         }
                     }
-                    if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
-                        log.info("Meter Removed Event is received for {} on {}",
-                                 meter.id(), meter.deviceId());
-                        pendingRemoveMeters.computeIfPresent(meter.deviceId(),
-                                                             (id, meters) -> {
-                                                                 if (meters.get(key) == null) {
-                                                                     log.info("Meters is not pending " +
-                                                                                      "{} on {}", key, id);
-                                                                     return meters;
-                                                                 }
-                                                                 meters.remove(key);
-                                                                 return meters;
-                                                             });
-                        removeMeterFromBpMapping(key);
-                    }
-                } else {
-                    log.trace("Ignoring meter event, not leader of {}, {}", meter.deviceId(), meterEvent);
+                }
+
+                if (meterEvent.type().equals(MeterEvent.Type.METER_REMOVED)) {
+                    removeMeterCount(meter, key);
                 }
             });
         }
 
+        private void removeMeterCount(Meter meter, MeterKey key) {
+            pendingRemoveMeters.computeIfPresent(meter.deviceId(),
+                                                 (id, meters) -> {
+                                                     if (meters.get(key) == null) {
+                                                         log.info("Meters is not pending " +
+                                                                          "{} on {}", key, id);
+                                                         return meters;
+                                                     }
+                                                     meters.remove(key);
+                                                     return meters;
+                                                 });
+        }
+
         private void incrementMeterCount(DeviceId deviceId, MeterKey key) {
             if (key == null) {
                 return;
             }
             pendingRemoveMeters.compute(deviceId,
-                    (id, meters) -> {
-                        if (meters == null) {
-                            meters = new HashMap<>();
+                                        (id, meters) -> {
+                                            if (meters == null) {
+                                                meters = new HashMap<>();
 
-                        }
-                        if (meters.get(key) == null) {
-                            meters.put(key, new AtomicInteger(1));
-                        }
-                        meters.get(key).addAndGet(1);
-                        return meters;
-                    });
+                                            }
+                                            if (meters.get(key) == null) {
+                                                meters.put(key, new AtomicInteger(1));
+                                            }
+                                            meters.get(key).addAndGet(1);
+                                            return meters;
+                                        });
+        }
+    }
+
+    private void deleteMeter(DeviceId deviceId, MeterId meterId) {
+        Meter meter = meterService.getMeter(deviceId, meterId);
+        if (meter != null) {
+            MeterRequest meterRequest = DefaultMeterRequest.builder()
+                    .withBands(meter.bands())
+                    .withUnit(meter.unit())
+                    .forDevice(deviceId)
+                    .fromApp(appId)
+                    .burst()
+                    .remove();
+
+            meterService.withdraw(meterRequest, meterId);
         }
 
-        private void deleteMeter(DeviceId deviceId, MeterId meterId) {
-            Meter meter = meterService.getMeter(deviceId, meterId);
-            if (meter != null) {
-                MeterRequest meterRequest = DefaultMeterRequest.builder()
-                        .withBands(meter.bands())
-                        .withUnit(meter.unit())
-                        .forDevice(deviceId)
-                        .fromApp(appId)
-                        .burst()
-                        .remove();
-
-                meterService.withdraw(meterRequest, meterId);
-            }
+        // remove the meter from local caching
+        try {
+            programmedMeterWriteLock.lock();
+            programmedMeters.computeIfPresent(deviceId, (d, deviceMeters) -> {
+                Iterator<Map.Entry<String, MeterData>> iter = deviceMeters.entrySet().iterator();
+                while (iter.hasNext()) {
+                    Map.Entry<String, MeterData> entry = iter.next();
+                    if (entry.getValue().getMeterId().equals(meterId)) {
+                        deviceMeters.remove(entry.getKey());
+                    }
+                }
+                return deviceMeters;
+            });
+        } finally {
+            programmedMeterWriteLock.unlock();
         }
     }
+
+    protected void bindSadisService(SadisService service) {
+        this.bpService = service.getBandwidthProfileService();
+        log.info("Sadis service is loaded");
+    }
+
+    protected void unbindSadisService(SadisService service) {
+        this.bpService = null;
+        log.info("Sadis service is unloaded");
+    }
 }
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltMeterServiceInterface.java b/impl/src/main/java/org/opencord/olt/impl/OltMeterServiceInterface.java
new file mode 100644
index 0000000..a10455b
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OltMeterServiceInterface.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.meter.MeterId;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+
+import java.util.Map;
+
+/**
+ * Interface for meter installation/removal methods
+ * for different types of bandwidth profiles.
+ */
+public interface OltMeterServiceInterface {
+    /**
+     * Checks for a meter, if not present it will create it and return false.
+     * @param deviceId DeviceId
+     * @param bandwidthProfile Bandwidth Profile Id
+     * @return boolean
+     */
+    boolean createMeter(DeviceId deviceId, String bandwidthProfile);
+
+    /**
+     * Checks for all the meters specified in the sadis uniTagList,
+     * if not present it will create them and return false.
+     * @param deviceId DeviceId
+     * @param si SubscriberAndDeviceInformation
+     * @return boolean
+     */
+    boolean createMeters(DeviceId deviceId, SubscriberAndDeviceInformation si);
+
+    /**
+     * Checks if a meter for the specified bandwidthProfile exists
+     * and is in ADDED state.
+     * @param deviceId DeviceId
+     * @param bandwidthProfileId bandwidth profile id
+     * @return true if present and in ADDED state
+     */
+    boolean hasMeterByBandwidthProfile(DeviceId deviceId, String bandwidthProfileId);
+
+    /**
+     * Checks if a meter for the specified bandwidthProfile exists
+     * and is in PENDING_ADD state.
+     * @param deviceId DeviceId
+     * @param bandwidthProfileId bandwidth profile id
+     * @return true if present and in PENDING_ADD state
+     */
+    boolean hasPendingMeterByBandwidthProfile(DeviceId deviceId, String bandwidthProfileId);
+
+    /**
+     * Creates a meter on a device for the given BandwidthProfile Id.
+     * @param deviceId the device id
+     * @param bandwidthProfileId the bandwidth profile Id
+     */
+    void createMeterForBp(DeviceId deviceId, String bandwidthProfileId);
+
+    /**
+     * Returns the meter Id for a given bandwidth profile Id.
+     * @param deviceId the device id
+     * @param bandwidthProfileId the bandwidth profile Id
+     * @return the meter Id
+     */
+    MeterId getMeterIdForBandwidthProfile(DeviceId deviceId, String bandwidthProfileId);
+
+    /**
+     * Purges all the meters on a device.
+     * @param deviceId the device
+     */
+    void purgeDeviceMeters(DeviceId deviceId);
+
+    /**
+     * Return all programmed meters for all OLTs controlled by this ONOS cluster.
+     * @return a map, with the device keys, and entry of map with bp Id and corresponding meter
+     */
+    Map<DeviceId, Map<String, MeterData>> getProgrammedMeters();
+
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltPortStatus.java b/impl/src/main/java/org/opencord/olt/impl/OltPortStatus.java
new file mode 100644
index 0000000..655a0ac
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OltPortStatus.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import java.util.Objects;
+
+/**
+ * OltPortStatus is used to keep track of the flow status for a subscriber service.
+ */
+public class OltPortStatus {
+    // TODO consider adding a lastUpdated field, it may help with debugging
+    public OltFlowService.OltFlowsStatus defaultEapolStatus;
+    public OltFlowService.OltFlowsStatus subscriberFlowsStatus;
+    // NOTE we need to keep track of the DHCP status as that is installed before the other flows
+    // if macLearning is enabled (DHCP is needed to learn the MacAddress from the host)
+    public OltFlowService.OltFlowsStatus dhcpStatus;
+
+    public OltPortStatus(OltFlowService.OltFlowsStatus defaultEapolStatus,
+                         OltFlowService.OltFlowsStatus subscriberFlowsStatus,
+                         OltFlowService.OltFlowsStatus dhcpStatus) {
+        this.defaultEapolStatus = defaultEapolStatus;
+        this.subscriberFlowsStatus = subscriberFlowsStatus;
+        this.dhcpStatus = dhcpStatus;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        OltPortStatus that = (OltPortStatus) o;
+        return defaultEapolStatus == that.defaultEapolStatus
+                && subscriberFlowsStatus == that.subscriberFlowsStatus
+                && dhcpStatus == that.dhcpStatus;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(defaultEapolStatus, subscriberFlowsStatus, dhcpStatus);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("OltPortStatus{");
+        sb.append("defaultEapolStatus=").append(defaultEapolStatus);
+        sb.append(", subscriberFlowsStatus=").append(subscriberFlowsStatus);
+        sb.append(", dhcpStatus=").append(dhcpStatus);
+        sb.append('}');
+        return sb.toString();
+    }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltUtils.java b/impl/src/main/java/org/opencord/olt/impl/OltUtils.java
new file mode 100644
index 0000000..6300ff9
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OltUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.Port;
+
+import static org.opencord.olt.impl.OltFlowService.FlowOperation.ADD;
+
+/**
+ * Utility class for OLT app.
+ */
+final class OltUtils {
+
+    private OltUtils() {
+    }
+
+    /**
+     * Returns the port name if present in the annotations.
+     * @param port the port
+     * @return the annotated port name
+     */
+    static String getPortName(Port port) {
+        String name = port.annotations().value(AnnotationKeys.PORT_NAME);
+        return name == null ? "" : name;
+    }
+
+    /**
+     * Returns a port printed as a connect point and with the name appended.
+     * @param port the port
+     * @return the formatted string
+     */
+    static String portWithName(Port port) {
+        return port.element().id().toString() + '/' +
+                port.number() + '[' +
+                getPortName(port) + ']';
+    }
+
+    static String flowOpToString(OltFlowService.FlowOperation op) {
+        return op == ADD ? "Adding" : "Removing";
+    }
+
+    static String completeFlowOpToString(OltFlowService.FlowOperation op) {
+        return op == ADD ? "Added" : "Removed";
+    }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
index 3657436..a73a42b 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
@@ -58,15 +58,21 @@
     public static final String ENABLE_PPPOE = "enablePppoe";
     public static final boolean ENABLE_PPPOE_DEFAULT = false;
 
-    public static final String EAPOL_DELETE_RETRY_MAX_ATTEMPS = "eapolDeleteRetryMaxAttempts";
-    public static final int EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT = 3;
-
-    public static final String PROVISION_DELAY = "provisionDelay";
-    public static final int PROVISION_DELAY_DEFAULT = 100;
+    public static final String WAIT_FOR_REMOVAL = "waitForRemoval";
+    public static final boolean WAIT_FOR_REMOVAL_DEFAULT = true;
 
     public static final String REQUIRED_DRIVERS_PROPERTY_DELAY = "requiredDriversPropertyDelay";
     public static final int REQUIRED_DRIVERS_PROPERTY_DELAY_DEFAULT = 5;
 
+    public static final String FLOW_PROCESSING_THREADS = "flowProcessingThreads";
+    public static final int FLOW_PROCESSING_THREADS_DEFAULT = 8;
+
+    public static final String SUBSCRIBER_PROCESSING_THREADS = "subscriberProcessingThreads";
+    public static final int SUBSCRIBER_PROCESSING_THREADS_DEFAULT = 8;
+
+    public static final String REQUEUE_DELAY = "requeueDelay";
+    public static final int REQUEUE_DELAY_DEFAULT = 500;
+
     public static final String UPSTREAM_ONU = "upstreamOnu";
     public static final String UPSTREAM_OLT = "upstreamOlt";
     public static final String DOWNSTREAM_ONU = "downstreamOnu";
diff --git a/impl/src/main/java/org/opencord/olt/impl/ServiceKey.java b/impl/src/main/java/org/opencord/olt/impl/ServiceKey.java
new file mode 100644
index 0000000..c878062
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/ServiceKey.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.opencord.sadis.UniTagInformation;
+
+import java.util.Objects;
+
+/**
+ * SubscriberKey is used to identify the combination of a subscriber and a service.
+ */
+public class ServiceKey {
+    private AccessDevicePort port;
+    private UniTagInformation service;
+
+    public ServiceKey(AccessDevicePort port, UniTagInformation service) {
+        this.port = port;
+        this.service = service;
+    }
+
+    public AccessDevicePort getPort() {
+        return port;
+    }
+
+    public void setPort(AccessDevicePort port) {
+        this.port = port;
+    }
+
+    public UniTagInformation getService() {
+        return service;
+    }
+
+    public void setService(UniTagInformation service) {
+        this.service = service;
+    }
+
+    @Override
+    public String toString() {
+        return this.port.toString() + " - " + this.service.getServiceName();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ServiceKey that = (ServiceKey) o;
+        boolean isPortEqual = Objects.equals(port, that.port);
+        boolean isServiceEqual = Objects.equals(service, that.service);
+
+        return isPortEqual && isServiceEqual;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(port, service);
+    }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/ServiceKeySerializer.java b/impl/src/main/java/org/opencord/olt/impl/ServiceKeySerializer.java
new file mode 100644
index 0000000..4035bdd
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/ServiceKeySerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.Serializer;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.opencord.sadis.UniTagInformation;
+
+/**
+ * ServiceKeySerializer is a custom serializer to store a ServiceKey in an Atomix ditributed map.
+ */
+class ServiceKeySerializer extends Serializer<ServiceKey> {
+
+    ServiceKeySerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, ServiceKey object) {
+        kryo.writeClassAndObject(output, object.getPort().connectPoint().port());
+        output.writeString(object.getPort().name());
+        output.writeString(object.getPort().connectPoint().deviceId().toString());
+        kryo.writeClassAndObject(output, object.getService());
+    }
+
+    @Override
+    public ServiceKey read(Kryo kryo, Input input, Class<ServiceKey> type) {
+        PortNumber port = (PortNumber) kryo.readClassAndObject(input);
+        final String portName = input.readString();
+        final String devId = input.readString();
+
+        UniTagInformation uti = (UniTagInformation) kryo.readClassAndObject(input);
+        ConnectPoint cp = new ConnectPoint(DeviceId.deviceId(devId), port);
+        AccessDevicePort adp = new AccessDevicePort(cp, portName);
+
+        return new ServiceKey(adp, uti);
+    }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java b/impl/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java
deleted file mode 100644
index 225e48f..0000000
--- a/impl/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Copyright 2020-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.opencord.olt.impl;
-
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.meter.MeterId;
-import org.opencord.olt.AccessDevicePort;
-import org.opencord.sadis.UniTagInformation;
-
-import java.util.Objects;
-
-/**
- * Contains the mapping of a given port to flow information, including bandwidth profile.
- */
-class SubscriberFlowInfo {
-    private final DeviceId devId;
-    private final AccessDevicePort nniPort;
-    private final AccessDevicePort uniPort;
-    private final UniTagInformation tagInfo;
-    private MeterId downId;
-    private MeterId upId;
-    private MeterId downOltId;
-    private MeterId upOltId;
-    private final String downBpInfo;
-    private final String upBpInfo;
-    private final String downOltBpInfo;
-    private final String upOltBpInfo;
-
-    /**
-     * Builds the mapper of information.
-     * @param nniPort       the nni port
-     * @param uniPort       the uni port
-     * @param tagInfo       the tag info
-     * @param downId        the downstream meter id
-     * @param upId          the upstream meter id
-     * @param downOltId     the downstream meter id of OLT device
-     * @param upOltId       the upstream meter id of OLT device
-     * @param downBpInfo    the downstream bandwidth profile
-     * @param upBpInfo      the upstream bandwidth profile
-     * @param downOltBpInfo the downstream bandwidth profile of OLT device
-     * @param upOltBpInfo   the upstream bandwidth profile of OLT device
-     */
-    SubscriberFlowInfo(AccessDevicePort nniPort, AccessDevicePort uniPort,
-                       UniTagInformation tagInfo, MeterId downId, MeterId upId,
-                       MeterId downOltId, MeterId upOltId,
-                       String downBpInfo, String upBpInfo,
-                       String downOltBpInfo, String upOltBpInfo) {
-        this.devId = uniPort.deviceId();
-        this.nniPort = nniPort;
-        this.uniPort = uniPort;
-        this.tagInfo = tagInfo;
-        this.downId = downId;
-        this.upId = upId;
-        this.downOltId = downOltId;
-        this.upOltId = upOltId;
-        this.downBpInfo = downBpInfo;
-        this.upBpInfo = upBpInfo;
-        this.downOltBpInfo = downOltBpInfo;
-        this.upOltBpInfo = upOltBpInfo;
-    }
-
-    /**
-     * Gets the device id of this subscriber and flow information.
-     *
-     * @return device id
-     */
-    DeviceId getDevId() {
-        return devId;
-    }
-
-    /**
-     * Gets the nni of this subscriber and flow information.
-     *
-     * @return nni port
-     */
-    AccessDevicePort getNniPort() {
-        return nniPort;
-    }
-
-    /**
-     * Gets the uni port of this subscriber and flow information.
-     *
-     * @return uni port
-     */
-    AccessDevicePort getUniPort() {
-        return uniPort;
-    }
-
-    /**
-     * Gets the tag of this subscriber and flow information.
-     *
-     * @return tag of the subscriber
-     */
-    UniTagInformation getTagInfo() {
-        return tagInfo;
-    }
-
-    /**
-     * Gets the downstream meter id of this subscriber and flow information.
-     *
-     * @return downstream meter id
-     */
-    MeterId getDownId() {
-        return downId;
-    }
-
-    /**
-     * Gets the upstream meter id of this subscriber and flow information.
-     *
-     * @return upstream meter id
-     */
-    MeterId getUpId() {
-        return upId;
-    }
-
-    /**
-     * Gets the downstream meter id of this subscriber and flow information of OLT device.
-     *
-     * @return downstream olt meter id
-     */
-    MeterId getDownOltId() {
-        return downOltId;
-    }
-
-    /**
-     * Gets the upstream meter id of this subscriber and flow information of OLT device.
-     *
-     * @return upstream olt meter id
-     */
-    MeterId getUpOltId() {
-        return upOltId;
-    }
-
-    /**
-     * Gets the downstream bandwidth profile of this subscriber and flow information.
-     *
-     * @return downstream bandwidth profile
-     */
-    String getDownBpInfo() {
-        return downBpInfo;
-    }
-
-    /**
-     * Gets the upstream bandwidth profile of this subscriber and flow information.
-     *
-     * @return upstream bandwidth profile.
-     */
-    String getUpBpInfo() {
-        return upBpInfo;
-    }
-
-    /**
-     * Gets the downstream bandwidth profile of this subscriber and flow information of OLT device.
-     *
-     * @return downstream OLT bandwidth profile
-     */
-    String getDownOltBpInfo() {
-        return downOltBpInfo;
-    }
-
-    /**
-     * Gets the upstream bandwidth profile of this subscriber and flow information of OLT device.
-     *
-     * @return upstream OLT bandwidth profile.
-     */
-    String getUpOltBpInfo() {
-        return upOltBpInfo;
-    }
-
-    /**
-     * Sets the upstream meter id.
-     *
-     * @param upMeterId the upstream meter id
-     */
-    void setUpMeterId(MeterId upMeterId) {
-        this.upId = upMeterId;
-    }
-
-    /**
-     * Sets the downstream meter id.
-     *
-     * @param downMeterId the downstream meter id
-     */
-    void setDownMeterId(MeterId downMeterId) {
-        this.downId = downMeterId;
-    }
-
-    /**
-     * Sets the upstream meter id of OLT.
-     *
-     * @param upOltMeterId the upstream meter id of OLT
-     */
-    void setUpOltMeterId(MeterId upOltMeterId) {
-        this.upOltId = upOltMeterId;
-    }
-
-    /**
-     * Sets the downstream meter id of OLT.
-     *
-     * @param downOltMeterId the downstream meter id of OLT
-     */
-    void setDownOltMeterId(MeterId downOltMeterId) {
-        this.downOltId = downOltMeterId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        SubscriberFlowInfo flowInfo = (SubscriberFlowInfo) o;
-        return devId.equals(flowInfo.devId) &&
-                nniPort.equals(flowInfo.nniPort) &&
-                uniPort.equals(flowInfo.uniPort) &&
-                tagInfo.equals(flowInfo.tagInfo) &&
-                downBpInfo.equals(flowInfo.downBpInfo) &&
-                upBpInfo.equals(flowInfo.upBpInfo) &&
-                Objects.equals(downOltBpInfo, flowInfo.downOltBpInfo) &&
-                Objects.equals(upOltBpInfo, flowInfo.upOltBpInfo);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(devId, nniPort, uniPort, tagInfo, downBpInfo, upBpInfo, downOltBpInfo, upOltBpInfo);
-    }
-
-    @Override
-    public String toString() {
-        return com.google.common.base.MoreObjects.toStringHelper(this)
-                .add("devId", devId)
-                .add("nniPort", nniPort)
-                .add("uniPort", uniPort)
-                .add("tagInfo", tagInfo)
-                .add("downId", downId)
-                .add("upId", upId)
-                .add("downOltId", downOltId)
-                .add("upOltId", upOltId)
-                .add("downBpInfo", downBpInfo)
-                .add("upBpInfo", upBpInfo)
-                .add("downOltBpInfo", downOltBpInfo)
-                .add("upOltBpInfo", upOltBpInfo)
-                .toString();
-    }
-}
diff --git a/impl/src/main/java/org/opencord/olt/impl/package-info.java b/impl/src/main/java/org/opencord/olt/impl/package-info.java
index b5ba320..ccdfda7 100644
--- a/impl/src/main/java/org/opencord/olt/impl/package-info.java
+++ b/impl/src/main/java/org/opencord/olt/impl/package-info.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-present Open Networking Foundation
+ * Copyright 2021-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -15,6 +15,6 @@
  */
 
 /**
- * OLT application handling PMC OLT hardware.
+ * ONOS application archetype.
  */
-package org.opencord.olt.impl;
+package org.opencord.olt.impl;
\ No newline at end of file