[VOL-3836] Extract the OLT pipeliners from ONOS

Change-Id: I0dc99aabcb17b46fc5dc8bbe8e3bbd5ece52058a
diff --git a/app/app.xml b/app/app.xml
index 6512e95..53e8767 100644
--- a/app/app.xml
+++ b/app/app.xml
@@ -21,6 +21,7 @@
      featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
      features="${project.artifactId}" apps="org.opencord.sadis">
     <description>${project.description}</description>
-    <artifact>mvn:${project.groupId}/${project.artifactId}/${project.version}</artifact>
     <artifact>mvn:${project.groupId}/olt-api/${project.version}</artifact>
+    <artifact>mvn:${project.groupId}/olt-impl/${project.version}</artifact>
+    <artifact>mvn:${project.groupId}/olt-web/${project.version}</artifact>
 </app>
diff --git a/app/features.xml b/app/features.xml
index b1fee21..abb171b 100644
--- a/app/features.xml
+++ b/app/features.xml
@@ -19,6 +19,7 @@
              description="${project.description}">
         <feature>onos-api</feature>
         <bundle>mvn:${project.groupId}/olt-api/${project.version}</bundle>
-        <bundle>mvn:${project.groupId}/${project.artifactId}/${project.version}</bundle>
+        <bundle>mvn:${project.groupId}/olt-impl/${project.version}</bundle>
+        <bundle>mvn:${project.groupId}/olt-web/${project.version}</bundle>
     </feature>
 </features>
diff --git a/app/pom.xml b/app/pom.xml
index 8bfd618..2682ae2 100644
--- a/app/pom.xml
+++ b/app/pom.xml
@@ -30,12 +30,6 @@
     <description>OLT application for CORD</description>
 
     <properties>
-        <web.context>/onos/olt</web.context>
-        <api.version>1.0.0</api.version>
-        <api.title>ONOS OLT REST API</api.title>
-        <api.description>
-            APIs for interacting with the CORD OLT application.
-        </api.description>
         <onos.app.name>org.opencord.olt</onos.app.name>
         <onos.app.title>OLT Access Management</onos.app.title>
         <onos.app.origin>OpenCord</onos.app.origin>
@@ -44,93 +38,15 @@
         <onos.app.readme>
             CORD OLT Access management application
         </onos.app.readme>
-        <api.package>org.opencord.olt.rest</api.package>
         <olt.api.version>${project.version}</olt.api.version>
     </properties>
 
-    <dependencies>
-        <dependency>
-            <groupId>org.opencord</groupId>
-            <artifactId>olt-api</artifactId>
-            <version>${olt.api.version}</version>
-            <scope>compile</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.opencord</groupId>
-            <artifactId>sadis-api</artifactId>
-            <version>${sadis.api.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-cli</artifactId>
-            <version>${onos.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-api</artifactId>
-            <version>${onos.version}</version>
-            <classifier>tests</classifier>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onos-core-serializers</artifactId>
-            <version>${onos.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.karaf.shell</groupId>
-            <artifactId>org.apache.karaf.shell.console</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.karaf.shell</groupId>
-            <artifactId>org.apache.karaf.shell.core</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>javax.ws.rs</groupId>
-            <artifactId>javax.ws.rs-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-    </dependencies>
-
     <build>
         <plugins>
             <plugin>
                 <groupId>org.onosproject</groupId>
                 <artifactId>onos-maven-plugin</artifactId>
             </plugin>
-
-            <plugin>
-                <groupId>org.apache.felix</groupId>
-                <artifactId>maven-bundle-plugin</artifactId>
-                <configuration>
-                    <instructions>
-                        <_wab>src/main/webapp/</_wab>
-                        <Include-Resource>
-                            WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
-                            {maven-resources}
-                        </Include-Resource>
-                        <Import-Package>
-                            *,org.glassfish.jersey.servlet
-                        </Import-Package>
-                        <Web-ContextPath>${web.context}</Web-ContextPath>
-                        <Karaf-Commands>org.opencord.olt.cli</Karaf-Commands>
-                    </instructions>
-                </configuration>
-            </plugin>
-
         </plugins>
     </build>
 </project>
diff --git a/impl/pom.xml b/impl/pom.xml
new file mode 100644
index 0000000..99f4b4e
--- /dev/null
+++ b/impl/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2016-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.opencord</groupId>
+        <artifactId>olt</artifactId>
+        <version>4.4.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>olt-impl</artifactId>
+    <packaging>bundle</packaging>
+    <description>OLT application for CORD</description>
+
+    <properties>
+        <olt.api.version>${project.version}</olt.api.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opencord</groupId>
+            <artifactId>olt-api</artifactId>
+            <version>${olt.api.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.opencord</groupId>
+            <artifactId>sadis-api</artifactId>
+            <version>${sadis.api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-cli</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+            <version>${onos.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.karaf.shell</groupId>
+            <artifactId>org.apache.karaf.shell.console</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.karaf.shell</groupId>
+            <artifactId>org.apache.karaf.shell.core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.onosproject</groupId>
+                <artifactId>onos-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <configuration>
+                    <instructions>
+                        <Karaf-Commands>org.opencord.olt.cli</Karaf-Commands>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/app/src/main/java/org/opencord/olt/cli/ShowBpMeterMappingsCommand.java b/impl/src/main/java/org/opencord/olt/cli/ShowBpMeterMappingsCommand.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/cli/ShowBpMeterMappingsCommand.java
rename to impl/src/main/java/org/opencord/olt/cli/ShowBpMeterMappingsCommand.java
diff --git a/app/src/main/java/org/opencord/olt/cli/ShowFailedSubscribersCommand.java b/impl/src/main/java/org/opencord/olt/cli/ShowFailedSubscribersCommand.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/cli/ShowFailedSubscribersCommand.java
rename to impl/src/main/java/org/opencord/olt/cli/ShowFailedSubscribersCommand.java
diff --git a/app/src/main/java/org/opencord/olt/cli/ShowOltCommand.java b/impl/src/main/java/org/opencord/olt/cli/ShowOltCommand.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/cli/ShowOltCommand.java
rename to impl/src/main/java/org/opencord/olt/cli/ShowOltCommand.java
diff --git a/app/src/main/java/org/opencord/olt/cli/ShowProgrammedMetersCommand.java b/impl/src/main/java/org/opencord/olt/cli/ShowProgrammedMetersCommand.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/cli/ShowProgrammedMetersCommand.java
rename to impl/src/main/java/org/opencord/olt/cli/ShowProgrammedMetersCommand.java
diff --git a/app/src/main/java/org/opencord/olt/cli/ShowProgrammedSubscribersCommand.java b/impl/src/main/java/org/opencord/olt/cli/ShowProgrammedSubscribersCommand.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/cli/ShowProgrammedSubscribersCommand.java
rename to impl/src/main/java/org/opencord/olt/cli/ShowProgrammedSubscribersCommand.java
diff --git a/app/src/main/java/org/opencord/olt/cli/SubscriberAddCommand.java b/impl/src/main/java/org/opencord/olt/cli/SubscriberAddCommand.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/cli/SubscriberAddCommand.java
rename to impl/src/main/java/org/opencord/olt/cli/SubscriberAddCommand.java
diff --git a/app/src/main/java/org/opencord/olt/cli/SubscriberRemoveCommand.java b/impl/src/main/java/org/opencord/olt/cli/SubscriberRemoveCommand.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/cli/SubscriberRemoveCommand.java
rename to impl/src/main/java/org/opencord/olt/cli/SubscriberRemoveCommand.java
diff --git a/app/src/main/java/org/opencord/olt/cli/UniTagAddCommand.java b/impl/src/main/java/org/opencord/olt/cli/UniTagAddCommand.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/cli/UniTagAddCommand.java
rename to impl/src/main/java/org/opencord/olt/cli/UniTagAddCommand.java
diff --git a/app/src/main/java/org/opencord/olt/cli/UniTagRemoveCommand.java b/impl/src/main/java/org/opencord/olt/cli/UniTagRemoveCommand.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/cli/UniTagRemoveCommand.java
rename to impl/src/main/java/org/opencord/olt/cli/UniTagRemoveCommand.java
diff --git a/app/src/main/java/org/opencord/olt/cli/package-info.java b/impl/src/main/java/org/opencord/olt/cli/package-info.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/cli/package-info.java
rename to impl/src/main/java/org/opencord/olt/cli/package-info.java
diff --git a/impl/src/main/java/org/opencord/olt/driver/NokiaOltPipeline.java b/impl/src/main/java/org/opencord/olt/driver/NokiaOltPipeline.java
new file mode 100644
index 0000000..f8b9009
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/driver/NokiaOltPipeline.java
@@ -0,0 +1,783 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.driver;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.osgi.ServiceDirectory;
+import org.onlab.packet.EthType;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.net.behaviour.Pipeliner;
+import org.onosproject.net.behaviour.PipelinerContext;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.criteria.EthTypeCriterion;
+import org.onosproject.net.flow.criteria.IPCriterion;
+import org.onosproject.net.flow.criteria.IPProtocolCriterion;
+import org.onosproject.net.flow.criteria.PortCriterion;
+import org.onosproject.net.flow.criteria.VlanIdCriterion;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.DefaultGroupBucket;
+import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.Group;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupBuckets;
+import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupEvent;
+import org.onosproject.net.group.GroupKey;
+import org.onosproject.net.group.GroupListener;
+import org.onosproject.net.group.GroupService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Pipeliner for OLT device.
+ */
+
+public class NokiaOltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
+
+    private static final Integer QQ_TABLE = 1;
+    private static final short MCAST_VLAN = 4000;
+    private static final String OLTCOOKIES = "olt-cookies-must-be-unique";
+    private static final int EAPOL_FLOW_PRIORITY = 1200;
+    private final Logger log = getLogger(getClass());
+
+    private ServiceDirectory serviceDirectory;
+    private FlowRuleService flowRuleService;
+    private GroupService groupService;
+    private CoreService coreService;
+    private StorageService storageService;
+
+    private DeviceId deviceId;
+    private ApplicationId appId;
+
+
+    protected FlowObjectiveStore flowObjectiveStore;
+
+    private Cache<GroupKey, NextObjective> pendingGroups;
+
+    protected static KryoNamespace appKryo = new KryoNamespace.Builder()
+            .register(KryoNamespaces.API)
+            .register(GroupKey.class)
+            .register(DefaultGroupKey.class)
+            .register(OltPipelineGroup.class)
+            .build("OltPipeline");
+    @Override
+    public void init(DeviceId deviceId, PipelinerContext context) {
+        log.debug("Initiate OLT pipeline");
+        this.serviceDirectory = context.directory();
+        this.deviceId = deviceId;
+
+        flowRuleService = serviceDirectory.get(FlowRuleService.class);
+        coreService = serviceDirectory.get(CoreService.class);
+        groupService = serviceDirectory.get(GroupService.class);
+        flowObjectiveStore = context.store();
+        storageService = serviceDirectory.get(StorageService.class);
+
+        appId = coreService.registerApplication(
+                "org.onosproject.driver.OLTPipeline");
+
+
+        pendingGroups = CacheBuilder.newBuilder()
+                .expireAfterWrite(20, TimeUnit.SECONDS)
+                .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
+                    if (notification.getCause() == RemovalCause.EXPIRED) {
+                        fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
+                    }
+                }).build();
+
+        groupService.addListener(new InnerGroupListener());
+
+    }
+
+    @Override
+    public void filter(FilteringObjective filter) {
+        Instructions.OutputInstruction output;
+
+        if (filter.meta() != null && !filter.meta().immediate().isEmpty()) {
+            output = (Instructions.OutputInstruction) filter.meta().immediate().stream()
+                    .filter(t -> t.type().equals(Instruction.Type.OUTPUT))
+                    .limit(1)
+                    .findFirst().get();
+
+            if (output == null || !output.port().equals(PortNumber.CONTROLLER)) {
+                log.error("OLT can only filter packet to controller");
+                fail(filter, ObjectiveError.UNSUPPORTED);
+                return;
+            }
+        } else {
+            fail(filter, ObjectiveError.BADPARAMS);
+            return;
+        }
+
+        if (filter.key().type() != Criterion.Type.IN_PORT) {
+            fail(filter, ObjectiveError.BADPARAMS);
+            return;
+        }
+
+        EthTypeCriterion ethType = (EthTypeCriterion)
+                filterForCriterion(filter.conditions(), Criterion.Type.ETH_TYPE);
+
+        if (ethType == null) {
+            fail(filter, ObjectiveError.BADPARAMS);
+            return;
+        }
+
+        if (ethType.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
+            provisionEapol(filter, ethType, output);
+        } else if (ethType.ethType().equals(EthType.EtherType.IPV4.ethType())) {
+            IPProtocolCriterion ipProto = (IPProtocolCriterion)
+                    filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
+            if (ipProto.protocol() == IPv4.PROTOCOL_IGMP) {
+                provisionIgmp(filter, ethType, ipProto, output);
+            } else {
+                log.error("OLT can only filter igmp");
+                fail(filter, ObjectiveError.UNSUPPORTED);
+            }
+        } else {
+            log.error("OLT can only filter eapol and igmp");
+            fail(filter, ObjectiveError.UNSUPPORTED);
+        }
+
+    }
+
+    private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
+        FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
+        switch (objective.op()) {
+
+            case ADD:
+                flowBuilder.add(ruleBuilder.build());
+                break;
+            case REMOVE:
+                flowBuilder.remove(ruleBuilder.build());
+                break;
+            default:
+                log.warn("Unknown operation {}", objective.op());
+        }
+
+        flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
+            @Override
+            public void onSuccess(FlowRuleOperations ops) {
+                objective.context().ifPresent(context -> context.onSuccess(objective));
+            }
+
+            @Override
+            public void onError(FlowRuleOperations ops) {
+                objective.context()
+                        .ifPresent(context -> context.onError(objective, ObjectiveError.FLOWINSTALLATIONFAILED));
+            }
+        }));
+    }
+
+    @Override
+    public void forward(ForwardingObjective fwd) {
+
+        if (checkForMulticast(fwd)) {
+            processMulticastRule(fwd);
+            return;
+        }
+
+        if (checkForEapol(fwd)) {
+            log.warn("Discarding EAPOL flow which is not supported on this pipeline");
+            return;
+        }
+
+        TrafficTreatment treatment = fwd.treatment();
+
+        List<Instruction> instructions = treatment.allInstructions();
+
+        Optional<Instruction> vlanIntruction = instructions.stream()
+                .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
+                .filter(i -> ((L2ModificationInstruction) i).subtype() ==
+                        L2ModificationInstruction.L2SubType.VLAN_PUSH ||
+                        ((L2ModificationInstruction) i).subtype() ==
+                                L2ModificationInstruction.L2SubType.VLAN_POP)
+                .findAny();
+
+        if (vlanIntruction.isPresent()) {
+            L2ModificationInstruction vlanIns =
+                    (L2ModificationInstruction) vlanIntruction.get();
+
+            if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_PUSH) {
+                installUpstreamRules(fwd);
+            } else if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_POP) {
+                installDownstreamRules(fwd);
+            } else {
+                log.error("Unknown OLT operation: {}", fwd);
+                fail(fwd, ObjectiveError.UNSUPPORTED);
+                return;
+            }
+
+            pass(fwd);
+        } else {
+            TrafficSelector selector = fwd.selector();
+
+            if (fwd.treatment() != null) {
+                // Deal with SPECIFIC and VERSATILE in the same manner.
+                FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
+                        .forDevice(deviceId)
+                        .withSelector(selector)
+                        .fromApp(fwd.appId())
+                        .withPriority(fwd.priority())
+                        .withTreatment(fwd.treatment());
+
+                if (fwd.permanent()) {
+                    ruleBuilder.makePermanent();
+                } else {
+                    ruleBuilder.makeTemporary(fwd.timeout());
+                }
+                installObjective(ruleBuilder, fwd);
+
+            } else {
+                log.error("No treatment error: {}", fwd);
+                fail(fwd, ObjectiveError.UNSUPPORTED);
+            }
+        }
+
+    }
+
+
+    @Override
+    public void next(NextObjective nextObjective) {
+        if (nextObjective.type() != NextObjective.Type.BROADCAST) {
+            log.error("OLT only supports broadcast groups.");
+            fail(nextObjective, ObjectiveError.BADPARAMS);
+        }
+
+        if (nextObjective.next().size() != 1) {
+            log.error("OLT only supports singleton broadcast groups.");
+            fail(nextObjective, ObjectiveError.BADPARAMS);
+        }
+
+        TrafficTreatment treatment = nextObjective.next().stream().findFirst().get();
+
+
+        GroupBucket bucket = DefaultGroupBucket.createAllGroupBucket(treatment);
+        GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
+
+
+        pendingGroups.put(key, nextObjective);
+
+        switch (nextObjective.op()) {
+            case ADD:
+                GroupDescription groupDesc =
+                        new DefaultGroupDescription(deviceId,
+                                                    GroupDescription.Type.ALL,
+                                                    new GroupBuckets(Collections.singletonList(bucket)),
+                                                    key,
+                                                    null,
+                                                    nextObjective.appId());
+                groupService.addGroup(groupDesc);
+                break;
+            case REMOVE:
+                groupService.removeGroup(deviceId, key, nextObjective.appId());
+                break;
+            case ADD_TO_EXISTING:
+                groupService.addBucketsToGroup(deviceId, key,
+                                               new GroupBuckets(Collections.singletonList(bucket)),
+                                               key, nextObjective.appId());
+                break;
+            case REMOVE_FROM_EXISTING:
+                groupService.removeBucketsFromGroup(deviceId, key,
+                                                    new GroupBuckets(Collections.singletonList(bucket)),
+                                                    key, nextObjective.appId());
+                break;
+            default:
+                log.warn("Unknown next objective operation: {}", nextObjective.op());
+        }
+
+
+    }
+
+    private void processMulticastRule(ForwardingObjective fwd) {
+        if (fwd.nextId() == null) {
+            log.error("Multicast objective does not have a next id");
+            fail(fwd, ObjectiveError.BADPARAMS);
+        }
+
+        GroupKey key = getGroupForNextObjective(fwd.nextId());
+
+        if (key == null) {
+            log.error("Group for forwarding objective missing: {}", fwd);
+            fail(fwd, ObjectiveError.GROUPMISSING);
+        }
+
+        Group group = groupService.getGroup(deviceId, key);
+        TrafficTreatment treatment =
+                buildTreatment(Instructions.createGroup(group.id()));
+
+        FlowRule rule = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .forTable(0)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(fwd.selector())
+                .withTreatment(treatment)
+                .build();
+
+        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+        switch (fwd.op()) {
+
+            case ADD:
+                builder.add(rule);
+                break;
+            case REMOVE:
+                builder.remove(rule);
+                break;
+            case ADD_TO_EXISTING:
+            case REMOVE_FROM_EXISTING:
+                break;
+            default:
+                log.warn("Unknown forwarding operation: {}", fwd.op());
+        }
+
+        applyFlowRules(builder, fwd);
+
+    }
+
+    private boolean checkForMulticast(ForwardingObjective fwd) {
+
+        IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
+                                                          Criterion.Type.IPV4_DST);
+
+        if (ip == null) {
+            return false;
+        }
+
+        return ip.ip().isMulticast();
+
+    }
+
+    private boolean checkForEapol(ForwardingObjective fwd) {
+        EthTypeCriterion ethType = (EthTypeCriterion)
+                filterForCriterion(fwd.selector().criteria(), Criterion.Type.ETH_TYPE);
+
+        return ethType != null && ethType.ethType().equals(EthType.EtherType.EAPOL.ethType());
+    }
+    private GroupKey getGroupForNextObjective(Integer nextId) {
+        NextGroup next = flowObjectiveStore.getNextGroup(nextId);
+        return appKryo.deserialize(next.data());
+
+    }
+
+    private void installDownstreamRules(ForwardingObjective fwd) {
+        List<Pair<Instruction, Instruction>> vlanOps =
+                vlanOps(fwd,
+                        L2ModificationInstruction.L2SubType.VLAN_POP);
+
+        if (vlanOps == null) {
+            return;
+        }
+
+        Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, "downstream");
+
+        if (output == null) {
+            return;
+        }
+
+        Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
+
+        TrafficSelector selector = fwd.selector();
+
+        Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
+        Criterion innerVlan = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
+        Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
+        Criterion bullshit = Criteria.matchMetadata(output.port().toLong());
+
+        if (outerVlan == null || innerVlan == null || inport == null) {
+            log.error("Forwarding objective is underspecified: {}", fwd);
+            fail(fwd, ObjectiveError.BADPARAMS);
+            return;
+        }
+
+        Criterion innerVid = Criteria.matchVlanId(((VlanIdCriterion) innerVlan).vlanId());
+
+        FlowRule.Builder outer = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(buildSelector(inport, outerVlan, bullshit))
+                .withTreatment(buildTreatment(popAndRewrite.getLeft(),
+                                              Instructions.transition(QQ_TABLE)));
+
+        FlowRule.Builder inner = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .forTable(QQ_TABLE)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(buildSelector(inport, innerVid))
+                .withTreatment(buildTreatment(popAndRewrite.getLeft(),
+                                              output));
+
+        applyRules(fwd, inner, outer);
+
+    }
+
+    private boolean hasUntaggedVlanTag(TrafficSelector selector) {
+        Iterator<Criterion> iter = selector.criteria().iterator();
+
+        while (iter.hasNext()) {
+            Criterion criterion = iter.next();
+            if (criterion.type() == Criterion.Type.VLAN_VID &&
+                    ((VlanIdCriterion) criterion).vlanId().toShort() == VlanId.UNTAGGED) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private void installUpstreamRules(ForwardingObjective fwd) {
+        List<Pair<Instruction, Instruction>> vlanOps =
+                vlanOps(fwd,
+                        L2ModificationInstruction.L2SubType.VLAN_PUSH);
+        FlowRule.Builder inner;
+
+        if (vlanOps == null) {
+            return;
+        }
+
+        Instruction output = fetchOutput(fwd, "upstream");
+
+        if (output == null) {
+            return;
+        }
+
+        Pair<Instruction, Instruction> innerPair = vlanOps.remove(0);
+
+        Pair<Instruction, Instruction> outerPair = vlanOps.remove(0);
+
+
+        if (hasUntaggedVlanTag(fwd.selector())) {
+            inner = DefaultFlowRule.builder()
+                    .fromApp(fwd.appId())
+                    .forDevice(deviceId)
+                    .makePermanent()
+                    .withPriority(fwd.priority())
+                    .withSelector(fwd.selector())
+                    .withTreatment(buildTreatment(innerPair.getLeft(),
+                                                  innerPair.getRight(),
+                                                  Instructions.transition(QQ_TABLE)));
+        } else {
+            inner = DefaultFlowRule.builder()
+                    .fromApp(fwd.appId())
+                    .forDevice(deviceId)
+                    .makePermanent()
+                    .withPriority(fwd.priority())
+                    .withSelector(fwd.selector())
+                    .withTreatment(buildTreatment(
+                            innerPair.getRight(),
+                            Instructions.transition(QQ_TABLE)));
+        }
+
+
+        PortCriterion inPort = (PortCriterion)
+                fwd.selector().getCriterion(Criterion.Type.IN_PORT);
+
+        VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
+                innerPair.getRight()).vlanId();
+
+        FlowRule.Builder outer = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .forTable(QQ_TABLE)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(buildSelector(inPort,
+                                            Criteria.matchVlanId(cVlanId)))
+                .withTreatment(buildTreatment(outerPair.getLeft(),
+                                              outerPair.getRight(),
+                                              output));
+
+        applyRules(fwd, inner, outer);
+
+    }
+
+    private Instruction fetchOutput(ForwardingObjective fwd, String direction) {
+        Instruction output = fwd.treatment().allInstructions().stream()
+                .filter(i -> i.type() == Instruction.Type.OUTPUT)
+                .findFirst().orElse(null);
+
+        if (output == null) {
+            log.error("OLT {} rule has no output", direction);
+            fail(fwd, ObjectiveError.BADPARAMS);
+            return null;
+        }
+        return output;
+    }
+
+    private List<Pair<Instruction, Instruction>> vlanOps(ForwardingObjective fwd,
+                                                         L2ModificationInstruction.L2SubType type) {
+
+        List<Pair<Instruction, Instruction>> vlanOps = findVlanOps(
+                fwd.treatment().allInstructions(), type);
+
+        if (vlanOps == null) {
+            String direction = type == L2ModificationInstruction.L2SubType.VLAN_POP
+                    ? "downstream" : "upstream";
+            log.error("Missing vlan operations in {} forwarding: {}", direction, fwd);
+            fail(fwd, ObjectiveError.BADPARAMS);
+            return null;
+        }
+        return vlanOps;
+    }
+
+
+    private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
+                                                             L2ModificationInstruction.L2SubType type) {
+
+        List<Instruction> vlanPushs = findL2Instructions(
+                type,
+                instructions);
+        List<Instruction> vlanSets = findL2Instructions(
+                L2ModificationInstruction.L2SubType.VLAN_ID,
+                instructions);
+
+        if (vlanPushs.size() != vlanSets.size()) {
+            return null;
+        }
+
+        List<Pair<Instruction, Instruction>> pairs = Lists.newArrayList();
+
+        for (int i = 0; i < vlanPushs.size(); i++) {
+            pairs.add(new ImmutablePair<>(vlanPushs.get(i), vlanSets.get(i)));
+        }
+        return pairs;
+    }
+
+    private List<Instruction> findL2Instructions(L2ModificationInstruction.L2SubType subType,
+                                                 List<Instruction> actions) {
+        return actions.stream()
+                .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
+                .filter(i -> ((L2ModificationInstruction) i).subtype() == subType)
+                .collect(Collectors.toList());
+    }
+
+    private void provisionEapol(FilteringObjective filter,
+                                EthTypeCriterion ethType,
+                                Instructions.OutputInstruction output) {
+
+        TrafficSelector selector = buildSelector(filter.key(), ethType);
+        TrafficTreatment treatment = buildTreatment(output);
+        buildAndApplyRule(filter, selector, treatment, EAPOL_FLOW_PRIORITY);
+
+    }
+
+    private void provisionIgmp(FilteringObjective filter, EthTypeCriterion ethType,
+                               IPProtocolCriterion ipProto,
+                               Instructions.OutputInstruction output) {
+        TrafficSelector selector = buildSelector(filter.key(), ethType, ipProto);
+        TrafficTreatment treatment = buildTreatment(output);
+        buildAndApplyRule(filter, selector, treatment);
+    }
+
+    private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
+                                   TrafficTreatment treatment) {
+        buildAndApplyRule(filter, selector, treatment, filter.priority());
+    }
+
+    private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
+                                   TrafficTreatment treatment, int priority) {
+        FlowRule rule = DefaultFlowRule.builder()
+                .fromApp(filter.appId())
+                .forDevice(deviceId)
+                .forTable(0)
+                .makePermanent()
+                .withSelector(selector)
+                .withTreatment(treatment)
+                .withPriority(priority)
+                .build();
+
+        FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
+
+        switch (filter.type()) {
+            case PERMIT:
+                opsBuilder.add(rule);
+                break;
+            case DENY:
+                opsBuilder.remove(rule);
+                break;
+            default:
+                log.warn("Unknown filter type : {}", filter.type());
+                fail(filter, ObjectiveError.UNSUPPORTED);
+        }
+
+        applyFlowRules(opsBuilder, filter);
+    }
+
+    private void applyRules(ForwardingObjective fwd,
+                            FlowRule.Builder inner, FlowRule.Builder outer) {
+        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+        switch (fwd.op()) {
+            case ADD:
+                builder.add(inner.build()).add(outer.build());
+                break;
+            case REMOVE:
+                builder.remove(inner.build()).remove(outer.build());
+                break;
+            case ADD_TO_EXISTING:
+                break;
+            case REMOVE_FROM_EXISTING:
+                break;
+            default:
+                log.warn("Unknown forwarding operation: {}", fwd.op());
+        }
+
+        applyFlowRules(builder, fwd);
+    }
+
+    private void applyFlowRules(FlowRuleOperations.Builder builder,
+                                Objective objective) {
+        flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
+            @Override
+            public void onSuccess(FlowRuleOperations ops) {
+                pass(objective);
+            }
+
+            @Override
+            public void onError(FlowRuleOperations ops) {
+                fail(objective, ObjectiveError.FLOWINSTALLATIONFAILED);
+            }
+        }));
+    }
+
+    private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
+        return criteria.stream()
+                .filter(c -> c.type().equals(type))
+                .limit(1)
+                .findFirst().orElse(null);
+    }
+
+    private TrafficSelector buildSelector(Criterion... criteria) {
+
+
+        TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
+
+        for (Criterion c : criteria) {
+            sBuilder.add(c);
+        }
+
+        return sBuilder.build();
+    }
+
+    private TrafficTreatment buildTreatment(Instruction... instructions) {
+
+
+        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+
+        for (Instruction i : instructions) {
+            tBuilder.add(i);
+        }
+
+        return tBuilder.build();
+    }
+
+
+    private void fail(Objective obj, ObjectiveError error) {
+        obj.context().ifPresent(context -> context.onError(obj, error));
+    }
+
+    private void pass(Objective obj) {
+        obj.context().ifPresent(context -> context.onSuccess(obj));
+    }
+
+
+    private class InnerGroupListener implements GroupListener {
+        @Override
+        public void event(GroupEvent event) {
+            if (event.type() == GroupEvent.Type.GROUP_ADDED || event.type() == GroupEvent.Type.GROUP_UPDATED) {
+                GroupKey key = event.subject().appCookie();
+
+                NextObjective obj = pendingGroups.getIfPresent(key);
+                if (obj != null) {
+                    flowObjectiveStore.putNextGroup(obj.id(), new OltPipelineGroup(key));
+                    pass(obj);
+                    pendingGroups.invalidate(key);
+                }
+            }
+        }
+    }
+
+    private static class OltPipelineGroup implements NextGroup {
+
+        private final GroupKey key;
+
+        public OltPipelineGroup(GroupKey key) {
+            this.key = key;
+        }
+
+        public GroupKey key() {
+            return key;
+        }
+
+        @Override
+        public byte[] data() {
+            return appKryo.serialize(key);
+        }
+
+    }
+
+    @Override
+    public List<String> getNextMappings(NextGroup nextGroup) {
+        // TODO Implementation deferred to vendor
+        return null;
+    }
+}
diff --git a/impl/src/main/java/org/opencord/olt/driver/OltDriversLoader.java b/impl/src/main/java/org/opencord/olt/driver/OltDriversLoader.java
new file mode 100644
index 0000000..a91621f
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/driver/OltDriversLoader.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.driver;
+
+import org.onosproject.net.driver.AbstractDriverLoader;
+import org.osgi.service.component.annotations.Component;
+import org.slf4j.Logger;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Loader for olt device drivers.
+ */
+@Component(immediate = true)
+public class OltDriversLoader extends AbstractDriverLoader {
+
+    private final Logger log = getLogger(getClass());
+
+    public OltDriversLoader() {
+        super("/olt-drivers.xml");
+    }
+
+    @Override
+    public void activate() {
+        super.activate();
+    }
+
+    @Override
+    public void deactivate() {
+        super.deactivate();
+    }
+}
diff --git a/impl/src/main/java/org/opencord/olt/driver/OltPipeline.java b/impl/src/main/java/org/opencord/olt/driver/OltPipeline.java
new file mode 100644
index 0000000..d5beb63
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/driver/OltPipeline.java
@@ -0,0 +1,1291 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.driver;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.osgi.ServiceDirectory;
+import org.onlab.packet.EthType;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.IPv6;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.VlanId;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.Accumulator;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.net.behaviour.Pipeliner;
+import org.onosproject.net.behaviour.PipelinerContext;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.driver.Driver;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.criteria.EthTypeCriterion;
+import org.onosproject.net.flow.criteria.IPCriterion;
+import org.onosproject.net.flow.criteria.IPProtocolCriterion;
+import org.onosproject.net.flow.criteria.PortCriterion;
+import org.onosproject.net.flow.criteria.UdpPortCriterion;
+import org.onosproject.net.flow.criteria.VlanIdCriterion;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.DefaultGroupBucket;
+import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.Group;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupBuckets;
+import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupEvent;
+import org.onosproject.net.group.GroupKey;
+import org.onosproject.net.group.GroupListener;
+import org.onosproject.net.group.GroupService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.Timer;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.onosproject.core.CoreService.CORE_APP_NAME;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Pipeliner for OLT device.
+ */
+
+public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
+
+    private static final Integer QQ_TABLE = 1;
+    private static final int NO_ACTION_PRIORITY = 500;
+    private static final String DOWNSTREAM = "downstream";
+    private static final String UPSTREAM = "upstream";
+    private final Logger log = getLogger(getClass());
+
+    private ServiceDirectory serviceDirectory;
+    private FlowRuleService flowRuleService;
+    private GroupService groupService;
+    private CoreService coreService;
+    private StorageService storageService;
+
+    private DeviceId deviceId;
+    private ApplicationId appId;
+
+
+    protected FlowObjectiveStore flowObjectiveStore;
+
+    private Cache<GroupKey, NextObjective> pendingGroups;
+
+    protected static KryoNamespace appKryo = new KryoNamespace.Builder()
+            .register(KryoNamespaces.API)
+            .register(GroupKey.class)
+            .register(DefaultGroupKey.class)
+            .register(OltPipelineGroup.class)
+            .build("OltPipeline");
+
+    private static final Timer TIMER = new Timer("filterobj-batching");
+    private Accumulator<Pair<FilteringObjective, FlowRule>> accumulator;
+
+    // accumulator executor service
+    private ScheduledExecutorService accumulatorExecutorService
+            = newSingleThreadScheduledExecutor(groupedThreads("OltPipeliner", "acc-%d", log));
+
+    @Override
+    public void init(DeviceId deviceId, PipelinerContext context) {
+        log.debug("Initiate OLT pipeline");
+        this.serviceDirectory = context.directory();
+        this.deviceId = deviceId;
+
+        flowRuleService = serviceDirectory.get(FlowRuleService.class);
+        coreService = serviceDirectory.get(CoreService.class);
+        groupService = serviceDirectory.get(GroupService.class);
+        flowObjectiveStore = context.store();
+        storageService = serviceDirectory.get(StorageService.class);
+
+        appId = coreService.registerApplication(
+                "org.onosproject.driver.OLTPipeline");
+
+        // Init the accumulator, if enabled
+        if (isAccumulatorEnabled()) {
+            log.debug("Building accumulator with maxObjs {}, batchMs {}, idleMs {}",
+                      context.accumulatorMaxObjectives(), context.accumulatorMaxBatchMillis(),
+                      context.accumulatorMaxIdleMillis());
+            accumulator = new ObjectiveAccumulator(context.accumulatorMaxObjectives(),
+                                                   context.accumulatorMaxBatchMillis(),
+                                                   context.accumulatorMaxIdleMillis());
+        }
+
+
+        pendingGroups = CacheBuilder.newBuilder()
+                .expireAfterWrite(20, TimeUnit.SECONDS)
+                .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
+                    if (notification.getCause() == RemovalCause.EXPIRED) {
+                        fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
+                    }
+                }).build();
+
+        groupService.addListener(new InnerGroupListener());
+
+    }
+
+    public boolean isAccumulatorEnabled() {
+        Driver driver = super.data().driver();
+        // we cannot determine the property
+        if (driver == null) {
+            return false;
+        }
+        return Boolean.parseBoolean(driver.getProperty(ACCUMULATOR_ENABLED));
+    }
+
+    @Override
+    public void filter(FilteringObjective filter) {
+        Instructions.OutputInstruction output;
+
+        if (filter.meta() != null && !filter.meta().immediate().isEmpty()) {
+            output = (Instructions.OutputInstruction) filter.meta().immediate().stream()
+                    .filter(t -> t.type().equals(Instruction.Type.OUTPUT))
+                    .limit(1)
+                    .findFirst().get();
+
+            if (output == null || !output.port().equals(PortNumber.CONTROLLER)) {
+                log.warn("OLT can only filter packet to controller");
+                fail(filter, ObjectiveError.UNSUPPORTED);
+                return;
+            }
+        } else {
+            fail(filter, ObjectiveError.BADPARAMS);
+            return;
+        }
+
+        if (filter.key().type() != Criterion.Type.IN_PORT) {
+            fail(filter, ObjectiveError.BADPARAMS);
+            return;
+        }
+
+        EthTypeCriterion ethType = (EthTypeCriterion)
+                filterForCriterion(filter.conditions(), Criterion.Type.ETH_TYPE);
+
+        if (ethType == null) {
+            fail(filter, ObjectiveError.BADPARAMS);
+            return;
+        }
+        Optional<Instruction> vlanId = filter.meta().immediate().stream()
+                .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
+                        && ((L2ModificationInstruction) t).subtype()
+                        .equals(L2ModificationInstruction.L2SubType.VLAN_ID))
+                .limit(1)
+                .findFirst();
+
+        Optional<Instruction> vlanPcp = filter.meta().immediate().stream()
+                .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
+                        && ((L2ModificationInstruction) t).subtype()
+                        .equals(L2ModificationInstruction.L2SubType.VLAN_PCP))
+                .limit(1)
+                .findFirst();
+
+        Optional<Instruction> vlanPush = filter.meta().immediate().stream()
+                .filter(t -> t.type().equals(Instruction.Type.L2MODIFICATION)
+                        && ((L2ModificationInstruction) t).subtype()
+                        .equals(L2ModificationInstruction.L2SubType.VLAN_PUSH))
+                .limit(1)
+                .findFirst();
+
+        if (ethType.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
+
+            if (vlanId.isEmpty() || vlanPush.isEmpty()) {
+                log.warn("Missing EAPOL vlan or vlanPush");
+                fail(filter, ObjectiveError.BADPARAMS);
+                return;
+            }
+            provisionEthTypeBasedFilter(filter, ethType, output,
+                                        (L2ModificationInstruction) vlanId.get(),
+                                        (L2ModificationInstruction) vlanPush.get());
+        } else if (ethType.ethType().equals(EthType.EtherType.PPPoED.ethType()))  {
+            provisionPPPoED(filter, ethType, vlanId.orElse(null), vlanPcp.orElse(null), output);
+        } else if (ethType.ethType().equals(EthType.EtherType.LLDP.ethType())) {
+            provisionEthTypeBasedFilter(filter, ethType, output, null, null);
+        } else if (ethType.ethType().equals(EthType.EtherType.IPV4.ethType())) {
+            IPProtocolCriterion ipProto = (IPProtocolCriterion)
+                    filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
+            if (ipProto == null) {
+                log.warn("OLT can only filter IGMP and DHCP");
+                fail(filter, ObjectiveError.UNSUPPORTED);
+                return;
+            }
+            if (ipProto.protocol() == IPv4.PROTOCOL_IGMP) {
+                provisionIgmp(filter, ethType, ipProto, output,
+                              vlanId.orElse(null),
+                              vlanPcp.orElse(null));
+            } else if (ipProto.protocol() == IPv4.PROTOCOL_UDP) {
+                UdpPortCriterion udpSrcPort = (UdpPortCriterion)
+                        filterForCriterion(filter.conditions(), Criterion.Type.UDP_SRC);
+
+                UdpPortCriterion udpDstPort = (UdpPortCriterion)
+                        filterForCriterion(filter.conditions(), Criterion.Type.UDP_DST);
+
+                if ((udpSrcPort.udpPort().toInt() == 67 && udpDstPort.udpPort().toInt() == 68) ||
+                        (udpSrcPort.udpPort().toInt() == 68 && udpDstPort.udpPort().toInt() == 67)) {
+                    provisionDhcp(filter, ethType, ipProto, udpSrcPort, udpDstPort, vlanId.orElse(null),
+                                  vlanPcp.orElse(null), output);
+                } else {
+                    log.warn("Filtering rule with unsupported UDP src {} or dst {} port", udpSrcPort, udpDstPort);
+                    fail(filter, ObjectiveError.UNSUPPORTED);
+                }
+            } else {
+                log.warn("Currently supporting only IGMP and DHCP filters for IPv4 packets");
+                fail(filter, ObjectiveError.UNSUPPORTED);
+            }
+        } else if (ethType.ethType().equals(EthType.EtherType.IPV6.ethType())) {
+            IPProtocolCriterion ipProto = (IPProtocolCriterion)
+                    filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
+            if (ipProto == null) {
+                log.warn("OLT can only filter DHCP");
+                fail(filter, ObjectiveError.UNSUPPORTED);
+                return;
+            }
+            if (ipProto.protocol() == IPv6.PROTOCOL_UDP) {
+                UdpPortCriterion udpSrcPort = (UdpPortCriterion)
+                        filterForCriterion(filter.conditions(), Criterion.Type.UDP_SRC);
+
+                UdpPortCriterion udpDstPort = (UdpPortCriterion)
+                        filterForCriterion(filter.conditions(), Criterion.Type.UDP_DST);
+
+                if ((udpSrcPort.udpPort().toInt() == 546 && udpDstPort.udpPort().toInt() == 547) ||
+                        (udpSrcPort.udpPort().toInt() == 547 && udpDstPort.udpPort().toInt() == 546)) {
+                    provisionDhcp(filter, ethType, ipProto, udpSrcPort, udpDstPort, vlanId.orElse(null),
+                                  vlanPcp.orElse(null), output);
+                } else {
+                    log.warn("Filtering rule with unsupported UDP src {} or dst {} port", udpSrcPort, udpDstPort);
+                    fail(filter, ObjectiveError.UNSUPPORTED);
+                }
+            } else {
+                log.warn("Currently supporting only DHCP filters for IPv6 packets");
+                fail(filter, ObjectiveError.UNSUPPORTED);
+            }
+        } else {
+            log.warn("\nOnly the following are Supported in OLT for filter ->\n"
+                             + "ETH TYPE : EAPOL, LLDP and IPV4\n"
+                             + "IPV4 TYPE: IGMP and UDP (for DHCP)"
+                             + "IPV6 TYPE: UDP (for DHCP)");
+            fail(filter, ObjectiveError.UNSUPPORTED);
+        }
+
+    }
+
+
+    @Override
+    public void forward(ForwardingObjective fwd) {
+        log.debug("Installing forwarding objective {}", fwd);
+        if (checkForMulticast(fwd)) {
+            processMulticastRule(fwd);
+            return;
+        }
+
+        TrafficTreatment treatment = fwd.treatment();
+
+        List<Instruction> instructions = treatment.allInstructions();
+
+        Optional<Instruction> vlanInstruction = instructions.stream()
+                .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
+                .filter(i -> ((L2ModificationInstruction) i).subtype() ==
+                        L2ModificationInstruction.L2SubType.VLAN_PUSH ||
+                        ((L2ModificationInstruction) i).subtype() ==
+                                L2ModificationInstruction.L2SubType.VLAN_POP)
+                .findAny();
+
+
+        if (!vlanInstruction.isPresent()) {
+            installNoModificationRules(fwd);
+        } else {
+            L2ModificationInstruction vlanIns =
+                    (L2ModificationInstruction) vlanInstruction.get();
+            if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_PUSH) {
+                installUpstreamRules(fwd);
+            } else if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_POP) {
+                installDownstreamRules(fwd);
+            } else {
+                log.error("Unknown OLT operation: {}", fwd);
+                fail(fwd, ObjectiveError.UNSUPPORTED);
+                return;
+            }
+        }
+
+        pass(fwd);
+
+    }
+
+
+    @Override
+    public void next(NextObjective nextObjective) {
+        if (nextObjective.type() != NextObjective.Type.BROADCAST) {
+            log.error("OLT only supports broadcast groups.");
+            fail(nextObjective, ObjectiveError.BADPARAMS);
+            return;
+        }
+
+        if (nextObjective.next().size() != 1 && !nextObjective.op().equals(Objective.Operation.REMOVE)) {
+            log.error("OLT only supports singleton broadcast groups.");
+            fail(nextObjective, ObjectiveError.BADPARAMS);
+            return;
+        }
+
+        Optional<TrafficTreatment> treatmentOpt = nextObjective.next().stream().findFirst();
+        if (treatmentOpt.isEmpty() && !nextObjective.op().equals(Objective.Operation.REMOVE)) {
+            log.error("Next objective {} does not have a treatment", nextObjective);
+            fail(nextObjective, ObjectiveError.BADPARAMS);
+            return;
+        }
+
+        GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
+
+        pendingGroups.put(key, nextObjective);
+        log.trace("NextObjective Operation {}", nextObjective.op());
+        switch (nextObjective.op()) {
+            case ADD:
+                GroupDescription groupDesc =
+                        new DefaultGroupDescription(deviceId,
+                                                    GroupDescription.Type.ALL,
+                                                    new GroupBuckets(
+                                                            Collections.singletonList(
+                                                                    buildBucket(treatmentOpt.get()))),
+                                                    key,
+                                                    null,
+                                                    nextObjective.appId());
+                groupService.addGroup(groupDesc);
+                break;
+            case REMOVE:
+                groupService.removeGroup(deviceId, key, nextObjective.appId());
+                break;
+            case ADD_TO_EXISTING:
+                groupService.addBucketsToGroup(deviceId, key,
+                                               new GroupBuckets(
+                                                       Collections.singletonList(
+                                                               buildBucket(treatmentOpt.get()))),
+                                               key, nextObjective.appId());
+                break;
+            case REMOVE_FROM_EXISTING:
+                groupService.removeBucketsFromGroup(deviceId, key,
+                                                    new GroupBuckets(
+                                                            Collections.singletonList(
+                                                                    buildBucket(treatmentOpt.get()))),
+                                                    key, nextObjective.appId());
+                break;
+            default:
+                log.warn("Unknown next objective operation: {}", nextObjective.op());
+        }
+
+
+    }
+
+    private GroupBucket buildBucket(TrafficTreatment treatment) {
+        return DefaultGroupBucket.createAllGroupBucket(treatment);
+    }
+
+    private void processMulticastRule(ForwardingObjective fwd) {
+        if (fwd.nextId() == null) {
+            log.error("Multicast objective does not have a next id");
+            fail(fwd, ObjectiveError.BADPARAMS);
+        }
+
+        GroupKey key = getGroupForNextObjective(fwd.nextId());
+
+        if (key == null) {
+            log.error("Group for forwarding objective missing: {}", fwd);
+            fail(fwd, ObjectiveError.GROUPMISSING);
+        }
+
+        Group group = groupService.getGroup(deviceId, key);
+        TrafficTreatment treatment =
+                buildTreatment(Instructions.createGroup(group.id()));
+
+        TrafficSelector.Builder selectorBuilder = buildIpv4SelectorForMulticast(fwd);
+
+        FlowRule rule = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .forTable(0)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(selectorBuilder.build())
+                .withTreatment(treatment)
+                .build();
+
+        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+        switch (fwd.op()) {
+
+            case ADD:
+                builder.add(rule);
+                break;
+            case REMOVE:
+                builder.remove(rule);
+                break;
+            case ADD_TO_EXISTING:
+            case REMOVE_FROM_EXISTING:
+                break;
+            default:
+                log.warn("Unknown forwarding operation: {}", fwd.op());
+        }
+
+        applyFlowRules(ImmutableList.of(fwd), builder);
+
+
+    }
+
+    private TrafficSelector.Builder buildIpv4SelectorForMulticast(ForwardingObjective fwd) {
+        TrafficSelector.Builder builderToUpdate = DefaultTrafficSelector.builder();
+
+        Optional<Criterion> vlanIdCriterion = readFromSelector(fwd.meta(), Criterion.Type.VLAN_VID);
+        if (vlanIdCriterion.isPresent()) {
+            VlanId assignedVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
+            builderToUpdate.matchVlanId(assignedVlan);
+        }
+
+        Optional<Criterion> innerVlanIdCriterion = readFromSelector(fwd.meta(), Criterion.Type.INNER_VLAN_VID);
+        if (innerVlanIdCriterion.isPresent()) {
+            VlanId assignedInnerVlan = ((VlanIdCriterion) innerVlanIdCriterion.get()).vlanId();
+            builderToUpdate.matchMetadata(assignedInnerVlan.toShort());
+        }
+
+        Optional<Criterion> ethTypeCriterion = readFromSelector(fwd.selector(), Criterion.Type.ETH_TYPE);
+        if (ethTypeCriterion.isPresent()) {
+            EthType ethType = ((EthTypeCriterion) ethTypeCriterion.get()).ethType();
+            builderToUpdate.matchEthType(ethType.toShort());
+        }
+
+        Optional<Criterion> ipv4DstCriterion = readFromSelector(fwd.selector(), Criterion.Type.IPV4_DST);
+        if (ipv4DstCriterion.isPresent()) {
+            IpPrefix ipv4Dst = ((IPCriterion) ipv4DstCriterion.get()).ip();
+            builderToUpdate.matchIPDst(ipv4Dst);
+        }
+
+        return builderToUpdate;
+    }
+
+    static Optional<Criterion> readFromSelector(TrafficSelector selector, Criterion.Type type) {
+        if (selector == null) {
+            return Optional.empty();
+        }
+        Criterion criterion = selector.getCriterion(type);
+        return (criterion == null)
+                ? Optional.empty() : Optional.of(criterion);
+    }
+
+    private boolean checkForMulticast(ForwardingObjective fwd) {
+
+        IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
+                                                          Criterion.Type.IPV4_DST);
+
+        if (ip == null) {
+            return false;
+        }
+
+        return ip.ip().isMulticast();
+
+    }
+
+    private GroupKey getGroupForNextObjective(Integer nextId) {
+        NextGroup next = flowObjectiveStore.getNextGroup(nextId);
+        return appKryo.deserialize(next.data());
+
+    }
+
+    private void installNoModificationRules(ForwardingObjective fwd) {
+        Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, DOWNSTREAM);
+        Instructions.MetadataInstruction writeMetadata = fetchWriteMetadata(fwd);
+        Instructions.MeterInstruction meter = (Instructions.MeterInstruction) fetchMeter(fwd);
+
+        TrafficSelector selector = fwd.selector();
+
+        Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
+        Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
+        Criterion innerVlan = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
+
+        if (inport == null || output == null || innerVlan == null || outerVlan == null) {
+            // Avoid logging a non-error from lldp, bbdp and eapol core flows.
+            if (!fwd.appId().name().equals(CORE_APP_NAME)) {
+                log.error("Forwarding objective is underspecified: {}", fwd);
+            } else {
+                log.debug("Not installing unsupported core generated flow {}", fwd);
+            }
+            fail(fwd, ObjectiveError.BADPARAMS);
+            return;
+        }
+
+
+        FlowRule.Builder outer = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(buildSelector(inport, outerVlan))
+                .withTreatment(buildTreatment(output, writeMetadata, meter));
+
+        applyRules(fwd, outer);
+    }
+
+    private void installDownstreamRules(ForwardingObjective fwd) {
+        Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, DOWNSTREAM);
+
+        if (output == null) {
+            return;
+        }
+
+        TrafficSelector selector = fwd.selector();
+
+        Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
+        Criterion outerPbit = selector.getCriterion(Criterion.Type.VLAN_PCP);
+        Criterion innerVlanCriterion = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
+        Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
+        Criterion dstMac = selector.getCriterion(Criterion.Type.ETH_DST);
+
+        if (outerVlan == null || innerVlanCriterion == null || inport == null) {
+            // Avoid logging a non-error from lldp, bbdp and eapol core flows.
+            if (!fwd.appId().name().equals(CORE_APP_NAME)) {
+                log.error("Forwarding objective is underspecified: {}", fwd);
+            } else {
+                log.debug("Not installing unsupported core generated flow {}", fwd);
+            }
+            fail(fwd, ObjectiveError.BADPARAMS);
+            return;
+        }
+
+        VlanId innerVlan = ((VlanIdCriterion) innerVlanCriterion).vlanId();
+        Criterion innerVid = Criteria.matchVlanId(innerVlan);
+
+        // In the case where the C-tag is the same for all the subscribers,
+        // we add a metadata with the outport in the selector to make the flow unique
+        Criterion innerSelectorMeta = Criteria.matchMetadata(output.port().toLong());
+
+        if (innerVlan.toShort() == VlanId.ANY_VALUE) {
+            TrafficSelector outerSelector = buildSelector(inport, outerVlan, outerPbit, dstMac);
+            installDownstreamRulesForAnyVlan(fwd, output, outerSelector,
+                                             buildSelector(inport,
+                                                           Criteria.matchVlanId(VlanId.ANY),
+                                                           innerSelectorMeta));
+        } else {
+            // Required to differentiate the same match flows
+            // Please note that S tag and S p bit values will be same for the same service - so conflict flows!
+            // Metadata match criteria solves the conflict issue - but not used by the voltha
+            // Maybe - find a better way to solve the above problem
+            Criterion metadata = Criteria.matchMetadata(innerVlan.toShort());
+            TrafficSelector outerSelector = buildSelector(inport, metadata, outerVlan, outerPbit, dstMac);
+            installDownstreamRulesForVlans(fwd, output, outerSelector, buildSelector(inport, innerVid,
+                                                                                     innerSelectorMeta));
+        }
+    }
+
+    private void installDownstreamRulesForVlans(ForwardingObjective fwd, Instruction output,
+                                                TrafficSelector outerSelector, TrafficSelector innerSelector) {
+
+        List<Pair<Instruction, Instruction>> vlanOps =
+                vlanOps(fwd,
+                        L2ModificationInstruction.L2SubType.VLAN_POP);
+
+        if (vlanOps == null || vlanOps.isEmpty()) {
+            return;
+        }
+
+        Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
+
+        TrafficTreatment innerTreatment;
+        VlanId setVlanId = ((L2ModificationInstruction.ModVlanIdInstruction) popAndRewrite.getRight()).vlanId();
+        if (VlanId.NONE.equals(setVlanId)) {
+            innerTreatment = (buildTreatment(popAndRewrite.getLeft(), fetchMeter(fwd),
+                                             writeMetadataIncludingOnlyTp(fwd), output));
+        } else {
+            innerTreatment = (buildTreatment(popAndRewrite.getRight(),
+                                             fetchMeter(fwd), writeMetadataIncludingOnlyTp(fwd), output));
+        }
+
+        List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
+                                                           fwd.treatment().allInstructions());
+
+        Instruction innerPbitSet = null;
+
+        if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
+            innerPbitSet = setVlanPcps.get(0);
+        }
+
+        VlanId remarkInnerVlan = null;
+        Optional<Criterion> vlanIdCriterion = readFromSelector(innerSelector, Criterion.Type.VLAN_VID);
+        if (vlanIdCriterion.isPresent()) {
+            remarkInnerVlan = ((VlanIdCriterion) vlanIdCriterion.get()).vlanId();
+        }
+
+        Instruction modVlanId = null;
+        if (innerPbitSet != null) {
+            modVlanId = Instructions.modVlanId(remarkInnerVlan);
+        }
+
+        //match: in port (nni), s-tag
+        //action: pop vlan (s-tag), write metadata, go to table 1, meter
+        FlowRule.Builder outer = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(outerSelector)
+                .withTreatment(buildTreatment(popAndRewrite.getLeft(), modVlanId,
+                                              innerPbitSet, fetchMeter(fwd),
+                                              fetchWriteMetadata(fwd),
+                                              Instructions.transition(QQ_TABLE)));
+
+        //match: in port (nni), c-tag
+        //action: immediate: write metadata and pop, meter, output
+        FlowRule.Builder inner = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .forTable(QQ_TABLE)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(innerSelector)
+                .withTreatment(innerTreatment);
+        applyRules(fwd, inner, outer);
+    }
+
+    private void installDownstreamRulesForAnyVlan(ForwardingObjective fwd, Instruction output,
+                                                  TrafficSelector outerSelector, TrafficSelector innerSelector) {
+
+        //match: in port (nni), s-tag
+        //action: immediate: write metadata, pop vlan, meter and go to table 1
+        FlowRule.Builder outer = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(outerSelector)
+                .withTreatment(buildTreatment(Instructions.popVlan(), fetchMeter(fwd),
+                                              fetchWriteMetadata(fwd), Instructions.transition(QQ_TABLE)));
+
+        //match: in port (nni) and s-tag
+        //action: immediate : write metadata, meter and output
+        FlowRule.Builder inner = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .forTable(QQ_TABLE)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(innerSelector)
+                .withTreatment(buildTreatment(fetchMeter(fwd),
+                                              writeMetadataIncludingOnlyTp(fwd), output));
+
+        applyRules(fwd, inner, outer);
+    }
+
+    private void installUpstreamRules(ForwardingObjective fwd) {
+        List<Pair<Instruction, Instruction>> vlanOps =
+                vlanOps(fwd,
+                        L2ModificationInstruction.L2SubType.VLAN_PUSH);
+
+        if (vlanOps == null || vlanOps.isEmpty()) {
+            return;
+        }
+
+        Instruction output = fetchOutput(fwd, UPSTREAM);
+
+        if (output == null) {
+            return;
+        }
+
+        Pair<Instruction, Instruction> outerPair = vlanOps.remove(0);
+
+        boolean noneValueVlanStatus = checkNoneVlanCriteria(fwd);
+        boolean anyValueVlanStatus = checkAnyVlanMatchCriteria(fwd);
+
+        if (anyValueVlanStatus) {
+            installUpstreamRulesForAnyVlan(fwd, output, outerPair);
+        } else {
+            Pair<Instruction, Instruction> innerPair = outerPair;
+            outerPair = vlanOps.remove(0);
+            installUpstreamRulesForVlans(fwd, output, innerPair, outerPair, noneValueVlanStatus);
+        }
+    }
+
+    private void installUpstreamRulesForVlans(ForwardingObjective fwd, Instruction output,
+                                              Pair<Instruction, Instruction> innerPair,
+                                              Pair<Instruction, Instruction> outerPair, Boolean noneValueVlanStatus) {
+
+        List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
+                                                           fwd.treatment().allInstructions());
+
+        Instruction innerPbitSet = null;
+        Instruction outerPbitSet = null;
+
+        if (setVlanPcps != null && !setVlanPcps.isEmpty()) {
+            innerPbitSet = setVlanPcps.get(0);
+            outerPbitSet = setVlanPcps.get(1);
+        }
+
+        TrafficTreatment innerTreatment;
+        if (noneValueVlanStatus) {
+            innerTreatment = buildTreatment(innerPair.getLeft(), innerPair.getRight(), fetchMeter(fwd),
+                                            fetchWriteMetadata(fwd), innerPbitSet,
+                                            Instructions.transition(QQ_TABLE));
+        } else {
+            innerTreatment = buildTreatment(innerPair.getRight(), fetchMeter(fwd), fetchWriteMetadata(fwd),
+                                            innerPbitSet, Instructions.transition(QQ_TABLE));
+        }
+
+        //match: in port, vlanId (0 or None)
+        //action:
+        //if vlanId None, push & set c-tag go to table 1
+        //if vlanId 0 or any specific vlan, set c-tag, write metadata, meter and go to table 1
+        FlowRule.Builder inner = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(fwd.selector())
+                .withTreatment(innerTreatment);
+
+        PortCriterion inPort = (PortCriterion)
+                fwd.selector().getCriterion(Criterion.Type.IN_PORT);
+
+        VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
+                innerPair.getRight()).vlanId();
+
+        //match: in port, c-tag
+        //action: immediate: push s-tag, write metadata, meter and output
+        FlowRule.Builder outer = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .forTable(QQ_TABLE)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
+                                              fetchMeter(fwd), writeMetadataIncludingOnlyTp(fwd),
+                                              outerPbitSet, output));
+
+        if (innerPbitSet != null) {
+            byte innerPbit = ((L2ModificationInstruction.ModVlanPcpInstruction)
+                    innerPbitSet).vlanPcp();
+            outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId), Criteria.matchVlanPcp(innerPbit)));
+        } else {
+            outer.withSelector(buildSelector(inPort, Criteria.matchVlanId(cVlanId)));
+        }
+
+        applyRules(fwd, inner, outer);
+    }
+
+    private void installUpstreamRulesForAnyVlan(ForwardingObjective fwd, Instruction output,
+                                                Pair<Instruction, Instruction> outerPair) {
+
+        log.debug("Installing upstream rules for any value vlan");
+
+        //match: in port and any-vlan (coming from OLT app.)
+        //action: write metadata, go to table 1 and meter
+        FlowRule.Builder inner = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(fwd.selector())
+                .withTreatment(buildTreatment(Instructions.transition(QQ_TABLE), fetchMeter(fwd),
+                                              fetchWriteMetadata(fwd)));
+
+        //match: in port and any-vlan (coming from OLT app.)
+        //action: immediate: push:QinQ, vlanId (s-tag), write metadata, meter and output
+        FlowRule.Builder outer = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .forTable(QQ_TABLE)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(fwd.selector())
+                .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
+                                              fetchMeter(fwd), writeMetadataIncludingOnlyTp(fwd), output));
+
+        applyRules(fwd, inner, outer);
+    }
+
+    private boolean checkNoneVlanCriteria(ForwardingObjective fwd) {
+        // Add the VLAN_PUSH treatment if we're matching on VlanId.NONE
+        Criterion vlanMatchCriterion = filterForCriterion(fwd.selector().criteria(), Criterion.Type.VLAN_VID);
+        boolean noneValueVlanStatus = false;
+        if (vlanMatchCriterion != null) {
+            noneValueVlanStatus = ((VlanIdCriterion) vlanMatchCriterion).vlanId().equals(VlanId.NONE);
+        }
+        return noneValueVlanStatus;
+    }
+
+    private boolean checkAnyVlanMatchCriteria(ForwardingObjective fwd) {
+        Criterion anyValueVlanCriterion = fwd.selector().criteria().stream()
+                .filter(c -> c.type().equals(Criterion.Type.VLAN_VID))
+                .filter(vc -> ((VlanIdCriterion) vc).vlanId().toShort() == VlanId.ANY_VALUE)
+                .findAny().orElse(null);
+
+        if (anyValueVlanCriterion == null) {
+            log.debug("Any value vlan match criteria is not found, criteria {}",
+                      fwd.selector().criteria());
+            return false;
+        }
+
+        return true;
+    }
+
+    private Instruction fetchOutput(ForwardingObjective fwd, String direction) {
+        Instruction output = fwd.treatment().allInstructions().stream()
+                .filter(i -> i.type() == Instruction.Type.OUTPUT)
+                .findFirst().orElse(null);
+
+        if (output == null) {
+            log.error("OLT {} rule has no output", direction);
+            fail(fwd, ObjectiveError.BADPARAMS);
+            return null;
+        }
+        return output;
+    }
+
+    private Instruction fetchMeter(ForwardingObjective fwd) {
+        Instruction meter = fwd.treatment().metered();
+
+        if (meter == null) {
+            log.debug("Meter instruction is not found for the forwarding objective {}", fwd);
+            return null;
+        }
+
+        log.debug("Meter instruction is found.");
+        return meter;
+    }
+
+    private Instructions.MetadataInstruction fetchWriteMetadata(ForwardingObjective fwd) {
+        Instructions.MetadataInstruction writeMetadata = fwd.treatment().writeMetadata();
+
+        if (writeMetadata == null) {
+            log.warn("Write metadata is not found for the forwarding obj");
+            fail(fwd, ObjectiveError.BADPARAMS);
+            return null;
+        }
+
+        log.debug("Write metadata is found {}", writeMetadata);
+        return writeMetadata;
+    }
+
+    private List<Pair<Instruction, Instruction>> vlanOps(ForwardingObjective fwd,
+                                                         L2ModificationInstruction.L2SubType type) {
+
+        List<Pair<Instruction, Instruction>> vlanOps = findVlanOps(
+                fwd.treatment().allInstructions(), type);
+
+        if (vlanOps == null || vlanOps.isEmpty()) {
+            String direction = type == L2ModificationInstruction.L2SubType.VLAN_POP
+                    ? DOWNSTREAM : UPSTREAM;
+            log.error("Missing vlan operations in {} forwarding: {}", direction, fwd);
+            fail(fwd, ObjectiveError.BADPARAMS);
+            return ImmutableList.of();
+        }
+        return vlanOps;
+    }
+
+
+    private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
+                                                             L2ModificationInstruction.L2SubType type) {
+
+        List<Instruction> vlanOperations = findL2Instructions(
+                type,
+                instructions);
+        List<Instruction> vlanSets = findL2Instructions(
+                L2ModificationInstruction.L2SubType.VLAN_ID,
+                instructions);
+
+        if (vlanOperations.size() != vlanSets.size()) {
+            return ImmutableList.of();
+        }
+
+        List<Pair<Instruction, Instruction>> pairs = Lists.newArrayList();
+
+        for (int i = 0; i < vlanOperations.size(); i++) {
+            pairs.add(new ImmutablePair<>(vlanOperations.get(i), vlanSets.get(i)));
+        }
+        return pairs;
+    }
+
+    private List<Instruction> findL2Instructions(L2ModificationInstruction.L2SubType subType,
+                                                 List<Instruction> actions) {
+        return actions.stream()
+                .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
+                .filter(i -> ((L2ModificationInstruction) i).subtype() == subType)
+                .collect(Collectors.toList());
+    }
+
+    private void provisionEthTypeBasedFilter(FilteringObjective filter,
+                                             EthTypeCriterion ethType,
+                                             Instructions.OutputInstruction output,
+                                             L2ModificationInstruction vlanId,
+                                             L2ModificationInstruction vlanPush) {
+
+        Instruction meter = filter.meta().metered();
+        Instruction writeMetadata = filter.meta().writeMetadata();
+
+        TrafficSelector selector = buildSelector(filter.key(), ethType);
+        TrafficTreatment treatment;
+
+        if (vlanPush == null || vlanId == null) {
+            treatment = buildTreatment(output, meter, writeMetadata);
+        } else {
+            // we need to push the vlan because it came untagged (ATT)
+            treatment = buildTreatment(output, meter, vlanPush, vlanId, writeMetadata);
+        }
+
+        buildAndApplyRule(filter, selector, treatment);
+
+    }
+
+    private void provisionIgmp(FilteringObjective filter, EthTypeCriterion ethType,
+                               IPProtocolCriterion ipProto,
+                               Instructions.OutputInstruction output,
+                               Instruction vlan, Instruction pcp) {
+
+        Instruction meter = filter.meta().metered();
+        Instruction writeMetadata = filter.meta().writeMetadata();
+
+        // uniTagMatch
+        VlanIdCriterion vlanId = (VlanIdCriterion) filterForCriterion(filter.conditions(),
+                                                                      Criterion.Type.VLAN_VID);
+
+        TrafficSelector selector = buildSelector(filter.key(), ethType, ipProto, vlanId);
+        TrafficTreatment treatment = buildTreatment(output, vlan, pcp, meter, writeMetadata);
+        buildAndApplyRule(filter, selector, treatment);
+    }
+
+    private void provisionDhcp(FilteringObjective filter, EthTypeCriterion ethType,
+                               IPProtocolCriterion ipProto,
+                               UdpPortCriterion udpSrcPort,
+                               UdpPortCriterion udpDstPort,
+                               Instruction vlanIdInstruction,
+                               Instruction vlanPcpInstruction,
+                               Instructions.OutputInstruction output) {
+
+        Instruction meter = filter.meta().metered();
+        Instruction writeMetadata = filter.meta().writeMetadata();
+
+        VlanIdCriterion matchVlanId = (VlanIdCriterion)
+                filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
+
+        TrafficSelector selector;
+        TrafficTreatment treatment;
+
+        if (matchVlanId != null) {
+            log.debug("Building selector with match VLAN, {}", matchVlanId);
+            // in case of TT upstream the packet comes tagged and the vlan is swapped.
+            selector = buildSelector(filter.key(), ethType, ipProto, udpSrcPort,
+                                     udpDstPort, matchVlanId);
+            treatment = buildTreatment(output, meter, writeMetadata,
+                                       vlanIdInstruction, vlanPcpInstruction);
+        } else {
+            log.debug("Building selector with no VLAN");
+            // in case of ATT upstream the packet comes in untagged and we need to push the vlan
+            selector = buildSelector(filter.key(), ethType, ipProto, udpSrcPort, udpDstPort);
+            treatment = buildTreatment(output, meter, vlanIdInstruction, writeMetadata);
+        }
+        //In case of downstream there will be no match on the VLAN, which is null,
+        // so it will just be output, meter, writeMetadata
+
+        buildAndApplyRule(filter, selector, treatment);
+    }
+
+    private void provisionPPPoED(FilteringObjective filter, EthTypeCriterion ethType,
+                                 Instruction vlanIdInstruction,
+                                 Instruction vlanPcpInstruction,
+                                 Instructions.OutputInstruction output) {
+        Instruction meter = filter.meta().metered();
+        Instruction writeMetadata = filter.meta().writeMetadata();
+
+        VlanIdCriterion matchVlanId = (VlanIdCriterion)
+                filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
+
+        TrafficSelector selector;
+        TrafficTreatment treatment;
+
+        if (matchVlanId != null) {
+            log.debug("Building pppoed selector with match VLAN {}.", matchVlanId);
+        } else {
+            log.debug("Building pppoed selector without match VLAN.");
+        }
+
+        selector = buildSelector(filter.key(), ethType, matchVlanId);
+        treatment = buildTreatment(output, meter, writeMetadata, vlanIdInstruction, vlanPcpInstruction);
+        buildAndApplyRule(filter, selector, treatment);
+    }
+
+    private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
+                                   TrafficTreatment treatment) {
+        FlowRule rule = DefaultFlowRule.builder()
+                .fromApp(filter.appId())
+                .forDevice(deviceId)
+                .forTable(0)
+                .makePermanent()
+                .withSelector(selector)
+                .withTreatment(treatment)
+                .withPriority(filter.priority())
+                .build();
+
+        if (accumulator != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Adding pair to batch: {}", Pair.of(filter, rule));
+            }
+            accumulator.add(Pair.of(filter, rule));
+        } else {
+            FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
+            switch (filter.type()) {
+                case PERMIT:
+                    opsBuilder.add(rule);
+                    break;
+                case DENY:
+                    opsBuilder.remove(rule);
+                    break;
+                default:
+                    log.warn("Unknown filter type : {}", filter.type());
+                    fail(filter, ObjectiveError.UNSUPPORTED);
+            }
+            applyFlowRules(ImmutableList.of(filter), opsBuilder);
+        }
+    }
+
+    private void applyRules(ForwardingObjective fwd, FlowRule.Builder... fwdBuilders) {
+        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+        switch (fwd.op()) {
+            case ADD:
+                for (FlowRule.Builder fwdBuilder : fwdBuilders) {
+                    builder.add(fwdBuilder.build());
+                }
+                break;
+            case REMOVE:
+                for (FlowRule.Builder fwdBuilder : fwdBuilders) {
+                    builder.remove(fwdBuilder.build());
+                }
+                break;
+            case ADD_TO_EXISTING:
+                break;
+            case REMOVE_FROM_EXISTING:
+                break;
+            default:
+                log.warn("Unknown forwarding operation: {}", fwd.op());
+        }
+
+        applyFlowRules(ImmutableList.of(fwd), builder);
+
+
+    }
+
+    private void applyFlowRules(List<Objective> objectives, FlowRuleOperations.Builder builder) {
+        flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
+            @Override
+            public void onSuccess(FlowRuleOperations ops) {
+                objectives.forEach(obj -> {
+                    pass(obj);
+                });
+            }
+
+            @Override
+            public void onError(FlowRuleOperations ops) {
+                objectives.forEach(obj -> {
+                    fail(obj, ObjectiveError.FLOWINSTALLATIONFAILED);
+                });
+
+            }
+        }));
+    }
+
+    // Builds the batch using the accumulated flow rules
+    private void sendFilters(List<Pair<FilteringObjective, FlowRule>> pairs) {
+        FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
+        log.debug("Sending batch of {} filter-objs", pairs.size());
+        List<Objective> filterObjs = Lists.newArrayList();
+        // Iterates over all accumulated flow rules and then build an unique batch
+        pairs.forEach(pair -> {
+            FilteringObjective filter = pair.getLeft();
+            FlowRule rule = pair.getRight();
+            switch (filter.type()) {
+                case PERMIT:
+                    flowOpsBuilder.add(rule);
+                    log.debug("Applying add filter-obj {} to device: {}", filter.id(), deviceId);
+                    filterObjs.add(filter);
+                    break;
+                case DENY:
+                    flowOpsBuilder.remove(rule);
+                    log.debug("Deleting flow rule {} to device: {}", rule, deviceId);
+                    filterObjs.add(filter);
+                    break;
+                default:
+                    fail(filter, ObjectiveError.UNKNOWN);
+                    log.warn("Unknown forwarding type {}", filter.type());
+            }
+        });
+        if (log.isDebugEnabled()) {
+            log.debug("Applying batch {}", flowOpsBuilder.build());
+        }
+        // Finally applies the operations
+        applyFlowRules(filterObjs, flowOpsBuilder);
+    }
+
+    private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
+        return criteria.stream()
+                .filter(c -> c.type().equals(type))
+                .limit(1)
+                .findFirst().orElse(null);
+    }
+
+    private TrafficSelector buildSelector(Criterion... criteria) {
+
+        TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
+
+        Arrays.stream(criteria).filter(Objects::nonNull).forEach(sBuilder::add);
+
+        return sBuilder.build();
+    }
+
+    private TrafficTreatment buildTreatment(Instruction... instructions) {
+
+        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+
+        Arrays.stream(instructions).filter(Objects::nonNull).forEach(tBuilder::add);
+
+        return tBuilder.build();
+    }
+
+    private Instruction writeMetadataIncludingOnlyTp(ForwardingObjective fwd) {
+
+        return Instructions.writeMetadata(
+                fetchWriteMetadata(fwd).metadata() & 0xFFFF00000000L, 0L);
+    }
+
+    private void fail(Objective obj, ObjectiveError error) {
+        obj.context().ifPresent(context -> context.onError(obj, error));
+    }
+
+    private void pass(Objective obj) {
+        obj.context().ifPresent(context -> context.onSuccess(obj));
+    }
+
+
+    private class InnerGroupListener implements GroupListener {
+        @Override
+        public void event(GroupEvent event) {
+            GroupKey key = event.subject().appCookie();
+            NextObjective obj = pendingGroups.getIfPresent(key);
+            if (obj == null) {
+                log.debug("No pending group for {}, moving on", key);
+                return;
+            }
+            log.trace("Event {} for group {}, handling pending" +
+                              "NextGroup {}", event.type(), key, obj.id());
+            if (event.type() == GroupEvent.Type.GROUP_ADDED ||
+                    event.type() == GroupEvent.Type.GROUP_UPDATED) {
+                flowObjectiveStore.putNextGroup(obj.id(), new OltPipelineGroup(key));
+                pass(obj);
+                pendingGroups.invalidate(key);
+            } else if (event.type() == GroupEvent.Type.GROUP_REMOVED) {
+                flowObjectiveStore.removeNextGroup(obj.id());
+                pass(obj);
+                pendingGroups.invalidate(key);
+            }
+        }
+    }
+
+    private static class OltPipelineGroup implements NextGroup {
+
+        private final GroupKey key;
+
+        public OltPipelineGroup(GroupKey key) {
+            this.key = key;
+        }
+
+        public GroupKey key() {
+            return key;
+        }
+
+        @Override
+        public byte[] data() {
+            return appKryo.serialize(key);
+        }
+
+    }
+
+    @Override
+    public List<String> getNextMappings(NextGroup nextGroup) {
+        // TODO Implementation deferred to vendor
+        return null;
+    }
+
+    // Flow rules accumulator for reducing the number of transactions required to the devices.
+    private final class ObjectiveAccumulator
+            extends AbstractAccumulator<Pair<FilteringObjective, FlowRule>> {
+
+        ObjectiveAccumulator(int maxFilter, int maxBatchMS, int maxIdleMS) {
+            super(TIMER, maxFilter, maxBatchMS, maxIdleMS);
+        }
+
+        @Override
+        public void processItems(List<Pair<FilteringObjective, FlowRule>> pairs) {
+            // Triggers creation of a batch using the list of flowrules generated from objs.
+            accumulatorExecutorService.execute(new FlowRulesBuilderTask(pairs));
+        }
+    }
+
+    // Task for building batch of flow rules in a separate thread.
+    private final class FlowRulesBuilderTask implements Runnable {
+        private final List<Pair<FilteringObjective, FlowRule>> pairs;
+
+        FlowRulesBuilderTask(List<Pair<FilteringObjective, FlowRule>> pairs) {
+            this.pairs = pairs;
+        }
+
+        @Override
+        public void run() {
+            try {
+                sendFilters(pairs);
+            } catch (Exception e) {
+                log.warn("Unable to send objectives", e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/impl/src/main/java/org/opencord/olt/driver/package-info.java b/impl/src/main/java/org/opencord/olt/driver/package-info.java
new file mode 100644
index 0000000..dbf2a79
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/driver/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Drivers for the VOLTHA OLT application.
+ */
+package org.opencord.olt.driver;
diff --git a/app/src/main/java/org/opencord/olt/impl/ConsistentHasher.java b/impl/src/main/java/org/opencord/olt/impl/ConsistentHasher.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/impl/ConsistentHasher.java
rename to impl/src/main/java/org/opencord/olt/impl/ConsistentHasher.java
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/impl/Olt.java
rename to impl/src/main/java/org/opencord/olt/impl/Olt.java
diff --git a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/impl/OltFlowService.java
rename to impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
diff --git a/app/src/main/java/org/opencord/olt/impl/OltMeterService.java b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/impl/OltMeterService.java
rename to impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
diff --git a/app/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
rename to impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
diff --git a/app/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java b/impl/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java
rename to impl/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java
diff --git a/app/src/main/java/org/opencord/olt/impl/package-info.java b/impl/src/main/java/org/opencord/olt/impl/package-info.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/impl/package-info.java
rename to impl/src/main/java/org/opencord/olt/impl/package-info.java
diff --git a/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceFlowService.java b/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceFlowService.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/internalapi/AccessDeviceFlowService.java
rename to impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceFlowService.java
diff --git a/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java b/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
rename to impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
diff --git a/app/src/main/java/org/opencord/olt/internalapi/package-info.java b/impl/src/main/java/org/opencord/olt/internalapi/package-info.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/internalapi/package-info.java
rename to impl/src/main/java/org/opencord/olt/internalapi/package-info.java
diff --git a/app/src/main/resources/any_vlan_cfg.json b/impl/src/main/resources/any_vlan_cfg.json
similarity index 100%
rename from app/src/main/resources/any_vlan_cfg.json
rename to impl/src/main/resources/any_vlan_cfg.json
diff --git a/app/src/main/resources/cfg.json b/impl/src/main/resources/cfg.json
similarity index 100%
rename from app/src/main/resources/cfg.json
rename to impl/src/main/resources/cfg.json
diff --git a/app/src/main/resources/custom-topo.py b/impl/src/main/resources/custom-topo.py
similarity index 100%
rename from app/src/main/resources/custom-topo.py
rename to impl/src/main/resources/custom-topo.py
diff --git a/impl/src/main/resources/olt-drivers.xml b/impl/src/main/resources/olt-drivers.xml
new file mode 100644
index 0000000..fa7b585
--- /dev/null
+++ b/impl/src/main/resources/olt-drivers.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2016-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<drivers>
+    <driver name="celestica" extends="default"
+            manufacturer="PMC GPON Networks" hwVersion="PAS5211 v2" swVersion="vOLT version 1.5.3.*">
+        <behaviour api="org.onosproject.net.behaviour.Pipeliner"
+                   impl="org.opencord.olt.driver.OltPipeline"/>
+        <behaviour api="org.onosproject.net.behaviour.MeterQuery"
+                   impl="org.onosproject.driver.query.FullMetersAvailable"/>
+    </driver>
+    <driver name="pmc-olt" extends="default"
+            manufacturer="PMC GPON Networks" hwVersion="PASffffffff v-1" swVersion="vOLT.*">
+        <behaviour api="org.onosproject.net.behaviour.Pipeliner"
+                   impl="org.opencord.olt.driver.OltPipeline"/>
+        <behaviour api="org.onosproject.net.behaviour.MeterQuery"
+                   impl="org.onosproject.driver.query.FullMetersAvailable"/>
+    </driver>
+    <driver name="voltha" extends="default"
+            manufacturer="VOLTHA Project" hwVersion=".*" swVersion=".*">
+        <behaviour api="org.onosproject.net.behaviour.Pipeliner"
+                   impl="org.opencord.olt.driver.OltPipeline"/>
+        <behaviour api="org.onosproject.net.behaviour.MeterQuery"
+                   impl="org.onosproject.driver.query.FullMetersAvailable"/>
+        <property name="accumulatorEnabled">true</property>
+    </driver>
+    <driver name="fj-olt" extends="default"
+            manufacturer="Fujitsu" hwVersion="svkOLT" swVersion="v1.0">
+        <behaviour api="org.onosproject.net.behaviour.Pipeliner"
+                   impl="org.opencord.olt.driver.OltPipeline"/>
+        <behaviour api="org.onosproject.net.behaviour.MeterQuery"
+                   impl="org.onosproject.driver.query.FullMetersAvailable"/>
+    </driver>
+    <driver name="nokia-olt" extends="default"
+            manufacturer="Nokia" hwVersion="SDOLT" swVersion="5.2.1">
+        <behaviour api="org.onosproject.net.behaviour.Pipeliner"
+                   impl="org.opencord.olt.driver.NokiaOltPipeline"/>
+        <behaviour api="org.onosproject.net.behaviour.MeterQuery"
+                   impl="org.onosproject.driver.query.FullMetersAvailable"/>
+    </driver>
+    <driver name="g.fast" extends="default"
+            manufacturer="TEST1" hwVersion="TEST2" swVersion="TEST3">
+        <behaviour api="org.onosproject.net.behaviour.Pipeliner"
+                   impl="org.opencord.olt.driver.OltPipeline"/>
+        <behaviour api="org.onosproject.net.behaviour.MeterQuery"
+                   impl="org.onosproject.driver.query.FullMetersAvailable"/>
+    </driver>
+</drivers>
+
diff --git a/app/src/main/resources/vlan_cfg.json b/impl/src/main/resources/vlan_cfg.json
similarity index 100%
rename from app/src/main/resources/vlan_cfg.json
rename to impl/src/main/resources/vlan_cfg.json
diff --git a/app/src/main/resources/vlan_cfg_with_default.json b/impl/src/main/resources/vlan_cfg_with_default.json
similarity index 100%
rename from app/src/main/resources/vlan_cfg_with_default.json
rename to impl/src/main/resources/vlan_cfg_with_default.json
diff --git a/app/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java b/impl/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java
similarity index 100%
rename from app/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java
rename to impl/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java
diff --git a/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java b/impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java
similarity index 100%
rename from app/src/test/java/org/opencord/olt/impl/OltFlowTest.java
rename to impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java
diff --git a/app/src/test/java/org/opencord/olt/impl/OltMeterTest.java b/impl/src/test/java/org/opencord/olt/impl/OltMeterTest.java
similarity index 100%
rename from app/src/test/java/org/opencord/olt/impl/OltMeterTest.java
rename to impl/src/test/java/org/opencord/olt/impl/OltMeterTest.java
diff --git a/app/src/test/java/org/opencord/olt/impl/OltTest.java b/impl/src/test/java/org/opencord/olt/impl/OltTest.java
similarity index 100%
rename from app/src/test/java/org/opencord/olt/impl/OltTest.java
rename to impl/src/test/java/org/opencord/olt/impl/OltTest.java
diff --git a/app/src/test/java/org/opencord/olt/impl/TestBase.java b/impl/src/test/java/org/opencord/olt/impl/TestBase.java
similarity index 100%
rename from app/src/test/java/org/opencord/olt/impl/TestBase.java
rename to impl/src/test/java/org/opencord/olt/impl/TestBase.java
diff --git a/pom.xml b/pom.xml
index 5dddfc1..e7e689c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,6 +36,8 @@
 
     <modules>
         <module>api</module>
+        <module>web</module>
+        <module>impl</module>
         <module>app</module>
     </modules>
 
diff --git a/web/pom.xml b/web/pom.xml
new file mode 100644
index 0000000..fdbb820
--- /dev/null
+++ b/web/pom.xml
@@ -0,0 +1,134 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2016-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.opencord</groupId>
+        <artifactId>olt</artifactId>
+        <version>4.4.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>olt-web</artifactId>
+    <packaging>bundle</packaging>
+    <description>REST API for OLT application for CORD</description>
+
+    <properties>
+        <web.context>/onos/olt</web.context>
+        <api.version>1.0.0</api.version>
+        <api.title>ONOS OLT REST API</api.title>
+        <api.description>
+            APIs for interacting with the CORD OLT application.
+        </api.description>
+        <api.package>org.opencord.olt.rest</api.package>
+        <olt.api.version>${project.version}</olt.api.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.opencord</groupId>
+            <artifactId>olt-api</artifactId>
+            <version>${olt.api.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.opencord</groupId>
+            <artifactId>olt-impl</artifactId>
+            <version>${olt.api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.opencord</groupId>
+            <artifactId>sadis-api</artifactId>
+            <version>${sadis.api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-cli</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+            <version>${onos.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.karaf.shell</groupId>
+            <artifactId>org.apache.karaf.shell.console</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.karaf.shell</groupId>
+            <artifactId>org.apache.karaf.shell.core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>javax.ws.rs</groupId>
+            <artifactId>javax.ws.rs-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.onosproject</groupId>
+                <artifactId>onos-maven-plugin</artifactId>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <configuration>
+                    <instructions>
+                        <_wab>src/main/webapp/</_wab>
+                        <Include-Resource>
+                            WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
+                            {maven-resources}
+                        </Include-Resource>
+                        <Import-Package>
+                            *,org.glassfish.jersey.servlet
+                        </Import-Package>
+                        <Web-ContextPath>${web.context}</Web-ContextPath>
+                        <Karaf-Commands>org.opencord.olt.cli</Karaf-Commands>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/app/src/main/java/org/opencord/olt/rest/OltWebResource.java b/web/src/main/java/org/opencord/olt/rest/OltWebResource.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/rest/OltWebResource.java
rename to web/src/main/java/org/opencord/olt/rest/OltWebResource.java
diff --git a/app/src/main/java/org/opencord/olt/rest/package-info.java b/web/src/main/java/org/opencord/olt/rest/package-info.java
similarity index 100%
rename from app/src/main/java/org/opencord/olt/rest/package-info.java
rename to web/src/main/java/org/opencord/olt/rest/package-info.java
diff --git a/app/src/main/webapp/WEB-INF/web.xml b/web/src/main/webapp/WEB-INF/web.xml
similarity index 100%
rename from app/src/main/webapp/WEB-INF/web.xml
rename to web/src/main/webapp/WEB-INF/web.xml