[VOL-3661] PPPoE IA application - initial commit

Change-Id: Idaf23f8736cba955fe8a3049b8fc9c85b3cd3ab9
Signed-off-by: Gustavo Silva <gsilva@furukawalatam.com>
diff --git a/app/app.xml b/app/app.xml
new file mode 100644
index 0000000..d441b88
--- /dev/null
+++ b/app/app.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2021-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<app name="org.opencord.pppoeagent" origin="Open Networking Foundation" version="${project.version}"
+     category="vOlt" url="http://onosproject.org" title="PPPoE Agent"
+     featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
+     features="${project.artifactId}" apps="org.opencord.sadis">
+    <description>${project.description}</description>
+    <artifact>mvn:${project.groupId}/${project.artifactId}/${project.version}</artifact>
+    <artifact>mvn:${project.groupId}/pppoeagent-api/${project.version}</artifact>
+</app>
diff --git a/app/features.xml b/app/features.xml
new file mode 100644
index 0000000..7b8b9a3
--- /dev/null
+++ b/app/features.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ Copyright 2021-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<features xmlns="http://karaf.apache.org/xmlns/features/v1.2.0" name="${project.artifactId}-${project.version}">
+    <feature name="${project.artifactId}" version="${project.version}"
+             description="${project.description}">
+        <feature>onos-api</feature>
+        <bundle>mvn:${project.groupId}/pppoeagent-api/${project.version}</bundle>
+        <bundle>mvn:${project.groupId}/${project.artifactId}/${project.version}</bundle>
+    </feature>
+</features>
diff --git a/app/pom.xml b/app/pom.xml
new file mode 100644
index 0000000..187a9b7
--- /dev/null
+++ b/app/pom.xml
@@ -0,0 +1,190 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2021-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <parent>
+        <groupId>org.opencord</groupId>
+        <artifactId>pppoeagent</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>pppoeagent-app</artifactId>
+
+    <packaging>bundle</packaging>
+    <description>PPPoE Intermediate Agent application</description>
+
+    <properties>
+        <web.context>/onos/pppoeagent</web.context>
+        <api.version>1.0.0-SNAPSHOT</api.version>
+        <api.title>PPPoE Agent REST API</api.title>
+        <api.description>REST API to query PPPoE agent sessions information.</api.description>
+        <api.package>org.opencord.pppoeagent.rest</api.package>
+        <onos.app.requires>
+            org.opencord.sadis
+        </onos.app.requires>
+        <onos.app.name>org.opencord.pppoeagent</onos.app.name>
+        <onos.app.title>PPPoE Intermediate Agent App</onos.app.title>
+        <onos.app.category>vOLT</onos.app.category>
+        <onos.app.url>http://opencord.org</onos.app.url>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <onos.app.readme>PPPoE Intermediate Agent</onos.app.readme>
+        <onos.app.origin>Furukawa Electric LatAm</onos.app.origin>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-rest</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.opencord</groupId>
+            <artifactId>pppoeagent-api</artifactId>
+            <version>1.0.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-junit</artifactId>
+            <version>${onos.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-cli</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+            <version>${onos.version}</version>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+        </dependency>
+
+        <dependency>
+            <groupId>org.opencord</groupId>
+            <artifactId>sadis-api</artifactId>
+            <version>${sadis.api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-common</artifactId>
+            <version>${onos.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-misc</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+            <version>${onos.version}</version>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-osgi</artifactId>
+            <version>${onos.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.karaf.shell</groupId>
+            <artifactId>org.apache.karaf.shell.core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.karaf.shell</groupId>
+            <artifactId>org.apache.karaf.shell.console</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>javax.ws.rs</groupId>
+            <artifactId>javax.ws.rs-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.onosproject</groupId>
+                <artifactId>onos-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <configuration>
+                    <instructions>
+                        <_wab>src/main/webapp/</_wab>
+                        <Include-Resource>
+                            WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
+                            {maven-resources}
+                        </Include-Resource>
+                        <Import-Package>
+                            *,org.glassfish.jersey.servlet
+                        </Import-Package>
+                        <Web-ContextPath>${web.context}</Web-ContextPath>
+                        <Karaf-Commands>org.opencord.pppoeagent.cli</Karaf-Commands>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentShowUsersCommand.java b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentShowUsersCommand.java
new file mode 100644
index 0000000..a67ffca
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentShowUsersCommand.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.cli;
+
+import org.opencord.pppoeagent.PppoeAgentService;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Port;
+import org.onosproject.net.device.DeviceService;
+
+/**
+ *  Shows the PPPoE sessions/users learned by the agent.
+ */
+@Service
+@Command(scope = "pppoe", name = "pppoe-users",
+        description = "Shows the PPPoE users")
+public class PppoeAgentShowUsersCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "mac", description = "MAC address related to PPPoE session.",
+            required = false, multiValued = false)
+    private String macStr = null;
+
+    @Override
+    protected void doExecute() {
+        MacAddress macAddress = null;
+        if (macStr != null && !macStr.isEmpty()) {
+            try {
+                macAddress = MacAddress.valueOf(macStr);
+            } catch (IllegalArgumentException e) {
+                log.error(e.getMessage());
+                return;
+            }
+        }
+
+        DeviceService deviceService = AbstractShellCommand.get(DeviceService.class);
+        PppoeAgentService pppoeAgentService = AbstractShellCommand.get(PppoeAgentService.class);
+
+        if (macAddress != null) {
+            PppoeSessionInfo singleInfo = pppoeAgentService.getSessionsMap().get(macAddress);
+            if (singleInfo != null) {
+                Port devicePort = deviceService.getPort(singleInfo.getClientCp());
+                printPppoeInfo(macAddress, singleInfo, devicePort);
+            } else {
+                print("No session information found for provided MAC address %s", macAddress.toString());
+            }
+        } else {
+            pppoeAgentService.getSessionsMap().forEach((mac, sessionInfo) -> {
+                final Port devicePortFinal = deviceService.getPort(sessionInfo.getClientCp());
+                printPppoeInfo(mac, sessionInfo, devicePortFinal);
+            });
+        }
+    }
+
+    private void printPppoeInfo(MacAddress macAddr, PppoeSessionInfo sessionInfo, Port devicePort) {
+        PPPoED.Type lastReceivedPkt = PPPoED.Type.getTypeByValue(sessionInfo.getPacketCode());
+        ConnectPoint cp = sessionInfo.getClientCp();
+        String subscriberId = devicePort != null ? devicePort.annotations().value(AnnotationKeys.PORT_NAME) :
+                "UNKNOWN";
+
+        print("MacAddress=%s,SessionId=%s,CurrentState=%s,LastReceivedPacket=%s,DeviceId=%s,PortNumber=%s," +
+                        "SubscriberId=%s",
+                macAddr.toString(), String.valueOf(sessionInfo.getSessionId()),
+                sessionInfo.getCurrentState(), lastReceivedPkt.name(),
+                cp.deviceId().toString(), cp.port().toString(), subscriberId);
+    }
+}
diff --git a/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentStatsCommand.java b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentStatsCommand.java
new file mode 100644
index 0000000..885bf36
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentStatsCommand.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.cli;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import java.util.Collections;
+import java.util.Map;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersIdentifier;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersStore;
+import org.opencord.pppoeagent.impl.PppoeAgentCounterNames;
+
+/**
+ * Display/Reset the PPPoE Agent application statistics.
+ */
+@Service
+@Command(scope = "pppoe", name = "pppoeagent-stats",
+        description = "Display or Reset the PPPoE Agent application statistics.")
+public class PppoeAgentStatsCommand extends AbstractShellCommand {
+    private static final String CONFIRM_PHRASE = "please";
+
+    @Option(name = "-r", aliases = "--reset", description = "Resets a specific counter.\n",
+            required = false, multiValued = false)
+    PppoeAgentCounterNames reset = null;
+
+    @Option(name = "-R", aliases = "--reset-all", description = "Resets all counters.\n",
+            required = false, multiValued = false)
+    boolean resetAll = false;
+
+    @Option(name = "-s", aliases = "--subscriberId", description = "Subscriber Id.\n",
+            required = false, multiValued = false)
+    String subscriberId = null;
+
+    @Option(name = "-p", aliases = "--please", description = "Confirmation phrase.",
+            required = false, multiValued = false)
+    String please = null;
+
+    @Argument(index = 0, name = "counter",
+            description = "The counter to display. In case not specified, all counters will be displayed.",
+            required = false, multiValued = false)
+    PppoeAgentCounterNames counter = null;
+
+    @Override
+    protected void doExecute() {
+        PppoeAgentCountersStore pppoeCounters = AbstractShellCommand.get(PppoeAgentCountersStore.class);
+
+        if ((subscriberId == null) || (subscriberId.equals("global"))) {
+            // All subscriber Ids
+            subscriberId = PppoeAgentEvent.GLOBAL_COUNTER;
+        }
+
+        if (resetAll || reset != null) {
+            if (please == null || !please.equals(CONFIRM_PHRASE)) {
+                print("WARNING: Be aware that you are going to reset the counters. " +
+                        "Enter confirmation phrase to continue.");
+                return;
+            }
+            if (resetAll) {
+                // Reset all counters.
+                pppoeCounters.resetCounters(subscriberId);
+            } else {
+                // Reset the specified counter.
+                pppoeCounters.setCounter(subscriberId, reset, (long) 0);
+            }
+        } else {
+            Map<PppoeAgentCountersIdentifier, Long> countersMap = pppoeCounters.getCounters().counters();
+            if (countersMap.size() > 0) {
+                if (counter == null) {
+                    String jsonString = "";
+                    if (outputJson()) {
+                        jsonString = String.format("{\"%s\":{", pppoeCounters.NAME);
+                    } else {
+                        print("%s [%s] :", pppoeCounters.NAME, subscriberId);
+                    }
+                    PppoeAgentCounterNames[] counters = PppoeAgentCounterNames.values();
+                    for (int i = 0; i < counters.length; i++) {
+                        PppoeAgentCounterNames counterType = counters[i];
+                        Long value = countersMap.get(new PppoeAgentCountersIdentifier(subscriberId, counterType));
+                        if (value == null) {
+                            value = 0L;
+                        }
+                        if (outputJson()) {
+                            jsonString += String.format("\"%s\":%d", counterType, value);
+                            if (i < counters.length - 1) {
+                                jsonString += ",";
+                            }
+                        } else {
+                            printCounter(counterType, value);
+                        }
+                    }
+                    if (outputJson()) {
+                        jsonString += "}}";
+                        print("%s", jsonString);
+                    }
+                } else {
+                    // Show only the specified counter
+                    Long value = countersMap.get(new PppoeAgentCountersIdentifier(subscriberId, counter));
+                    if (value == null) {
+                        value = 0L;
+                    }
+                    if (outputJson()) {
+                        print("{\"%s\":%d}", counter, value);
+                    } else {
+                        printCounter(counter, value);
+                    }
+                }
+            } else {
+                print("No PPPoE Agent Counters were created yet for counter class [%s]",
+                        PppoeAgentEvent.GLOBAL_COUNTER);
+            }
+        }
+    }
+
+    void printCounter(PppoeAgentCounterNames c, long value) {
+        // print in non-JSON format
+        print("  %s %s %-4d", c,
+                String.join("", Collections.nCopies(50 - c.toString().length(), ".")), value);
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/cli/package-info.java b/app/src/main/java/org/opencord/pppoeagent/cli/package-info.java
new file mode 100644
index 0000000..515908f
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/cli/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * CLI commands for the PPPoE agent.
+ */
+package org.opencord.pppoeagent.cli;
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/pppoeagent/impl/OsgiPropertyConstants.java
new file mode 100644
index 0000000..65c8310
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/OsgiPropertyConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+/**
+ * Constants for default values of configurable properties.
+ */
+public final class OsgiPropertyConstants {
+
+    private OsgiPropertyConstants() {
+    }
+
+    public static final String ENABLE_CIRCUIT_ID_VALIDATION = "enableCircuitIdValidation";
+    public static final boolean ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT = true;
+
+    public static final String PPPOE_COUNTERS_TOPIC = "pppoeCountersTopic";
+    public static final String PPPOE_COUNTERS_TOPIC_DEFAULT = "onos.pppoe.stats.kpis";
+
+    public static final String PUBLISH_COUNTERS_RATE = "publishCountersRate";
+    public static final int PUBLISH_COUNTERS_RATE_DEFAULT = 10;
+
+    public static final String PPPOE_MAX_MTU = "pppoeMaxMtu";
+    public static final int PPPOE_MAX_MTU_DEFAULT = 1500;
+
+    public static final String PACKET_PROCESSOR_THREADS = "packetProcessorThreads";
+    public static final int PACKET_PROCESSOR_THREADS_DEFAULT = 10;
+
+    public static final String SYNC_COUNTERS_RATE = "syncCountersRate";
+    public static final int SYNC_COUNTERS_RATE_DEFAULT = 5;
+}
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgent.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgent.java
new file mode 100644
index 0000000..67c71da
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgent.java
@@ -0,0 +1,929 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+
+import com.google.common.collect.Sets;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentListener;
+import org.opencord.pppoeagent.PppoeAgentService;
+import org.opencord.pppoeagent.PppoeAgentStoreDelegate;
+import org.opencord.pppoeagent.PPPoEDVendorSpecificTag;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Serializer;
+import com.google.common.collect.ImmutableMap;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+
+import org.onlab.util.KryoNamespace;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.packet.PPPoEDTag;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipListener;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
+import org.opencord.pppoeagent.util.CircuitIdBuilder;
+import org.opencord.pppoeagent.util.CircuitIdFieldName;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import org.onosproject.store.service.Versioned;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADI;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADO;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADR;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADS;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADT;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_AC_SYSTEM_ERROR;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_GENERIC_ERROR;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_SERVICE_NAME_ERROR;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_VENDOR_SPECIFIC;
+
+import static org.onlab.util.Tools.getIntegerProperty;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PPPOE_MAX_MTU;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PPPOE_MAX_MTU_DEFAULT;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.ENABLE_CIRCUIT_ID_VALIDATION;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PACKET_PROCESSOR_THREADS;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PACKET_PROCESSOR_THREADS_DEFAULT;
+
+/**
+ * PPPoE Intermediate Agent application.
+ */
+@Component(immediate = true,
+property = {
+        PPPOE_MAX_MTU + ":Integer=" + PPPOE_MAX_MTU_DEFAULT,
+        ENABLE_CIRCUIT_ID_VALIDATION + ":Boolean=" + ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT,
+        PACKET_PROCESSOR_THREADS + ":Integer=" + PACKET_PROCESSOR_THREADS_DEFAULT,
+})
+public class PppoeAgent
+        extends AbstractListenerManager<PppoeAgentEvent, PppoeAgentListener>
+        implements PppoeAgentService {
+    private static final String APP_NAME = "org.opencord.pppoeagent";
+    private static final short QINQ_VID_NONE = (short) -1;
+
+    private final InternalConfigListener cfgListener = new InternalConfigListener();
+    private final Set<ConfigFactory> factories = ImmutableSet.of(
+            new ConfigFactory<>(APP_SUBJECT_FACTORY, PppoeAgentConfig.class, "pppoeagent") {
+                @Override
+                public PppoeAgentConfig createConfig() {
+                    return new PppoeAgentConfig();
+                }
+            }
+    );
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected NetworkConfigRegistry cfgService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected PacketService packetService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService componentConfigService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected SadisService sadisService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected PppoeAgentCountersStore pppoeAgentCounters;
+
+    // OSGi Properties
+    protected int pppoeMaxMtu = PPPOE_MAX_MTU_DEFAULT;
+    protected boolean enableCircuitIdValidation = ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT;
+    protected int packetProcessorThreads = PACKET_PROCESSOR_THREADS_DEFAULT;
+
+    private ApplicationId appId;
+    private InnerDeviceListener deviceListener = new InnerDeviceListener();
+    private InnerMastershipListener changeListener = new InnerMastershipListener();
+    private PppoeAgentPacketProcessor pppoeAgentPacketProcessor = new PppoeAgentPacketProcessor();
+    protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+    private PppoeAgentStoreDelegate delegate = new InnerPppoeAgentStoreDelegate();
+
+    Set<ConnectPoint> pppoeConnectPoints;
+    protected AtomicReference<ConnectPoint> pppoeServerConnectPoint = new AtomicReference<>();
+    protected boolean useOltUplink = false;
+
+    static ConsistentMap<MacAddress, PppoeSessionInfo> sessionsMap;
+
+    @Override
+    public Map<MacAddress, PppoeSessionInfo> getSessionsMap() {
+        return ImmutableMap.copyOf(sessionsMap.asJavaMap());
+    }
+
+    @Override
+    public void clearSessionsMap() {
+        sessionsMap.clear();
+    }
+
+    private final ArrayList<CircuitIdFieldName> circuitIdfields = new ArrayList<>(Arrays.asList(
+            CircuitIdFieldName.AcessNodeIdentifier,
+            CircuitIdFieldName.Slot,
+            CircuitIdFieldName.Port,
+            CircuitIdFieldName.OnuSerialNumber));
+
+    protected ExecutorService packetProcessorExecutor;
+
+    @Activate
+    protected void activate(ComponentContext context) {
+        eventDispatcher.addSink(PppoeAgentEvent.class, listenerRegistry);
+
+        appId = coreService.registerApplication(APP_NAME);
+        cfgService.addListener(cfgListener);
+        componentConfigService.registerProperties(getClass());
+
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(PppoeSessionInfo.class)
+                .register(MacAddress.class)
+                .register(SubscriberAndDeviceInformation.class)
+                .register(UniTagInformation.class)
+                .register(ConnectPoint.class)
+                .build();
+
+        sessionsMap = storageService.<MacAddress, PppoeSessionInfo>consistentMapBuilder()
+                .withName("pppoeagent-sessions")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build();
+
+        factories.forEach(cfgService::registerConfigFactory);
+        deviceService.addListener(deviceListener);
+        subsService = sadisService.getSubscriberInfoService();
+        mastershipService.addListener(changeListener);
+        pppoeAgentCounters.setDelegate(delegate);
+        updateConfig();
+        packetService.addProcessor(pppoeAgentPacketProcessor, PacketProcessor.director(0));
+        if (context != null) {
+            modified(context);
+        }
+        log.info("PPPoE Intermediate Agent Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        cfgService.removeListener(cfgListener);
+        factories.forEach(cfgService::unregisterConfigFactory);
+        packetService.removeProcessor(pppoeAgentPacketProcessor);
+        packetProcessorExecutor.shutdown();
+        componentConfigService.unregisterProperties(getClass(), false);
+        deviceService.removeListener(deviceListener);
+        eventDispatcher.removeSink(PppoeAgentEvent.class);
+        pppoeAgentCounters.unsetDelegate(delegate);
+        log.info("PPPoE Intermediate Agent Stopped");
+    }
+
+    private void updateConfig() {
+        PppoeAgentConfig cfg = cfgService.getConfig(appId, PppoeAgentConfig.class);
+        if (cfg == null) {
+            log.warn("PPPoE server info not available");
+            return;
+        }
+
+        synchronized (this) {
+            pppoeConnectPoints = Sets.newConcurrentHashSet(cfg.getPppoeServerConnectPoint());
+        }
+        useOltUplink = cfg.getUseOltUplinkForServerPktInOut();
+    }
+
+    /**
+     * Returns whether the passed port is the uplink port of the olt device.
+     */
+    private boolean isUplinkPortOfOlt(DeviceId dId, Port p) {
+        log.debug("isUplinkPortOfOlt: DeviceId: {} Port: {}", dId, p);
+        if (!mastershipService.isLocalMaster(dId)) {
+            return false;
+        }
+
+        Device d = deviceService.getDevice(dId);
+        SubscriberAndDeviceInformation deviceInfo = subsService.get(d.serialNumber());
+        if (deviceInfo != null) {
+            return (deviceInfo.uplinkPort() == p.number().toLong());
+        }
+
+        return false;
+    }
+
+    /**
+     * Returns the connectPoint which is the uplink port of the OLT.
+     */
+    private ConnectPoint getUplinkConnectPointOfOlt(DeviceId dId) {
+        Device d = deviceService.getDevice(dId);
+        SubscriberAndDeviceInformation deviceInfo = subsService.get(d.serialNumber());
+        log.debug("getUplinkConnectPointOfOlt DeviceId: {} devInfo: {}", dId, deviceInfo);
+        if (deviceInfo != null) {
+            PortNumber pNum = PortNumber.portNumber(deviceInfo.uplinkPort());
+            Port port = deviceService.getPort(d.id(), pNum);
+            if (port != null) {
+                return new ConnectPoint(d.id(), pNum);
+            }
+        }
+        return null;
+    }
+
+    @Modified
+    protected void modified(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+
+        Integer newPppoeMaxMtu = getIntegerProperty(properties, PPPOE_MAX_MTU);
+        if (newPppoeMaxMtu != null) {
+            if (newPppoeMaxMtu != pppoeMaxMtu && newPppoeMaxMtu >= 0) {
+                log.info("PPPPOE MTU modified from {} to {}", pppoeMaxMtu, newPppoeMaxMtu);
+                pppoeMaxMtu = newPppoeMaxMtu;
+            } else if (newPppoeMaxMtu < 0) {
+                log.error("Invalid newPppoeMaxMtu : {}, defaulting to 1492", newPppoeMaxMtu);
+                pppoeMaxMtu = PPPOE_MAX_MTU_DEFAULT;
+            }
+        }
+
+        Boolean newEnableCircuitIdValidation = Tools.isPropertyEnabled(properties, ENABLE_CIRCUIT_ID_VALIDATION);
+        if (newEnableCircuitIdValidation != null) {
+            if (enableCircuitIdValidation != newEnableCircuitIdValidation) {
+                log.info("Property enableCircuitIdValidation modified from {} to {}",
+                        enableCircuitIdValidation, newEnableCircuitIdValidation);
+                enableCircuitIdValidation = newEnableCircuitIdValidation;
+            }
+        }
+
+        String s = Tools.get(properties, PACKET_PROCESSOR_THREADS);
+
+        int oldpacketProcessorThreads = packetProcessorThreads;
+        packetProcessorThreads = Strings.isNullOrEmpty(s) ? oldpacketProcessorThreads
+                : Integer.parseInt(s.trim());
+        if (packetProcessorExecutor == null || oldpacketProcessorThreads != packetProcessorThreads) {
+            if (packetProcessorExecutor != null) {
+                packetProcessorExecutor.shutdown();
+            }
+            packetProcessorExecutor = newFixedThreadPool(packetProcessorThreads,
+                    groupedThreads("onos/pppoe",
+                            "pppoe-packet-%d", log));
+        }
+    }
+
+    /**
+     * Selects a connect point through an available device for which it is the master.
+     */
+    private void selectServerConnectPoint() {
+        synchronized (this) {
+            pppoeServerConnectPoint.set(null);
+            if (pppoeConnectPoints != null) {
+                // find a connect point through a device for which we are master
+                for (ConnectPoint cp: pppoeConnectPoints) {
+                    if (mastershipService.isLocalMaster(cp.deviceId())) {
+                        if (deviceService.isAvailable(cp.deviceId())) {
+                            pppoeServerConnectPoint.set(cp);
+                        }
+                        log.info("PPPOE connectPoint selected is {}", cp);
+                        break;
+                    }
+                }
+            }
+            log.info("PPPOE Server connectPoint is {}", pppoeServerConnectPoint.get());
+            if (pppoeServerConnectPoint.get() == null) {
+                log.error("Master of none, can't relay PPPOE messages to server");
+            }
+        }
+    }
+
+    private SubscriberAndDeviceInformation getSubscriber(ConnectPoint cp) {
+        Port p = deviceService.getPort(cp);
+        String subscriberId = p.annotations().value(AnnotationKeys.PORT_NAME);
+        return subsService.get(subscriberId);
+    }
+
+    private SubscriberAndDeviceInformation getDevice(PacketContext context) {
+        String serialNo = deviceService.getDevice(context.inPacket().
+                receivedFrom().deviceId()).serialNumber();
+
+        return subsService.get(serialNo);
+    }
+
+    private UniTagInformation getUnitagInformationFromPacketContext(PacketContext context,
+                                                                    SubscriberAndDeviceInformation sub) {
+        return sub.uniTagList()
+                .stream()
+                .filter(u -> u.getPonCTag().toShort() == context.inPacket().parsed().getVlanID())
+                .findFirst()
+                .orElse(null);
+    }
+
+    private boolean removeSessionsByConnectPoint(ConnectPoint cp) {
+        boolean removed = false;
+        for (MacAddress key : sessionsMap.keySet()) {
+            PppoeSessionInfo entry = sessionsMap.asJavaMap().get(key);
+            if (entry.getClientCp().equals(cp)) {
+                sessionsMap.remove(key);
+                removed = true;
+            }
+        }
+        return removed;
+    }
+
+    private boolean removeSessionsByDevice(DeviceId devid) {
+        boolean removed = false;
+        for (MacAddress key : sessionsMap.keySet()) {
+            PppoeSessionInfo entry = sessionsMap.asJavaMap().get(key);
+            if (entry.getClientCp().deviceId().equals(devid)) {
+                sessionsMap.remove(key);
+                removed = true;
+            }
+        }
+        return removed;
+    }
+
+    private class PppoeAgentPacketProcessor implements PacketProcessor {
+        @Override
+        public void process(PacketContext context) {
+            packetProcessorExecutor.execute(() -> {
+                processInternal(context);
+            });
+        }
+
+        private void processInternal(PacketContext context) {
+            Ethernet packet = (Ethernet) context.inPacket().parsed().clone();
+            if (packet.getEtherType() == Ethernet.TYPE_PPPOED) {
+                processPppoedPacket(context, packet);
+            }
+        }
+
+        private void processPppoedPacket(PacketContext context, Ethernet packet) {
+            PPPoED pppoed = (PPPoED) packet.getPayload();
+            if (pppoed == null) {
+                log.warn("PPPoED payload is null");
+                return;
+            }
+
+            final byte pppoedCode = pppoed.getCode();
+            final short sessionId = pppoed.getSessionId();
+            final MacAddress clientMacAddress;
+            final ConnectPoint packetCp = context.inPacket().receivedFrom();
+            boolean serverMessage = false;
+
+            // Get the client MAC address
+            switch (pppoedCode) {
+                case PPPOED_CODE_PADT: {
+                    if (sessionsMap.containsKey(packet.getDestinationMAC())) {
+                        clientMacAddress = packet.getDestinationMAC();
+                        serverMessage = true;
+                    } else if (sessionsMap.containsKey(packet.getSourceMAC())) {
+                        clientMacAddress = packet.getSourceMAC();
+                    } else {
+                        // In the unlikely case of receiving a PADT without an existing session
+                        log.warn("PADT received for unknown session. Dropping packet.");
+                        return;
+                    }
+                    break;
+                }
+                case PPPOED_CODE_PADI:
+                case PPPOED_CODE_PADR: {
+                    clientMacAddress = packet.getSourceMAC();
+                    break;
+                }
+                default: {
+                    clientMacAddress = packet.getDestinationMAC();
+                    serverMessage = true;
+                    break;
+                }
+            }
+
+            SubscriberAndDeviceInformation subsInfo;
+            if (serverMessage) {
+                if (!sessionsMap.containsKey(clientMacAddress)) {
+                    log.error("PPPoED message received from server without an existing session. Message not relayed.");
+                    return;
+                } else {
+                    PppoeSessionInfo sessInfo = sessionsMap.get(clientMacAddress).value();
+                    subsInfo = getSubscriber(sessInfo.getClientCp());
+                }
+            } else {
+                subsInfo = getSubscriber(packetCp);
+            }
+
+            if (subsInfo == null) {
+                log.error("No Sadis info for subscriber on connect point {}. Message not relayed.", packetCp);
+                return;
+            }
+
+            log.trace("{} received from {} at {} with client mac: {}",
+                    PPPoED.Type.getTypeByValue(pppoedCode).toString(),
+                    serverMessage ? "server" : "client", packetCp, clientMacAddress);
+
+            if (log.isTraceEnabled()) {
+                log.trace("PPPoED message received from {} at {} {}",
+                        serverMessage ? "server" : "client", packetCp, packet);
+            }
+
+            // In case of PADI, force the removal of the previous session entry
+            if ((pppoedCode == PPPOED_CODE_PADI) && (sessionsMap.containsKey(clientMacAddress))) {
+                log.trace("PADI received from MAC: {} with existing session data. Removing the existing data.",
+                        clientMacAddress.toString());
+                sessionsMap.remove(clientMacAddress);
+            }
+
+            // Fill the session map entry
+            PppoeSessionInfo sessionInfo;
+            if (!sessionsMap.containsKey(clientMacAddress)) {
+                if (!serverMessage)  {
+                    ConnectPoint serverCp = getServerConnectPoint(packetCp.deviceId());
+                    SubscriberAndDeviceInformation subscriber = getSubscriber(packetCp);
+                    sessionInfo = new PppoeSessionInfo(packetCp, serverCp, pppoedCode,
+                            sessionId, subscriber, clientMacAddress);
+                    sessionsMap.put(clientMacAddress, sessionInfo);
+                } else {
+                    // This case was already covered.
+                    return;
+                }
+            } else {
+                sessionInfo = sessionsMap.get(clientMacAddress).value();
+            }
+
+            switch (pppoedCode) {
+                case PPPOED_CODE_PADI:
+                case PPPOED_CODE_PADR:
+                    updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                            pppoedCode == PPPOED_CODE_PADI ? PppoeAgentCounterNames.PADI : PppoeAgentCounterNames.PADR);
+
+                    Ethernet padir = processPacketFromClient(context, packet, pppoed, sessionInfo, clientMacAddress);
+                    if (padir != null) {
+                        if (padir.serialize().length <= pppoeMaxMtu) {
+                            forwardPacketToServer(padir, sessionInfo);
+                        } else {
+                            log.debug("MTU message size: {} exceeded configured pppoeMaxMtu: {}. Dropping Packet.",
+                                    padir.serialize().length, pppoeMaxMtu);
+                            forwardPacketToClient(errorToClient(packet, pppoed, "MTU message size exceeded"),
+                                                                sessionInfo, clientMacAddress);
+                            updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                                          PppoeAgentCounterNames.MTU_EXCEEDED);
+                        }
+                    }
+                    break;
+                case PPPOED_CODE_PADO:
+                case PPPOED_CODE_PADS:
+                    updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                            pppoedCode == PPPOED_CODE_PADO ? PppoeAgentCounterNames.PADO : PppoeAgentCounterNames.PADS);
+                    Ethernet pados = processPacketFromServer(packet, pppoed, sessionInfo, clientMacAddress);
+                    if (pados != null) {
+                        forwardPacketToClient(pados, sessionInfo, clientMacAddress);
+                    }
+                    break;
+                case PPPOED_CODE_PADT:
+                    if (serverMessage) {
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                                      PppoeAgentCounterNames.PADT_FROM_SERVER);
+                        forwardPacketToClient(packet, sessionInfo, clientMacAddress);
+                    } else {
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                                      PppoeAgentCounterNames.PADT_FROM_CLIENT);
+                        forwardPacketToServer(packet, sessionInfo);
+                    }
+
+                    String reason = "";
+                    PPPoEDTag genericErrorTag = pppoed.getTags()
+                            .stream()
+                            .filter(tag -> tag.getType() == PPPOED_TAG_GENERIC_ERROR)
+                            .findFirst()
+                            .orElse(null);
+
+                    if (genericErrorTag != null) {
+                        reason = new String(genericErrorTag.getValue(), StandardCharsets.UTF_8);
+                    }
+                    log.debug("PADT sessionId:{}  client MAC:{}  Terminate reason:{}.",
+                            Integer.toHexString(sessionId & 0xFFFF), clientMacAddress, reason);
+
+                    boolean knownSessionId = sessionInfo.getSessionId() == sessionId;
+                    if (knownSessionId) {
+                        PppoeSessionInfo removedSessionInfo = Versioned
+                                .valueOrNull(sessionsMap.remove(clientMacAddress));
+                        if (removedSessionInfo != null) {
+                            post(new PppoeAgentEvent(PppoeAgentEvent.Type.TERMINATE, removedSessionInfo,
+                                                     packetCp, clientMacAddress, reason));
+                        }
+                    } else {
+                        log.warn("PADT received for a known MAC address but for unknown session.");
+                    }
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        private Ethernet processPacketFromClient(PacketContext context, Ethernet ethernetPacket, PPPoED pppoed,
+                                                 PppoeSessionInfo sessionInfo, MacAddress clientMacAddress) {
+            byte pppoedCode = pppoed.getCode();
+
+            sessionInfo.setPacketCode(pppoedCode);
+            sessionsMap.put(clientMacAddress, sessionInfo);
+
+            // Update Counters
+            for (PPPoEDTag tag : pppoed.getTags()) {
+                if (tag.getType() == PPPOED_TAG_GENERIC_ERROR) {
+                    updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                            PppoeAgentCounterNames.GENERIC_ERROR_FROM_CLIENT);
+                    break;
+                }
+            }
+
+            Ethernet ethFwd = ethernetPacket;
+
+            // If this is a PADI packet, there'll be a START event.
+            if (pppoedCode == PPPOED_CODE_PADI) {
+                post(new PppoeAgentEvent(PppoeAgentEvent.Type.START, sessionInfo, sessionInfo.getClientCp(),
+                        clientMacAddress));
+            }
+
+            // Creates the vendor specific tag.
+            String circuitId = getCircuitId(sessionInfo.getClientCp());
+            if (circuitId == null) {
+                log.error("Failed to build circuid-id for client on connect point {}. Dropping packet.",
+                        sessionInfo.getClientCp());
+                return null;
+            }
+
+            // Checks whether the circuit-id is valid, if it's not it drops the packet.
+            if (!isCircuitIdValid(circuitId, sessionInfo.getSubscriber())) {
+                log.warn("Invalid circuit ID, dropping packet.");
+                PppoeAgentEvent invalidCidEvent = new PppoeAgentEvent(PppoeAgentEvent.Type.INVALID_CID, sessionInfo,
+                        context.inPacket().receivedFrom(), clientMacAddress);
+                post(invalidCidEvent);
+                return null;
+            }
+
+            String remoteId = sessionInfo.getSubscriber().remoteId();
+            byte[] vendorSpecificTag = new PPPoEDVendorSpecificTag(circuitId, remoteId).toByteArray();
+
+            // According to TR-101, R-149 (by Broadband Forum), agent must REPLACE vendor-specific tag that may come
+            // from client message with its own tag.
+            // The following block ensures that agent removes any previous vendor-specific tag.
+            List<PPPoEDTag> originalTags = pppoed.getTags();
+            if (originalTags != null) {
+                PPPoEDTag originalVendorSpecificTag = originalTags.stream()
+                        .filter(tag -> tag.getType() == PPPOED_TAG_VENDOR_SPECIFIC)
+                        .findFirst()
+                        .orElse(null);
+
+                if (originalVendorSpecificTag != null) {
+                    int tagToRemoveLength = originalVendorSpecificTag.getLength();
+                    originalTags.removeIf(tag -> tag.getType() == PPPOED_TAG_VENDOR_SPECIFIC);
+                    pppoed.setPayloadLength((short) (pppoed.getPayloadLength() - tagToRemoveLength));
+                }
+            }
+
+            pppoed.setTag(PPPOED_TAG_VENDOR_SPECIFIC, vendorSpecificTag);
+
+            ethFwd.setPayload(pppoed);
+            ethFwd.setQinQTPID(Ethernet.TYPE_VLAN);
+
+            UniTagInformation uniTagInformation = getUnitagInformationFromPacketContext(context,
+                    sessionInfo.getSubscriber());
+            if (uniTagInformation == null) {
+                log.warn("Missing service information for connectPoint {} / cTag {}",
+                        context.inPacket().receivedFrom(),  context.inPacket().parsed().getVlanID());
+                return null;
+            }
+            ethFwd.setQinQVID(uniTagInformation.getPonSTag().toShort());
+            ethFwd.setPad(true);
+            return ethFwd;
+        }
+
+        private Ethernet processPacketFromServer(Ethernet ethernetPacket, PPPoED pppoed,
+                                                 PppoeSessionInfo sessionInfo, MacAddress clientMacAddress) {
+            // Update counters
+            List<PPPoEDTag> tags = pppoed.getTags();
+            for (PPPoEDTag tag : tags) {
+                switch (tag.getType()) {
+                    case PPPOED_TAG_GENERIC_ERROR:
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                PppoeAgentCounterNames.GENERIC_ERROR_FROM_SERVER);
+                        break;
+                    case PPPOED_TAG_SERVICE_NAME_ERROR:
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                PppoeAgentCounterNames.SERVICE_NAME_ERROR);
+                        break;
+                    case PPPOED_TAG_AC_SYSTEM_ERROR:
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                PppoeAgentCounterNames.AC_SYSTEM_ERROR);
+                        break;
+                    default:
+                        break;
+                }
+            }
+
+            byte pppoedCode = pppoed.getCode();
+
+            if (pppoedCode == PPPOED_CODE_PADS) {
+                log.debug("PADS sessionId:{}  client MAC:{}", Integer.toHexString(pppoed.getSessionId() & 0xFFFF),
+                        clientMacAddress);
+                sessionInfo.setSessionId(pppoed.getSessionId());
+            }
+            sessionInfo.setPacketCode(pppoedCode);
+            sessionsMap.put(clientMacAddress, sessionInfo);
+
+            PppoeAgentEvent.Type eventType = pppoedCode == PPPOED_CODE_PADS ?
+                    PppoeAgentEvent.Type.SESSION_ESTABLISHED :
+                    PppoeAgentEvent.Type.NEGOTIATION;
+
+            post(new PppoeAgentEvent(eventType, sessionInfo, sessionInfo.getClientCp(), clientMacAddress));
+
+            ethernetPacket.setQinQVID(QINQ_VID_NONE);
+            ethernetPacket.setPad(true);
+            return ethernetPacket;
+        }
+
+        private void updatePppoeAgentCountersStore(SubscriberAndDeviceInformation sub,
+                                                   PppoeAgentCounterNames counterType) {
+            // Update global counter stats
+            pppoeAgentCounters.incrementCounter(PppoeAgentEvent.GLOBAL_COUNTER, counterType);
+            if (sub == null) {
+                log.warn("Counter not updated as subscriber info not found.");
+            } else {
+                // Update subscriber counter stats
+                pppoeAgentCounters.incrementCounter(sub.id(), counterType);
+            }
+        }
+
+        private String getCircuitId(ConnectPoint cp) {
+            return new CircuitIdBuilder()
+                    .setConnectPoint(cp)
+                    .setDeviceService(deviceService)
+                    .setSubsService(subsService)
+                    .setCircuitIdConfig(circuitIdfields)
+                    .addCustomSeparator(CircuitIdFieldName.AcessNodeIdentifier, " ")
+                    .addCustomSeparator(CircuitIdFieldName.Port, ":")
+                    .build();
+        }
+
+        protected ConnectPoint getServerConnectPoint(DeviceId deviceId) {
+            ConnectPoint serverCp;
+            if (!useOltUplink) {
+                serverCp = pppoeServerConnectPoint.get();
+            } else {
+                serverCp = getUplinkConnectPointOfOlt(deviceId);
+            }
+            return serverCp;
+        }
+
+        private boolean isCircuitIdValid(String cId, SubscriberAndDeviceInformation entry) {
+            if (!enableCircuitIdValidation) {
+                log.debug("Circuit ID validation is disabled.");
+                return true;
+            }
+
+            if (entry == null) {
+                log.error("SubscriberAndDeviceInformation cannot be null.");
+                return false;
+            }
+
+            if (entry.circuitId() == null || entry.circuitId().isEmpty()) {
+                log.debug("Circuit ID not configured in SADIS entry. No check is done.");
+                return true;
+            } else {
+                if (cId.equals(entry.circuitId())) {
+                    log.info("Circuit ID in packet: {} matched the configured entry on SADIS.", cId);
+                    return true;
+                } else {
+                    log.warn("Circuit ID in packet: {} did not match the configured entry on SADIS: {}.",
+                            cId, entry.circuitId());
+                    return false;
+                }
+            }
+        }
+
+        private void forwardPacketToServer(Ethernet packet, PppoeSessionInfo sessionInfo) {
+            ConnectPoint toSendTo = sessionInfo.getServerCp();
+            if (toSendTo != null) {
+                log.info("Sending PPPOE packet to server at {}", toSendTo);
+                TrafficTreatment t = DefaultTrafficTreatment.builder().setOutput(toSendTo.port()).build();
+                OutboundPacket o = new DefaultOutboundPacket(toSendTo.deviceId(), t,
+                        ByteBuffer.wrap(packet.serialize()));
+                if (log.isTraceEnabled()) {
+                    log.trace("Relaying packet to pppoe server at {} {}", toSendTo, packet);
+                }
+                packetService.emit(o);
+
+                updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                        PppoeAgentCounterNames.PPPOED_PACKETS_TO_SERVER);
+            } else {
+                log.error("No connect point to send msg to PPPOE Server");
+            }
+        }
+
+        private void forwardPacketToClient(Ethernet packet, PppoeSessionInfo sessionInfo, MacAddress clientMacAddress) {
+            ConnectPoint subCp = sessionInfo.getClientCp();
+            if (subCp == null) {
+                log.error("Dropping PPPOE packet, can't find a connectpoint for MAC {}", clientMacAddress);
+                return;
+            }
+
+            log.info("Sending PPPOE packet to client at {}", subCp);
+            TrafficTreatment t = DefaultTrafficTreatment.builder()
+                    .setOutput(subCp.port()).build();
+            OutboundPacket o = new DefaultOutboundPacket(
+                    subCp.deviceId(), t, ByteBuffer.wrap(packet.serialize()));
+            if (log.isTraceEnabled()) {
+                log.trace("Relaying packet to pppoe client at {} {}", subCp, packet);
+            }
+
+            packetService.emit(o);
+
+            updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                    PppoeAgentCounterNames.PPPOED_PACKETS_FROM_SERVER);
+        }
+
+        private Ethernet errorToClient(Ethernet packet, PPPoED p, String errString) {
+            PPPoED err = new PPPoED();
+            err.setVersion(p.getVersion());
+            err.setType(p.getType());
+            switch (p.getCode()) {
+                case PPPOED_CODE_PADI:
+                    err.setCode(PPPOED_CODE_PADO);
+                    break;
+                case PPPOED_CODE_PADR:
+                    err.setCode(PPPOED_CODE_PADS);
+                    break;
+                default:
+                    break;
+            }
+            err.setCode(p.getCode());
+            err.setSessionId(p.getSessionId());
+            err.setTag(PPPOED_TAG_GENERIC_ERROR, errString.getBytes(StandardCharsets.UTF_8));
+
+            Ethernet ethPacket = new Ethernet();
+            ethPacket.setPayload(err);
+            ethPacket.setSourceMACAddress(packet.getDestinationMACAddress());
+            ethPacket.setDestinationMACAddress(packet.getSourceMACAddress());
+            ethPacket.setQinQVID(QINQ_VID_NONE);
+            ethPacket.setPad(true);
+
+            return ethPacket;
+        }
+    }
+
+    private class InternalConfigListener implements NetworkConfigListener {
+        @Override
+        public void event(NetworkConfigEvent event) {
+
+            if ((event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
+                    event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED) &&
+                    event.configClass().equals(PppoeAgentConfig.class)) {
+                updateConfig();
+                log.info("Reconfigured");
+            }
+        }
+    }
+
+    /**
+     * Handles Mastership changes for the devices which connect to the PPPOE server.
+     */
+    private class InnerMastershipListener implements MastershipListener {
+        @Override
+        public void event(MastershipEvent event) {
+            if (!useOltUplink) {
+                if (pppoeServerConnectPoint.get() != null &&
+                        pppoeServerConnectPoint.get().deviceId().equals(event.subject())) {
+                    log.trace("Mastership Event recevived for {}", event.subject());
+                    // mastership of the device for our connect point has changed, reselect
+                    selectServerConnectPoint();
+                }
+            }
+        }
+    }
+
+    private class InnerDeviceListener implements DeviceListener {
+        @Override
+        public void event(DeviceEvent event) {
+            DeviceId devId = event.subject().id();
+
+            if (log.isTraceEnabled() && !event.type().equals(DeviceEvent.Type.PORT_STATS_UPDATED)) {
+                log.trace("Device Event received for {} event {}", event.subject(), event.type());
+            }
+
+            // Handle events from any other device
+            switch (event.type()) {
+                case PORT_UPDATED:
+                    if (!event.port().isEnabled()) {
+                        ConnectPoint cp = new ConnectPoint(devId, event.port().number());
+                        removeSessionsByConnectPoint(cp);
+                    }
+                    break;
+                case PORT_REMOVED:
+                    // Remove all entries related to this port from sessions map
+                    ConnectPoint cp = new ConnectPoint(devId, event.port().number());
+                    removeSessionsByConnectPoint(cp);
+                    break;
+                case DEVICE_REMOVED:
+                    // Remove all entries related to this device from sessions map
+                    removeSessionsByDevice(devId);
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    private class InnerPppoeAgentStoreDelegate implements PppoeAgentStoreDelegate {
+        @Override
+        public void notify(PppoeAgentEvent event) {
+            if (event.type().equals(PppoeAgentEvent.Type.STATS_UPDATE)) {
+                PppoeAgentEvent toPost = event;
+                if (event.getSubscriberId() != null) {
+                    // infuse the event with the allocation info before posting
+                    PppoeSessionInfo info = Versioned.valueOrNull(
+                            sessionsMap.stream().filter(entry -> event.getSubscriberId()
+                                    .equals(entry.getValue().value().getSubscriber().id()))
+                                    .map(Map.Entry::getValue)
+                                    .findFirst()
+                                    .orElse(null));
+                    if (info == null) {
+                        log.debug("Not handling STATS_UPDATE event for session that no longer exists. {}.", event);
+                        return;
+                    }
+
+                    toPost = new PppoeAgentEvent(event.type(), info, event.getCounterName(), event.getCounterValue(),
+                                                 info.getClientMac(), event.getSubscriberId());
+                }
+                post(toPost);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentConfig.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentConfig.java
new file mode 100644
index 0000000..5d14c76
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.ImmutableSet;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.config.Config;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class PppoeAgentConfig extends Config<ApplicationId> {
+    private static final String PPPOE_CONNECT_POINTS = "pppoeServerConnectPoints";
+    private static final String USE_OLT_ULPORT_FOR_PKT_INOUT = "useOltUplinkForServerPktInOut";
+
+    protected static final Boolean DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT = true;
+
+    @Override
+    public boolean isValid() {
+        return hasOnlyFields(PPPOE_CONNECT_POINTS, USE_OLT_ULPORT_FOR_PKT_INOUT);
+    }
+
+    /**
+     * Returns whether the app would use the uplink port of OLT for sending/receving
+     * messages to/from the server.
+     *
+     * @return true if OLT uplink port is to be used, false otherwise
+     */
+    public boolean getUseOltUplinkForServerPktInOut() {
+        if (object == null) {
+            return DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT;
+        }
+        if (!object.has(USE_OLT_ULPORT_FOR_PKT_INOUT)) {
+            return DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT;
+        }
+        return object.path(USE_OLT_ULPORT_FOR_PKT_INOUT).asBoolean();
+    }
+
+    /**
+     * Returns the pppoe server connect points.
+     *
+     * @return pppoe server connect points
+     */
+    public Set<ConnectPoint> getPppoeServerConnectPoint() {
+        if (object == null) {
+            return new HashSet<ConnectPoint>();
+        }
+
+        if (!object.has(PPPOE_CONNECT_POINTS)) {
+            return ImmutableSet.of();
+        }
+
+        ImmutableSet.Builder<ConnectPoint> builder = ImmutableSet.builder();
+        ArrayNode arrayNode = (ArrayNode) object.path(PPPOE_CONNECT_POINTS);
+        for (JsonNode jsonNode : arrayNode) {
+            String portName = jsonNode.asText(null);
+            if (portName == null) {
+                return null;
+            }
+            try {
+                builder.add(ConnectPoint.deviceConnectPoint(portName));
+            } catch (IllegalArgumentException e) {
+                return null;
+            }
+        }
+        return builder.build();
+    }
+
+
+}
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCounterNames.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCounterNames.java
new file mode 100644
index 0000000..0d58078
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCounterNames.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+/**
+ * Represents PPPoE agent counters type.
+ */
+public enum PppoeAgentCounterNames {
+    /**
+     * Number of PADI messages received from client.
+     */
+    PADI,
+    /**
+     * Number of PADO messages received from server.
+     */
+    PADO,
+    /**
+     * Number of PADR messages received from client.
+     */
+    PADR,
+    /**
+     * Number of PADS messages received from server.
+     */
+    PADS,
+    /**
+     * Number of PADT messages received from server.
+     */
+    PADT_FROM_SERVER,
+    /**
+     * Number of PADT messages received from client.
+     */
+    PADT_FROM_CLIENT,
+    /**
+     * Number of PPPoED messages sent to server.
+     */
+    PPPOED_PACKETS_TO_SERVER,
+    /**
+     * Number  of PPPoED messages received from server.
+     */
+    PPPOED_PACKETS_FROM_SERVER,
+    /**
+     * Number of MTU Exceeded errors generated by the PPPoED agent.
+     */
+    MTU_EXCEEDED,
+    /**
+     * Number of Generic Errors received from server.
+     */
+    GENERIC_ERROR_FROM_SERVER,
+    /**
+     * Number of Generic Errors received from client.
+     */
+    GENERIC_ERROR_FROM_CLIENT,
+    /**
+     * Number of ServiceName Errors received from server.
+     */
+    SERVICE_NAME_ERROR,
+    /**
+     * Number of AC-System Errors received from server.
+     */
+    AC_SYSTEM_ERROR;
+
+    /**
+     * Supported types of PPPoED agent counters.
+     */
+    public  static final Set<PppoeAgentCounterNames> SUPPORTED_COUNTERS = ImmutableSet.of(
+            PppoeAgentCounterNames.PADI, PppoeAgentCounterNames.PADO,
+            PppoeAgentCounterNames.PADR, PppoeAgentCounterNames.PADS,
+            PppoeAgentCounterNames.PADT_FROM_SERVER, PppoeAgentCounterNames.PADT_FROM_CLIENT,
+            PppoeAgentCounterNames.PPPOED_PACKETS_TO_SERVER, PppoeAgentCounterNames.PPPOED_PACKETS_FROM_SERVER,
+            PppoeAgentCounterNames.MTU_EXCEEDED, PppoeAgentCounterNames.GENERIC_ERROR_FROM_SERVER,
+            PppoeAgentCounterNames.GENERIC_ERROR_FROM_CLIENT, PppoeAgentCounterNames.SERVICE_NAME_ERROR,
+            PppoeAgentCounterNames.AC_SYSTEM_ERROR);
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersIdentifier.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersIdentifier.java
new file mode 100644
index 0000000..cca163f
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersIdentifier.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+
+import java.util.Objects;
+
+/**
+ * Represents PPPoED agent counters identifier.
+ */
+public final class PppoeAgentCountersIdentifier {
+    final String counterClassKey;
+    final Enum<PppoeAgentCounterNames> counterTypeKey;
+
+    /**
+     * Creates a default global counter identifier for a given counterType.
+     *
+     * @param counterTypeKey Identifies the supported type of pppoe agent counters
+     */
+    public PppoeAgentCountersIdentifier(PppoeAgentCounterNames counterTypeKey) {
+        this.counterClassKey = PppoeAgentEvent.GLOBAL_COUNTER;
+        this.counterTypeKey = counterTypeKey;
+    }
+
+    /**
+     * Creates a counter identifier. A counter is defined by the key pair [counterClass, counterType],
+     * where counterClass can be global or the subscriber ID and counterType is the supported pppoe counter.
+     *
+     * @param counterClassKey Identifies which class the counter is assigned (global or per subscriber)
+     * @param counterTypeKey Identifies the supported type of pppoed relay counters
+     */
+    public PppoeAgentCountersIdentifier(String counterClassKey, PppoeAgentCounterNames counterTypeKey) {
+        this.counterClassKey = counterClassKey;
+        this.counterTypeKey = counterTypeKey;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj instanceof PppoeAgentCountersIdentifier) {
+            final PppoeAgentCountersIdentifier other = (PppoeAgentCountersIdentifier) obj;
+            return Objects.equals(this.counterClassKey, other.counterClassKey)
+                    && Objects.equals(this.counterTypeKey, other.counterTypeKey);
+        }
+
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(counterClassKey, counterTypeKey);
+    }
+
+    @Override
+    public String toString() {
+        return "PppoeAgentCountersIdentifier{" +
+                "counterClassKey='" + counterClassKey + '\'' +
+                ", counterTypeKey=" + counterTypeKey +
+                '}';
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersStore.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersStore.java
new file mode 100644
index 0000000..5b4ddfd
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersStore.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+import org.onosproject.store.Store;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentStoreDelegate;
+
+/**
+ * Represents a stored Pppoe Agent Counters. A counter entry is defined by the pair [counterClass, counterType],
+ * where counterClass can be maybe global or subscriber ID and counterType is the pppoe counter.
+ */
+public interface PppoeAgentCountersStore extends Store<PppoeAgentEvent, PppoeAgentStoreDelegate> {
+
+    String NAME = "PPPOE_Agent_stats";
+
+    /**
+     * Creates or updates PPPOE Agent counter.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     */
+    void incrementCounter(String counterClass, PppoeAgentCounterNames counterType);
+
+    /**
+     * Sets the value of a PPPOE Agent counter.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     * @param value The value of the counter
+     */
+    void setCounter(String counterClass, PppoeAgentCounterNames counterType, Long value);
+
+    /**
+     * Gets the current PPPoE Agent counter values.
+     *
+     * @return PPPoE Agent counter values
+     */
+    PppoeAgentStatistics getCounters();
+
+    /**
+     * Resets counter values for a given counter class.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     */
+    void resetCounters(String counterClass);
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentStatistics.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentStatistics.java
new file mode 100644
index 0000000..dce1ac7
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentStatistics.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Snapshot of PPPoE Agent statistics.
+ */
+public class PppoeAgentStatistics {
+    private final ImmutableMap<PppoeAgentCountersIdentifier, Long> counters;
+    private PppoeAgentStatistics(ImmutableMap<PppoeAgentCountersIdentifier, Long> counters) {
+        this.counters = counters;
+    }
+    /**
+     * Creates a new empty statistics instance.
+     */
+    public PppoeAgentStatistics() {
+        counters = ImmutableMap.of();
+    }
+    /**
+     * Gets the value of the counter with the given ID. Defaults to 0 if counter is not present.
+     *
+     * @param id counter ID
+     * @return counter value
+     */
+    public long get(PppoeAgentCountersIdentifier id) {
+        return counters.getOrDefault(id, 0L);
+    }
+    /**
+     * Gets the map of counters.
+     *
+     * @return map of counters
+     */
+    public Map<PppoeAgentCountersIdentifier, Long> counters() {
+        return counters;
+    }
+    /**
+     * Creates a new statistics instance with the given counter values.
+     *
+     * @param counters counters
+     * @return statistics
+     */
+    public static PppoeAgentStatistics withCounters(Map<PppoeAgentCountersIdentifier, Long> counters) {
+        ImmutableMap.Builder<PppoeAgentCountersIdentifier, Long> builder = ImmutableMap.builder();
+        counters.forEach(builder::put);
+        return new PppoeAgentStatistics(builder.build());
+    }
+    /**
+     * Adds the given statistics instance to this one (sums the common counters) and returns
+     * a new instance containing the result.
+     *
+     * @param other other instance
+     * @return result
+     */
+    public PppoeAgentStatistics add(PppoeAgentStatistics other) {
+        ImmutableMap.Builder<PppoeAgentCountersIdentifier, Long> builder = ImmutableMap.builder();
+        Set<PppoeAgentCountersIdentifier> keys = Sets.newHashSet(other.counters.keySet());
+        counters.forEach((id, value) -> {
+            builder.put(id, value + other.counters.getOrDefault(id, 0L));
+            keys.remove(id);
+        });
+        keys.forEach(i -> builder.put(i, other.counters.get(i)));
+        return new PppoeAgentStatistics(builder.build());
+    }
+    @Override
+    public String toString() {
+        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this.getClass());
+        counters.forEach((id, v) -> helper.add(id.toString(), v));
+        return helper.toString();
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/SimplePppoeAgentCountersStore.java b/app/src/main/java/org/opencord/pppoeagent/impl/SimplePppoeAgentCountersStore.java
new file mode 100644
index 0000000..8be88b7
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/SimplePppoeAgentCountersStore.java
@@ -0,0 +1,305 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SafeRecurringTask;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentStoreDelegate;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
+import org.slf4j.Logger;
+import java.nio.charset.StandardCharsets;
+
+import java.util.Dictionary;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE_DEFAULT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * PPPoE Agent Counters Manager Component.
+ */
+@Component(immediate = true,
+        property = {
+                PUBLISH_COUNTERS_RATE + ":Integer=" + PUBLISH_COUNTERS_RATE_DEFAULT,
+                SYNC_COUNTERS_RATE + ":Integer=" + SYNC_COUNTERS_RATE_DEFAULT,
+        }
+)
+public class SimplePppoeAgentCountersStore extends AbstractStore<PppoeAgentEvent, PppoeAgentStoreDelegate>
+        implements PppoeAgentCountersStore {
+    private static final String PPPOE_STATISTICS_LEADERSHIP = "pppoeagent-statistics";
+    private static final MessageSubject RESET_SUBJECT = new MessageSubject("pppoeagent-statistics-reset");
+
+    private final Logger log = getLogger(getClass());
+    private ConcurrentMap<PppoeAgentCountersIdentifier, Long> countersMap;
+
+    private EventuallyConsistentMap<NodeId, PppoeAgentStatistics> statistics;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService componentConfigService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterCommunicationService clusterCommunicationService;
+    protected int publishCountersRate = PUBLISH_COUNTERS_RATE_DEFAULT;
+    protected int syncCountersRate = SYNC_COUNTERS_RATE_DEFAULT;
+    KryoNamespace serializer = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(PppoeAgentStatistics.class)
+            .register(PppoeAgentCountersIdentifier.class)
+            .register(PppoeAgentCounterNames.class)
+            .register(ClusterMessage.class)
+            .register(MessageSubject.class)
+            .build();
+    private ScheduledExecutorService executor;
+    private ScheduledFuture<?> publisherTask;
+    private ScheduledFuture<?> syncTask;
+    private AtomicBoolean dirty = new AtomicBoolean(true);
+
+    @Activate
+    public void activate(ComponentContext context) {
+        log.info("Activate PPPoE Agent Counters Manager");
+        countersMap = new ConcurrentHashMap<>();
+        componentConfigService.registerProperties(getClass());
+        modified(context);
+        statistics = storageService.<NodeId, PppoeAgentStatistics>eventuallyConsistentMapBuilder()
+                .withName("pppoeagent-statistics")
+                .withSerializer(serializer)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .build();
+        // Initialize counter values for the global counters
+        initCounters(PppoeAgentEvent.GLOBAL_COUNTER, statistics.get(clusterService.getLocalNode().id()));
+        syncStats();
+        leadershipService.runForLeadership(PPPOE_STATISTICS_LEADERSHIP);
+        executor = Executors.newScheduledThreadPool(1);
+        clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(serializer)::decode,
+                this::resetLocal, executor);
+        startSyncTask();
+        startPublishTask();
+    }
+
+    @Deactivate
+    public void deactivate() {
+        clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
+        leadershipService.withdraw(PPPOE_STATISTICS_LEADERSHIP);
+        stopPublishTask();
+        stopSyncTask();
+        executor.shutdownNow();
+        componentConfigService.unregisterProperties(getClass(), false);
+    }
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<String, Object> properties = context.getProperties();
+        String s = Tools.get(properties, PUBLISH_COUNTERS_RATE);
+        int oldPublishCountersRate = publishCountersRate;
+        publishCountersRate = Strings.isNullOrEmpty(s) ? PUBLISH_COUNTERS_RATE_DEFAULT
+                : Integer.parseInt(s.trim());
+        if (oldPublishCountersRate != publishCountersRate) {
+            stopPublishTask();
+            startPublishTask();
+        }
+        s = Tools.get(properties, SYNC_COUNTERS_RATE);
+        int oldSyncCountersRate = syncCountersRate;
+        syncCountersRate = Strings.isNullOrEmpty(s) ? SYNC_COUNTERS_RATE_DEFAULT
+                : Integer.parseInt(s.trim());
+        if (oldSyncCountersRate != syncCountersRate) {
+            stopSyncTask();
+            startSyncTask();
+        }
+    }
+    private ScheduledFuture<?> startTask(Runnable r, int rate) {
+        return executor.scheduleAtFixedRate(SafeRecurringTask.wrap(r),
+                0, rate, TimeUnit.SECONDS);
+    }
+    private void stopTask(ScheduledFuture<?> task) {
+        task.cancel(true);
+    }
+    private void startSyncTask() {
+        syncTask = startTask(this::syncStats, syncCountersRate);
+    }
+    private void stopSyncTask() {
+        stopTask(syncTask);
+    }
+    private void startPublishTask() {
+        publisherTask = startTask(this::publishStats, publishCountersRate);
+    }
+    private void stopPublishTask() {
+        stopTask(publisherTask);
+    }
+
+    ImmutableMap<PppoeAgentCountersIdentifier, Long> getCountersMap() {
+        return ImmutableMap.copyOf(countersMap);
+    }
+
+    public PppoeAgentStatistics getCounters() {
+        return aggregate();
+    }
+
+    /**
+     * Initialize the supported counters map for the given counter class.
+     * @param counterClass class of counters (global, per subscriber)
+     * @param existingStats existing values to intialise the counters to
+     */
+    public void initCounters(String counterClass, PppoeAgentStatistics existingStats) {
+        checkNotNull(counterClass, "counter class can't be null");
+        for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.SUPPORTED_COUNTERS) {
+            PppoeAgentCountersIdentifier id = new PppoeAgentCountersIdentifier(counterClass, counterType);
+            countersMap.put(id, existingStats == null ? 0L : existingStats.get(id));
+        }
+    }
+
+    /**
+     * Inserts the counter entry if it is not already in the set otherwise increment the existing counter entry.
+     * @param counterClass class of counters (global, per subscriber)
+     * @param counterType name of counter
+     */
+    public void incrementCounter(String counterClass, PppoeAgentCounterNames counterType) {
+        checkNotNull(counterClass, "counter class can't be null");
+        if (PppoeAgentCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
+            PppoeAgentCountersIdentifier counterIdentifier =
+                    new PppoeAgentCountersIdentifier(counterClass, counterType);
+            countersMap.compute(counterIdentifier, (key, counterValue) ->
+                    (counterValue != null) ? counterValue + 1 : 1L);
+        } else {
+            log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
+        }
+        dirty.set(true);
+    }
+
+    /**
+     * Reset the counters map for the given counter class.
+     * @param counterClass class of counters (global, per subscriber)
+     */
+    public void resetCounters(String counterClass) {
+        byte[] payload = counterClass.getBytes(StandardCharsets.UTF_8);
+        ClusterMessage reset = new ClusterMessage(clusterService.getLocalNode().id(), RESET_SUBJECT, payload);
+        clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT, Serializer.using(serializer)::encode);
+    }
+    private void resetLocal(ClusterMessage m) {
+        String counterClass = new String(m.payload(), StandardCharsets.UTF_8);
+        checkNotNull(counterClass, "counter class can't be null");
+        for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.SUPPORTED_COUNTERS) {
+            PppoeAgentCountersIdentifier counterIdentifier =
+                    new PppoeAgentCountersIdentifier(counterClass, counterType);
+            countersMap.computeIfPresent(counterIdentifier, (key, counterValue) -> 0L);
+        }
+        dirty.set(true);
+        syncStats();
+    }
+
+    /**
+     * Inserts the counter entry if it is not already in the set otherwise update the existing counter entry.
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     * @param value counter value
+     */
+    public void setCounter(String counterClass, PppoeAgentCounterNames counterType, Long value) {
+        checkNotNull(counterClass, "counter class can't be null");
+        if (PppoeAgentCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
+            PppoeAgentCountersIdentifier counterIdentifier =
+                    new PppoeAgentCountersIdentifier(counterClass, counterType);
+            countersMap.put(counterIdentifier, value);
+        } else {
+            log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
+        }
+        dirty.set(true);
+        syncStats();
+    }
+
+    private PppoeAgentStatistics aggregate() {
+        return statistics.values().stream()
+                .reduce(new PppoeAgentStatistics(), PppoeAgentStatistics::add);
+    }
+    /**
+     * Creates a snapshot of the current in-memory statistics.
+     *
+     * @return snapshot of statistics
+     */
+    private PppoeAgentStatistics snapshot() {
+        return PppoeAgentStatistics.withCounters(countersMap);
+    }
+    /**
+     * Syncs in-memory stats to the eventually consistent map.
+     */
+    private void syncStats() {
+        if (dirty.get()) {
+            statistics.put(clusterService.getLocalNode().id(), snapshot());
+            dirty.set(false);
+        }
+    }
+    private void publishStats() {
+        // Only publish events if we are the cluster leader for PPPoE Agent stats
+        if (!Objects.equals(leadershipService.getLeader(PPPOE_STATISTICS_LEADERSHIP),
+                clusterService.getLocalNode().id())) {
+            return;
+        }
+        if (aggregate().counters() != null) {
+            aggregate().counters().forEach((counterKey, counterValue) -> {
+                // Subscriber-specific counters have the subscriber ID set
+                String subscriberId = null;
+                if (!counterKey.counterClassKey.equals(PppoeAgentEvent.GLOBAL_COUNTER)) {
+                    subscriberId = counterKey.counterClassKey;
+                }
+                if (delegate != null) {
+                    delegate.notify(new PppoeAgentEvent(PppoeAgentEvent.Type.STATS_UPDATE, null,
+                                                        counterKey.counterTypeKey.toString(), counterValue,
+                                       null, subscriberId));
+                }
+            });
+        } else {
+            log.debug("Ignoring 'publishStats' request since counters are null.");
+        }
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/package-info.java b/app/src/main/java/org/opencord/pppoeagent/impl/package-info.java
new file mode 100644
index 0000000..84ceb21
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * PPPoE agent implementation.
+ */
+package org.opencord.pppoeagent.impl;
diff --git a/app/src/main/java/org/opencord/pppoeagent/rest/PppoeAgentWebResource.java b/app/src/main/java/org/opencord/pppoeagent/rest/PppoeAgentWebResource.java
new file mode 100644
index 0000000..56d5598
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/rest/PppoeAgentWebResource.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.rest;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.Map;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.util.ItemNotFoundException;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.Port;
+import org.onosproject.rest.AbstractWebResource;
+
+import org.opencord.pppoeagent.impl.PppoeAgentCounterNames;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersStore;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersIdentifier;
+import org.opencord.pppoeagent.impl.PppoeAgentStatistics;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentService;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
+
+/**
+ * PppoeAgent web resource.
+ */
+@Path("pppoeagent-app")
+public class PppoeAgentWebResource extends AbstractWebResource {
+    private final ObjectNode root = mapper().createObjectNode();
+    private final ArrayNode node = root.putArray("entry");
+    private static final String SESSION_NOT_FOUND = "Session not found";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    DeviceService deviceService = AbstractShellCommand.get(DeviceService.class);
+
+    /**
+     * Get session info object.
+     *
+     * @param mac Session MAC address
+     *
+     * @return 200 OK
+     */
+    @GET
+    @Path("/session/{mac}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getSubscriber(@PathParam("mac") String mac) {
+        MacAddress macAddress = MacAddress.valueOf(mac);
+        PppoeAgentService pppoeAgent = get(PppoeAgentService.class);
+        PppoeSessionInfo entry = pppoeAgent.getSessionsMap().get(macAddress);
+        if (entry == null) {
+            throw new ItemNotFoundException(SESSION_NOT_FOUND);
+        }
+
+        try {
+            node.add(encodePppoeSessionInfo(entry, macAddress));
+            return ok(mapper().writeValueAsString(root)).build();
+        } catch (IllegalArgumentException e) {
+            log.error("Error while fetching PPPoE session info for MAC {} through REST API: {}", mac, e.getMessage());
+            return Response.status(INTERNAL_SERVER_ERROR).build();
+        } catch (JsonProcessingException e) {
+            log.error("Error assembling JSON response for PPPoE session info request for MAC {} " +
+                    "through REST API: {}", mac, e.getMessage());
+            return Response.status(INTERNAL_SERVER_ERROR).build();
+        }
+    }
+
+    /**
+     * Get all session info objects.
+     *
+     * @return 200 OK
+     */
+    @GET
+    @Path("/session")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getSubscribers() {
+        try {
+            PppoeAgentService pppoeAgent = get(PppoeAgentService.class);
+            pppoeAgent.getSessionsMap().forEach((mac, entry) -> {
+                node.add(encodePppoeSessionInfo(entry, mac));
+            });
+
+            return ok(mapper().writeValueAsString(root)).build();
+        } catch (Exception e) {
+            log.error("Error while fetching PPPoE sessions information through REST API: {}", e.getMessage());
+            return Response.status(INTERNAL_SERVER_ERROR).build();
+        }
+    }
+
+    private ObjectNode encodePppoeSessionInfo(PppoeSessionInfo entry, MacAddress macAddress) {
+        ConnectPoint cp = entry.getClientCp();
+        Port devicePort = deviceService.getPort(cp);
+        String portLabel = "uni-" + ((cp.port().toLong() & 0xF) + 1);
+        String subscriberId = devicePort != null ? devicePort.annotations().value(AnnotationKeys.PORT_NAME) :
+                "UNKNOWN";
+
+        return mapper().createObjectNode()
+                .put("macAddress", macAddress.toString())
+                .put("sessionId", entry.getSessionId())
+                .put("currentState", entry.getCurrentState())
+                .put("lastReceivedPacket", PPPoED.Type.getTypeByValue(entry.getPacketCode()).name())
+                .put("deviceId", cp.deviceId().toString())
+                .put("portNumber", cp.port().toString())
+                .put("portLabel", portLabel)
+                .put("subscriberId", subscriberId);
+    }
+
+    /**
+     * Gets PPPoE Agent counters for global context.
+     *
+     * @return 200 OK
+     */
+    @GET
+    @Path("/stats")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getPppoeStats() {
+        return getStats(PppoeAgentEvent.GLOBAL_COUNTER);
+    }
+
+    /**
+     * Gets PPPoE Agent counters for specific subscriber.
+     *
+     * @param subscriberId Id of subscriber.
+     *
+     * @return 200 OK
+     */
+    @GET
+    @Path("/stats/{subscriberId}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getPppoeSubscriberStats(@PathParam("subscriberId") String subscriberId) {
+        return getStats(subscriberId);
+    }
+    private Response getStats(String key) {
+        PppoeAgentCountersStore pppoeCounters = get(PppoeAgentCountersStore.class);
+        try {
+            PppoeAgentStatistics pppoeStatistics = pppoeCounters.getCounters();
+            JsonNode completeNode = buildPppoeCounterNodeObject(key, pppoeStatistics.counters());
+            return ok(mapper().writeValueAsString(completeNode)).build();
+        } catch (JsonProcessingException e) {
+            log.error("Error while fetching PPPoE agent counter stats through REST API: {}", e.getMessage());
+            return Response.status(INTERNAL_SERVER_ERROR).build();
+        }
+    }
+
+    private JsonNode buildPppoeCounterNodeObject(String key, Map<PppoeAgentCountersIdentifier, Long> countersMap) {
+        ObjectNode entryNode = mapper().createObjectNode();
+        for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.SUPPORTED_COUNTERS) {
+            Long value = countersMap.get(new PppoeAgentCountersIdentifier(key, counterType));
+            if (value == null) {
+                continue;
+            }
+            entryNode = entryNode.put(counterType.name(), String.valueOf(value));
+        }
+        return mapper().createObjectNode().set(key, entryNode);
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/rest/package-info.java b/app/src/main/java/org/opencord/pppoeagent/rest/package-info.java
new file mode 100644
index 0000000..70a18d3
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/rest/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Rest interface for PppoeAgent.
+ */
+package org.opencord.pppoeagent.rest;
diff --git a/app/src/main/webapp/WEB-INF/web.xml b/app/src/main/webapp/WEB-INF/web.xml
new file mode 100644
index 0000000..9bb27a0
--- /dev/null
+++ b/app/src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2021-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
+         xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+         id="ONOS" version="2.5">
+    <display-name>PPPoE Agent REST API v1.0</display-name>
+
+    <servlet>
+        <servlet-name>JAX-RS Service</servlet-name>
+        <servlet-class>org.glassfish.jersey.servlet.ServletContainer</servlet-class>
+        <init-param>
+            <param-name>jersey.config.server.provider.classnames</param-name>
+            <param-value>
+                org.opencord.pppoeagent.rest.PppoeAgentWebResource
+            </param-value>
+        </init-param>
+        <load-on-startup>1</load-on-startup>
+    </servlet>
+
+    <servlet-mapping>
+        <servlet-name>JAX-RS Service</servlet-name>
+        <url-pattern>/*</url-pattern>
+    </servlet-mapping>
+</web-app>
diff --git a/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTest.java b/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTest.java
new file mode 100644
index 0000000..6298acf
--- /dev/null
+++ b/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTest.java
@@ -0,0 +1,421 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.onlab.junit.TestUtils;
+import org.onlab.osgi.ComponentContextAdapter;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.packet.PPPoEDTag;
+import org.onlab.packet.VlanId;
+
+import org.onosproject.net.Device;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.TestStorageService;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PPPoEDVendorSpecificTag;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+
+import static org.easymock.EasyMock.createMock;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+public class PppoeAgentTest extends PppoeAgentTestBase {
+    private PppoeAgent pppoeAgent;
+    private SimplePppoeAgentCountersStore store;
+
+    ComponentConfigService mockConfigService =
+            createMock(ComponentConfigService.class);
+
+    /**
+     * Sets up the services required by the PPPoE agent app.
+     */
+    @Before
+    public void setUp() {
+        pppoeAgent = new PppoeAgent();
+        pppoeAgent.cfgService = new MockNetworkConfigRegistry();
+        pppoeAgent.coreService = new MockCoreServiceAdapter();
+        pppoeAgent.packetService = new MockPacketService();
+        pppoeAgent.componentConfigService = mockConfigService;
+        pppoeAgent.deviceService = new MockDeviceService();
+        pppoeAgent.sadisService = new MockSadisService();
+        pppoeAgent.subsService = pppoeAgent.sadisService.getSubscriberInfoService();
+        pppoeAgent.mastershipService = new MockMastershipService();
+        pppoeAgent.storageService = new TestStorageService();
+        pppoeAgent.pppoeAgentCounters = this.store;
+
+        store = new SimplePppoeAgentCountersStore();
+        store.storageService = new TestStorageService();
+        store.clusterService = new ClusterServiceAdapter();
+        store.leadershipService = new LeadershipServiceAdapter();
+        store.clusterCommunicationService = new MockClusterCommunicationService<>();
+        store.componentConfigService = mockConfigService;
+        TestUtils.setField(store, "eventDispatcher", new MockEventDispatcher());
+        store.activate(new MockComponentContext());
+
+        pppoeAgent.pppoeAgentCounters = this.store;
+
+        TestUtils.setField(pppoeAgent, "eventDispatcher", new MockEventDispatcher());
+        TestUtils.setField(pppoeAgent, "packetProcessorExecutor", MoreExecutors.newDirectExecutorService());
+
+        pppoeAgent.activate(new ComponentContextAdapter());
+    }
+
+    /**
+     * Tears down the PPPoE agent app.
+     */
+    @After
+    public void tearDown() {
+        pppoeAgent.deactivate();
+    }
+
+    @Test
+    public void testPppoePadi() {
+        testPppoeUpstreamPacket(PPPoED.PPPOED_CODE_PADI);
+    }
+
+    @Test
+    public void testPppoePado() {
+        testPppoeDownstreamPacket(PPPoED.PPPOED_CODE_PADO);
+    }
+
+    @Test
+    public void testPppoePadr() {
+        testPppoeUpstreamPacket(PPPoED.PPPOED_CODE_PADR);
+    }
+
+    @Test
+    public void testPppoePads() {
+        testPppoeDownstreamPacket(PPPoED.PPPOED_CODE_PADS);
+    }
+
+    @Test
+    public void testPppoePadt() {
+        // To simulate a successful PADT from subscriber a session entry is needed.
+        PppoeSessionInfo sessionInfo = new PppoeSessionInfo(DEFAULT_CONNECT_POINT, SERVER_CONNECT_POINT,
+                                                            PPPoED.PPPOED_CODE_PADS, (short) 1,
+                                                            pppoeAgent.subsService.get(CLIENT_NAS_PORT_ID),
+                                                            CLIENT_MAC);
+        putInfoOnSessionMap(CLIENT_MAC, sessionInfo);
+        assertTrue(pppoeAgent.getSessionsMap().containsKey(CLIENT_MAC));
+
+        Ethernet padt = constructPppoedPacket(PPPoED.PPPOED_CODE_PADT, CLIENT_MAC, MacAddress.BROADCAST,
+                                              CLIENT_C_TAG, CLIENT_S_TAG, (short) 1);
+        sendPacket(padt, DEFAULT_CONNECT_POINT);
+        Ethernet processedPadt = (Ethernet) getPacket();
+        assertNotNull(processedPadt);
+        assertEquals(padt, processedPadt);
+        assertFalse(pppoeAgent.getSessionsMap().containsKey(CLIENT_MAC));
+        PppoeAgentEvent e = getEvent();
+        assertNotNull(e);
+        assertEquals(PppoeAgentEvent.Type.TERMINATE, e.type());
+        assertEquals(DEFAULT_CONNECT_POINT, e.getConnectPoint());
+        assertEquals(CLIENT_MAC, e.getSubscriberMacAddress());
+
+        // Simulating PADT from server.
+        putInfoOnSessionMap(CLIENT_MAC, sessionInfo);
+        assertTrue(pppoeAgent.getSessionsMap().containsKey(CLIENT_MAC));
+        padt = constructPppoedPacket(PPPoED.PPPOED_CODE_PADT, SERVER_MAC, CLIENT_MAC,
+                                     CLIENT_C_TAG, CLIENT_S_TAG, (short) 1);
+        sendPacket(padt, SERVER_CONNECT_POINT);
+        processedPadt = (Ethernet) getPacket();
+        assertNotNull(processedPadt);
+        assertEquals(padt, processedPadt);
+        assertFalse(pppoeAgent.getSessionsMap().containsKey(CLIENT_MAC));
+        e = getEvent();
+        assertNotNull(e);
+        assertEquals(PppoeAgentEvent.Type.TERMINATE, e.type());
+        assertEquals(SERVER_CONNECT_POINT, e.getConnectPoint());
+        assertEquals(CLIENT_MAC, e.getSubscriberMacAddress());
+
+        // Simulating PADT from client for unknown session (the packet must not be processed)..
+        padt = constructPppoedPacket(PPPoED.PPPOED_CODE_PADT, CLIENT_MAC, MacAddress.BROADCAST,
+                                     CLIENT_C_TAG, CLIENT_S_TAG, (short) 1);
+        sendPacket(padt, DEFAULT_CONNECT_POINT);
+        processedPadt = (Ethernet) getPacket();
+        assertNull(processedPadt);
+
+        // Simulating PADT from server for unknown session (the packet must not be processed).
+        padt = constructPppoedPacket(PPPoED.PPPOED_CODE_PADT, SERVER_MAC, CLIENT_MAC,
+                                     CLIENT_C_TAG, CLIENT_S_TAG, (short) 1);
+        sendPacket(padt, SERVER_CONNECT_POINT);
+        processedPadt = (Ethernet) getPacket();
+        assertNull(processedPadt);
+    }
+
+    @Test
+    public void testPppoeCircuitIdValidation() {
+        Ethernet packet = constructPppoedPacket(PPPoED.PPPOED_CODE_PADI, CLIENT_MAC, MacAddress.BROADCAST,
+                                                CLIENT_C_TAG, CLIENT_S_TAG, (short) 0);
+        // Send packet with a different port number, so there will be a circuit-id mismatch.
+        sendPacket(packet, new ConnectPoint(DEVICE_ID, PortNumber.portNumber(4096L)));
+        Ethernet processedPacket = (Ethernet) getPacket();
+        assertNull(processedPacket);
+        PppoeAgentEvent e = getEvent();
+        assertNotNull(e);
+        assertEquals(PppoeAgentEvent.Type.INVALID_CID, e.type());
+
+        // Now send it from the default connect point, which should generate the valid circuit-id.
+        sendPacket(packet, DEFAULT_CONNECT_POINT);
+        processedPacket = (Ethernet) getPacket();
+        assertNotNull(processedPacket);
+        PPPoED pppoeLayer = (PPPoED) processedPacket.getPayload();
+        assertNotNull(pppoeLayer);
+        PPPoEDTag tag = pppoeLayer.getTag(PPPoEDTag.PPPOED_TAG_VENDOR_SPECIFIC);
+        assertNotNull(tag);
+        PPPoEDVendorSpecificTag vendorSpecificTag = PPPoEDVendorSpecificTag.fromByteArray(tag.getValue());
+        assertNotNull(vendorSpecificTag);
+
+        // Checks if the configured circuit-id matches with the built one.
+        assertEquals(CLIENT_CIRCUIT_ID, vendorSpecificTag.getCircuitId());
+    }
+
+    @Test
+    public void testPppoeCounters() {
+        short sessionId = (short) 0;
+        Ethernet padi = constructPppoedPacket(PPPoED.PPPOED_CODE_PADI, CLIENT_MAC, MacAddress.BROADCAST,
+                                              CLIENT_C_TAG, CLIENT_S_TAG, sessionId);
+        Ethernet pado = constructPppoedPacket(PPPoED.PPPOED_CODE_PADO, SERVER_MAC, CLIENT_MAC,
+                                              CLIENT_C_TAG, CLIENT_S_TAG, sessionId);
+        Ethernet padr = constructPppoedPacket(PPPoED.PPPOED_CODE_PADR, CLIENT_MAC, MacAddress.BROADCAST,
+                                              CLIENT_C_TAG, CLIENT_S_TAG, sessionId);
+        sessionId++;
+        Ethernet pads = constructPppoedPacket(PPPoED.PPPOED_CODE_PADS, SERVER_MAC, CLIENT_MAC,
+                                              CLIENT_C_TAG, CLIENT_S_TAG, sessionId);
+
+        List.of(new CounterTester(PppoeAgentCounterNames.PADI, 6, padi, DEFAULT_CONNECT_POINT),
+                new CounterTester(PppoeAgentCounterNames.PADO, 2, pado, SERVER_CONNECT_POINT),
+                new CounterTester(PppoeAgentCounterNames.PADR, 5, padr, DEFAULT_CONNECT_POINT),
+                new CounterTester(PppoeAgentCounterNames.PADS, 3, pads, SERVER_CONNECT_POINT),
+                new CounterTester(PppoeAgentCounterNames.PPPOED_PACKETS_FROM_SERVER, 5, null, null),
+                new CounterTester(PppoeAgentCounterNames.PPPOED_PACKETS_TO_SERVER, 11, null, null),
+                new CounterTester(PppoeAgentCounterNames.AC_SYSTEM_ERROR, 0, null, null),
+                new CounterTester(PppoeAgentCounterNames.GENERIC_ERROR_FROM_CLIENT, 0, null, null),
+                new CounterTester(PppoeAgentCounterNames.GENERIC_ERROR_FROM_SERVER, 0, null, null),
+                new CounterTester(PppoeAgentCounterNames.MTU_EXCEEDED, 0, null, null),
+                new CounterTester(PppoeAgentCounterNames.SERVICE_NAME_ERROR, 0, null, null))
+        .forEach(CounterTester::test);
+    }
+
+    @Test
+    public void testSessionsMap() {
+        assertEquals(0, pppoeAgent.getSessionsMap().size());
+        Ethernet packet = constructPppoedPacket(PPPoED.PPPOED_CODE_PADI, CLIENT_MAC, MacAddress.BROADCAST,
+                                                CLIENT_C_TAG, CLIENT_S_TAG, (short) 0);
+        sendPacket(packet, DEFAULT_CONNECT_POINT);
+        assertEquals(1, pppoeAgent.getSessionsMap().size());
+
+        int randomPacketsNumber = 15;
+        sendMultiplePadi(randomPacketsNumber);
+        assertEquals(randomPacketsNumber + 1, pppoeAgent.getSessionsMap().size());
+        PppoeSessionInfo sessionInfo = pppoeAgent.getSessionsMap().get(CLIENT_MAC);
+        assertSessionInfo(sessionInfo, PPPoED.PPPOED_CODE_PADI, (short) 0);
+
+        packet = constructPppoedPacket(PPPoED.PPPOED_CODE_PADT, CLIENT_MAC, MacAddress.BROADCAST,
+                                       CLIENT_C_TAG, CLIENT_S_TAG, (short) 0);
+        sendPacket(packet, DEFAULT_CONNECT_POINT);
+
+        assertEquals(randomPacketsNumber, pppoeAgent.getSessionsMap().size());
+    }
+
+    @Test
+    public void testDeviceEvents() {
+        // Guarantee map is empty.
+        assertEquals(0, pppoeAgent.getSessionsMap().size());
+
+        // Fill sessionsMap by sending 10 PADI packets for random mac addresses.
+        int numPackets = 10;
+        sendMultiplePadi(numPackets);
+        assertEquals(numPackets, pppoeAgent.getSessionsMap().size());
+
+        // Generate PORT_REMOVED event and inject into the device listener.
+        DeviceListener deviceListener = TestUtils.getField(pppoeAgent, "deviceListener");
+        Device device = pppoeAgent.deviceService.getDevice(DEVICE_ID);
+        DeviceEvent deviceEvent = new DeviceEvent(DeviceEvent.Type.PORT_REMOVED,
+                                                  device,
+                                                  new MockPort(PortNumber.portNumber(1L)));
+        deviceListener.event(deviceEvent);
+
+        // Check if session map is empty again.
+        assertEquals(0, pppoeAgent.getSessionsMap().size());
+
+        // Perform the same test but for PORT_UPDATED event.
+        sendMultiplePadi(numPackets);
+        assertEquals(numPackets, pppoeAgent.getSessionsMap().size());
+        deviceEvent = new DeviceEvent(DeviceEvent.Type.PORT_UPDATED,
+                                      device,
+                                      new MockPort(PortNumber.portNumber(1L), false));
+        deviceListener.event(deviceEvent);
+        assertEquals(0, pppoeAgent.getSessionsMap().size());
+
+        // Same test for DEVICE_REMOVED.
+        sendMultiplePadi(numPackets);
+        assertEquals(numPackets, pppoeAgent.getSessionsMap().size());
+        deviceEvent = new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, device, null);
+        deviceListener.event(deviceEvent);
+        assertEquals(0, pppoeAgent.getSessionsMap().size());
+    }
+
+    private void sendMultiplePadi(int num) {
+        for (int i = 0; i < num; i++) {
+            MacAddress macAddress;
+            // A trick to guarantee the Mac address won't repeat (this case may never occur).
+            do {
+                macAddress = randomizeMacAddress();
+            } while (pppoeAgent.getSessionsMap().containsKey(macAddress));
+
+            Ethernet packet = constructPppoedPacket(PPPoED.PPPOED_CODE_PADI, macAddress, MacAddress.BROADCAST,
+                    CLIENT_C_TAG, CLIENT_S_TAG, (short) 0);
+            sendPacket(packet, DEFAULT_CONNECT_POINT);
+        }
+    }
+
+    private void testPppoeUpstreamPacket(byte packetCode) {
+        Ethernet packet = constructPppoedPacket(packetCode, CLIENT_MAC, MacAddress.BROADCAST,
+                                                CLIENT_C_TAG, CLIENT_S_TAG, (short) 0);
+        sendPacket(packet, DEFAULT_CONNECT_POINT);
+
+        Ethernet processedPacket = (Ethernet) getPacket();
+        assertNotNull(processedPacket);
+
+        PPPoED pppoedLayer = (PPPoED) processedPacket.getPayload();
+        assertNotNull(pppoedLayer);
+
+        List<PPPoEDTag> pppoedTagList = pppoedLayer.getTags();
+        assertEquals(1, pppoedTagList.size());
+
+        PPPoEDTag ppPoEDTag = pppoedTagList.get(0);
+        assertEquals(PPPoEDTag.PPPOED_TAG_VENDOR_SPECIFIC, ppPoEDTag.getType());
+
+        PPPoEDVendorSpecificTag vendorSpecificTag = PPPoEDVendorSpecificTag.fromByteArray(ppPoEDTag.getValue());
+        assertEquals(CLIENT_CIRCUIT_ID, vendorSpecificTag.getCircuitId());
+        assertEquals(CLIENT_REMOTE_ID, vendorSpecificTag.getRemoteId());
+        assertEquals(Integer.valueOf(PPPoEDVendorSpecificTag.BBF_IANA_VENDOR_ID), vendorSpecificTag.getVendorId());
+
+        // The only difference between the original and processed is the tag list,
+        // so after removing it the packets must be equal.
+        pppoedLayer.setPayload(null);
+        pppoedLayer.setPayloadLength((short) 0);
+        processedPacket.setPayload(pppoedLayer);
+        assertEquals(packet, processedPacket);
+
+        PppoeSessionInfo sessionInfo = pppoeAgent.getSessionsMap().get(CLIENT_MAC);
+        assertNotNull(sessionInfo);
+        assertSessionInfo(sessionInfo, packetCode, (short) 0);
+    }
+
+    private void testPppoeDownstreamPacket(byte packetCode) {
+        // Simulating a session entry of a previous packet.
+        byte previousPacketCode = packetCode == PPPoED.PPPOED_CODE_PADO ? PPPoED.PPPOED_CODE_PADI :
+                                                                         PPPoED.PPPOED_CODE_PADR;
+
+        SubscriberAndDeviceInformation deviceInfo = pppoeAgent.subsService.get(CLIENT_NAS_PORT_ID);
+
+        putInfoOnSessionMap(CLIENT_MAC, new PppoeSessionInfo(DEFAULT_CONNECT_POINT, SERVER_CONNECT_POINT,
+                                                             previousPacketCode, (short) 0, deviceInfo, CLIENT_MAC));
+
+        short sessionId = (short) (packetCode == PPPoED.PPPOED_CODE_PADS ? 1 : 0);
+        Ethernet packet = constructPppoedPacket(packetCode, SERVER_MAC, CLIENT_MAC,
+                                                CLIENT_C_TAG, CLIENT_S_TAG, sessionId);
+        sendPacket(packet, SERVER_CONNECT_POINT);
+
+        Ethernet processedPacket = (Ethernet) getPacket();
+        assertNotNull(processedPacket);
+
+        // sTag is removed before sending the packet to client.
+        packet.setQinQVID(VlanId.UNTAGGED);
+        assertEquals(packet, processedPacket);
+
+        PppoeSessionInfo sessionInfo = pppoeAgent.getSessionsMap().get(CLIENT_MAC);
+        assertNotNull(sessionInfo);
+        assertSessionInfo(sessionInfo, packetCode, sessionId);
+    }
+
+    private void assertSessionInfo(PppoeSessionInfo sessionInfo, Byte packetCode, short sessionId) {
+        assertEquals(packetCode, sessionInfo.getPacketCode());
+        assertEquals(sessionId, sessionInfo.getSessionId());
+        assertEquals(DEFAULT_CONNECT_POINT, sessionInfo.getClientCp());
+        assertEquals(CLIENT_MAC, sessionInfo.getClientMac());
+    }
+
+    private void putInfoOnSessionMap(MacAddress key, PppoeSessionInfo sessionInfo) {
+        ConsistentMap<MacAddress, PppoeSessionInfo> sessionsMap = TestUtils.getField(pppoeAgent, "sessionsMap");
+        sessionsMap.put(key, sessionInfo);
+    }
+
+    class CounterTester {
+        CounterTester(String subscriber, PppoeAgentCounterNames counter,
+                      long expectedValue, Ethernet packetModel,
+                      ConnectPoint cp) {
+            this.subscriber = subscriber;
+            this.counter = counter;
+            this.expectedValue = expectedValue;
+            this.packetModel = packetModel;
+            this.cp = cp;
+        }
+
+        CounterTester(PppoeAgentCounterNames counter, long expectedValue,
+                      Ethernet packetModel, ConnectPoint cp) {
+            this(PppoeAgentEvent.GLOBAL_COUNTER, counter, expectedValue, packetModel, cp);
+        }
+
+        String subscriber;
+        PppoeAgentCounterNames counter;
+        long expectedValue;
+        Ethernet packetModel;
+        ConnectPoint cp;
+
+        void sendModel() {
+            for (int i = 0; i < expectedValue; i++) {
+                sendPacket(packetModel, cp);
+            }
+        }
+
+        void assertCounterValue() {
+            long actualValue = store.getCountersMap()
+                    .get(new PppoeAgentCountersIdentifier(subscriber, counter));
+            assertEquals(expectedValue, actualValue);
+        }
+
+        void test() {
+            if (packetModel != null && cp != null) {
+                sendModel();
+            }
+            assertCounterValue();
+        }
+    }
+}
diff --git a/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTestBase.java b/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTestBase.java
new file mode 100644
index 0000000..59dc216
--- /dev/null
+++ b/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTestBase.java
@@ -0,0 +1,751 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.time.Duration;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.onlab.packet.BasePacket;
+import org.onlab.packet.ChassisId;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.packet.VlanId;
+
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.event.DefaultEventSinkRegistry;
+import org.onosproject.event.Event;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.EventSink;
+import org.onosproject.mastership.MastershipServiceAdapter;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.config.Config;
+import org.onosproject.net.config.NetworkConfigRegistryAdapter;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.Device;
+import org.onosproject.net.device.DeviceServiceAdapter;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Element;
+import org.onosproject.net.packet.DefaultInboundPacket;
+import org.onosproject.net.packet.DefaultPacketContext;
+import org.onosproject.net.packet.InboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketServiceAdapter;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.sadis.BandwidthProfileInformation;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
+
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.ComponentInstance;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Common methods for PpppoeAgent app tests.
+ */
+public class PppoeAgentTestBase {
+    static final int UPLINK_PORT = 5;
+    static final String OLT_DEV_ID = "of:00000000000000aa";
+    static final String OLT_SERIAL_NUMBER = "OLT123456789";
+
+    static final VlanId CLIENT_C_TAG = VlanId.vlanId((short) 999);
+    static final VlanId CLIENT_C_TAG_2 = VlanId.vlanId((short) 998);
+    static final VlanId CLIENT_S_TAG = VlanId.vlanId((short) 111);
+    static final String CLIENT_ID_1 = "SUBSCRIBER_ID_1";
+    static final short CLIENT_C_PBIT = 6;
+    static final String CLIENT_NAS_PORT_ID = "ONU123456789";
+    static final String CLIENT_REMOTE_ID = "remote0";
+
+    static final MacAddress CLIENT_MAC = MacAddress.valueOf("B8:26:D4:09:E5:D1");
+    static final MacAddress SERVER_MAC = MacAddress.valueOf("74:86:7A:FB:07:86");
+    static final MacAddress OLT_MAC_ADDRESS = MacAddress.valueOf("01:02:03:04:05:06");
+
+    static final ConnectPoint DEFAULT_CONNECT_POINT = ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1);
+    static final String CLIENT_CIRCUIT_ID = String.format("%s 0/%s:%s", OLT_SERIAL_NUMBER,
+                                                         (DEFAULT_CONNECT_POINT.port().toLong() >> 12) + 1,
+                                                          CLIENT_NAS_PORT_ID);
+
+    static final DeviceId DEVICE_ID = DeviceId.deviceId(OLT_DEV_ID);
+    static final String SCHEME_NAME = "pppoeagent";
+
+    static final ConnectPoint SERVER_CONNECT_POINT = ConnectPoint.deviceConnectPoint("of:00000000000000aa/5");
+
+    static final DefaultAnnotations DEVICE_ANNOTATIONS = DefaultAnnotations.builder()
+            .set(AnnotationKeys.PROTOCOL, SCHEME_NAME.toUpperCase()).build();
+
+    List<BasePacket> savedPackets = new LinkedList<>();
+    PacketProcessor packetProcessor;
+
+    /**
+     * Saves the given packet onto the saved packets list.
+     *
+     * @param packet packet to save
+     */
+    void savePacket(BasePacket packet) {
+        savedPackets.add(packet);
+    }
+
+    /**
+     * Gets and removes the packet in the 1st position of savedPackets list.
+     *
+     * @return the packet in 1st position of savedPackets list.
+     */
+    BasePacket getPacket() {
+        return savedPackets.size() > 0 ? savedPackets.remove(0) : null;
+    }
+
+    /**
+     * Gets the last generated event.
+     *
+     * @return the last generated pppoe agent event.
+     */
+    PppoeAgentEvent getEvent() {
+        List<Event> savedEvents = MockEventDispatcher.eventList;
+        return savedEvents.size() > 0 ? (PppoeAgentEvent) savedEvents.get(savedEvents.size() - 1) : null;
+    }
+
+    /**
+     * Sends an Ethernet packet to the process method of the Packet Processor.
+     *
+     * @param pkt Ethernet packet
+     * @param cp ConnectPoint cp
+     */
+    void sendPacket(Ethernet pkt, ConnectPoint cp) {
+        final ByteBuffer byteBuffer = ByteBuffer.wrap(pkt.serialize());
+        InboundPacket inPacket = new DefaultInboundPacket(cp, pkt, byteBuffer);
+
+        PacketContext context = new MockPacketContext(127L, inPacket, null, false);
+        packetProcessor.process(context);
+    }
+
+    /**
+     * Mocks core service adaptor that provides an appId.
+     */
+    class MockCoreServiceAdapter extends CoreServiceAdapter {
+
+        @Override
+        public ApplicationId registerApplication(String name) {
+            return new DefaultApplicationId(10, name);
+        }
+    }
+
+    /**
+     * Mocks device service to provide the of device object.
+     */
+    class MockDeviceService extends DeviceServiceAdapter {
+
+        private ProviderId providerId = new ProviderId("of", "foo");
+        private final Device device1 = new PppoeAgentTestBase.MockDevice(providerId, DEVICE_ID, Device.Type.SWITCH,
+                "foo.inc", "0", "0", OLT_SERIAL_NUMBER, new ChassisId(),
+                DEVICE_ANNOTATIONS);
+
+        @Override
+        public Device getDevice(DeviceId devId) {
+            return device1;
+        }
+
+        @Override
+        public Port getPort(ConnectPoint cp) {
+            return new PppoeAgentTestBase.MockPort(cp.port());
+        }
+
+        @Override
+        public Port getPort(DeviceId deviceId, PortNumber portNumber) {
+            return new PppoeAgentTestBase.MockPort(portNumber);
+        }
+
+        @Override
+        public boolean isAvailable(DeviceId d) {
+            return true;
+        }
+    }
+
+    /**
+     * Mocks device object.
+     */
+    class MockDevice extends DefaultDevice {
+
+        public MockDevice(ProviderId providerId, DeviceId id, Type type,
+                          String manufacturer, String hwVersion, String swVersion,
+                          String serialNumber, ChassisId chassisId, Annotations... annotations) {
+            super(providerId, id, type, manufacturer, hwVersion, swVersion, serialNumber,
+                    chassisId, annotations);
+        }
+    }
+
+    /**
+     * Mocks port object.
+     */
+    class  MockPort implements Port {
+        private PortNumber portNumber;
+        private boolean isEnabled = true;
+        MockPort(PortNumber portNumber) {
+            this.portNumber = portNumber;
+        }
+
+        MockPort(PortNumber portNumber, boolean isEnabled) {
+            this.portNumber = portNumber;
+            this.isEnabled = isEnabled;
+        }
+
+        @Override
+        public boolean isEnabled() {
+            return this.isEnabled;
+        }
+        @Override
+        public long portSpeed() {
+            return 1000;
+        }
+        @Override
+        public Element element() {
+            return null;
+        }
+        @Override
+        public PortNumber number() {
+            return this.portNumber;
+        }
+        @Override
+        public Annotations annotations() {
+            return new MockAnnotations();
+        }
+        @Override
+        public Type type() {
+            return Port.Type.FIBER;
+        }
+
+        private class MockAnnotations implements Annotations {
+
+            @Override
+            public String value(String val) {
+                return CLIENT_NAS_PORT_ID;
+            }
+            @Override
+            public Set<String> keys() {
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Mocks mastership service to enable local-master operations.
+     */
+    class MockMastershipService extends MastershipServiceAdapter {
+        @Override
+        public boolean isLocalMaster(DeviceId d) {
+            return true;
+        }
+    }
+
+    /**
+     * Keeps a reference to the PacketProcessor and saves the OutboundPackets.
+     */
+    class MockPacketService extends PacketServiceAdapter {
+
+        @Override
+        public void addProcessor(PacketProcessor processor, int priority) {
+            packetProcessor = processor;
+        }
+
+        @Override
+        public void emit(OutboundPacket packet) {
+            try {
+                Ethernet eth = Ethernet.deserializer().deserialize(packet.data().array(),
+                        0, packet.data().array().length);
+                savePacket(eth);
+            } catch (Exception e) {
+                fail(e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Mocks Sadis service.
+     */
+    class MockSadisService implements SadisService {
+        @Override
+        public BaseInformationService<SubscriberAndDeviceInformation> getSubscriberInfoService() {
+            return new PppoeAgentTestBase.MockSubService();
+        }
+
+        @Override
+        public BaseInformationService<BandwidthProfileInformation> getBandwidthProfileService() {
+            return null;
+        }
+    }
+
+    /**
+     * Mocks Sadis content with a device and a subscriber entry.
+     */
+    class MockSubService implements BaseInformationService<SubscriberAndDeviceInformation> {
+
+        PppoeAgentTestBase.MockSubscriberAndDeviceInformation device =
+                new PppoeAgentTestBase.MockSubscriberAndDeviceInformation(OLT_DEV_ID, VlanId.NONE, VlanId.NONE,
+                                                                          VlanId.NONE, null, null,
+                                                                          OLT_MAC_ADDRESS,
+                                                                          Ip4Address.valueOf("10.10.10.10"),
+                                                                          UPLINK_PORT, null);
+
+        PppoeAgentTestBase.MockSubscriberAndDeviceInformation sub =
+                new PppoeAgentTestBase.MockSubscriberAndDeviceInformation(CLIENT_ID_1, CLIENT_C_TAG, CLIENT_C_TAG_2,
+                                                                          CLIENT_S_TAG, CLIENT_NAS_PORT_ID,
+                                                                          CLIENT_CIRCUIT_ID, null, null,
+                                                                -1, CLIENT_REMOTE_ID);
+
+        @Override
+        public SubscriberAndDeviceInformation get(String id) {
+            if (id.equals(OLT_SERIAL_NUMBER)) {
+                return device;
+            } else {
+                return  sub;
+            }
+        }
+
+        @Override
+        public void invalidateAll() {}
+        @Override
+        public void invalidateId(String id) {}
+        @Override
+        public SubscriberAndDeviceInformation getfromCache(String id) {
+            return null;
+        }
+    }
+
+    /**
+     * Mock Sadis object to populate service.
+     */
+    class MockSubscriberAndDeviceInformation extends SubscriberAndDeviceInformation {
+
+        MockSubscriberAndDeviceInformation(String id, VlanId cTag, VlanId cTag2,
+                                           VlanId sTag, String nasPortId,
+                                           String circuitId, MacAddress hardId,
+                                           Ip4Address ipAddress, int uplinkPort,
+                                           String remoteId) {
+            this.setHardwareIdentifier(hardId);
+            this.setId(id);
+            this.setIPAddress(ipAddress);
+            this.setNasPortId(nasPortId);
+            this.setCircuitId(circuitId);
+            this.setUplinkPort(uplinkPort);
+            this.setRemoteId(remoteId);
+
+            List<UniTagInformation> uniTagInformationList = new ArrayList<>();
+
+            UniTagInformation uniTagInformation = new UniTagInformation.Builder()
+                    .setPonCTag(cTag)
+                    .setPonSTag(sTag)
+                    .setUsPonCTagPriority(CLIENT_C_PBIT)
+                    .build();
+
+            UniTagInformation uniTagInformation2 = new UniTagInformation.Builder()
+                    .setPonCTag(cTag2)
+                    .setPonSTag(sTag)
+                    .setUsPonCTagPriority(CLIENT_C_PBIT)
+                    .build();
+
+            uniTagInformationList.add(uniTagInformation);
+            uniTagInformationList.add(uniTagInformation2);
+            this.setUniTagList(uniTagInformationList);
+        }
+    }
+
+    /**
+     * Mocks component context.
+     */
+    class MockComponentContext implements ComponentContext {
+        @Override
+        public Dictionary<String, Object> getProperties() {
+            Dictionary<String, Object> cfgDict = new Hashtable<String, Object>();
+            return cfgDict;
+        }
+
+        @Override
+        public Object locateService(String name) {
+            return null;
+        }
+
+        @Override
+        public Object locateService(String name, ServiceReference reference) {
+            return null;
+        }
+
+        @Override
+        public Object[] locateServices(String name) {
+            return null;
+        }
+
+        @Override
+        public BundleContext getBundleContext() {
+            return null;
+        }
+
+        @Override
+        public Bundle getUsingBundle() {
+            return null;
+        }
+
+        @Override
+        public ComponentInstance getComponentInstance() {
+            return null;
+        }
+
+        @Override
+        public void enableComponent(String name) {
+        }
+
+        @Override
+        public void disableComponent(String name) {
+        }
+
+        @Override
+        public ServiceReference getServiceReference() {
+            return null;
+        }
+    }
+
+    /**
+     * Mocks the network config registry.
+     */
+    @SuppressWarnings("unchecked")
+    class MockNetworkConfigRegistry
+            extends NetworkConfigRegistryAdapter {
+        @Override
+        public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
+            PppoeAgentConfig pppoeAgentConfig = new MockPppoeAgentConfig();
+            return (C) pppoeAgentConfig;
+        }
+    }
+
+    /**
+     * Mocks the network config registry.
+     */
+    class MockPppoeAgentConfig extends PppoeAgentConfig {
+        @Override
+        public boolean getUseOltUplinkForServerPktInOut() {
+            return true;
+        }
+    }
+
+    /**
+     * Mocks the DefaultPacketContext.
+     */
+    final class MockPacketContext extends DefaultPacketContext {
+
+        private MockPacketContext(long time, InboundPacket inPkt,
+                                  OutboundPacket outPkt, boolean block) {
+            super(time, inPkt, outPkt, block);
+        }
+
+        @Override
+        public void send() {
+            // We don't send anything out.
+        }
+    }
+
+    /**
+     * Creates a mock for event delivery service.
+     */
+    static class MockEventDispatcher extends DefaultEventSinkRegistry
+            implements EventDeliveryService {
+        static List<Event> eventList = new LinkedList<>();
+        @Override
+        @SuppressWarnings("unchecked")
+        public synchronized void post(Event event) {
+            EventSink sink = getSink(event.getClass());
+            checkState(sink != null, "No sink for event %s", event);
+            sink.process(event);
+            eventList.add(event);
+        }
+
+        @Override
+        public void setDispatchTimeLimit(long millis) {
+        }
+
+        @Override
+        public long getDispatchTimeLimit() {
+            return 0;
+        }
+    }
+
+    /**
+     * Creates a mock object for a scheduled executor service.
+     */
+    class MockExecutor implements ScheduledExecutorService {
+        private ScheduledExecutorService executor;
+
+        MockExecutor(ScheduledExecutorService executor) {
+            this.executor = executor;
+        }
+
+        String lastMethodCalled = "";
+        long lastInitialDelay;
+        long lastDelay;
+        TimeUnit lastUnit;
+
+        public void assertLastMethodCalled(String method, long initialDelay, long delay, TimeUnit unit) {
+            assertEquals(method, lastMethodCalled);
+            assertEquals(initialDelay, lastInitialDelay);
+            assertEquals(delay, lastDelay);
+            assertEquals(unit, lastUnit);
+        }
+
+        @Override
+        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+            lastMethodCalled = "scheduleRunnable";
+            lastDelay = delay;
+            lastUnit = unit;
+            return null;
+        }
+
+        @Override
+        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+            lastMethodCalled = "scheduleCallable";
+            lastDelay = delay;
+            lastUnit = unit;
+            return null;
+        }
+
+        @Override
+        public ScheduledFuture<?> scheduleAtFixedRate(
+                Runnable command, long initialDelay, long period, TimeUnit unit) {
+            lastMethodCalled = "scheduleAtFixedRate";
+            lastInitialDelay = initialDelay;
+            lastDelay = period;
+            lastUnit = unit;
+            return null;
+        }
+
+        @Override
+        public ScheduledFuture<?> scheduleWithFixedDelay(
+                Runnable command, long initialDelay, long delay, TimeUnit unit) {
+            lastMethodCalled = "scheduleWithFixedDelay";
+            lastInitialDelay = initialDelay;
+            lastDelay = delay;
+            lastUnit = unit;
+            command.run();
+            return null;
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+                throws InterruptedException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(
+                Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                throws InterruptedException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+                throws ExecutionException, InterruptedException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                throws ExecutionException, InterruptedException, TimeoutException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isTerminated() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void shutdown() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return null;
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Creates a PPPoED message encapsulated by an Ethernet object.
+     *
+     * @param type        PPoED message type.
+     * @param source      source address.
+     * @param destination destination address of the message.
+     * @param cVlan       inner vlan.
+     * @param sVlan       outer vlan.
+     * @param sessionId   session-id number.
+     *
+     * @return Ethernet packet with PPPoED payload.
+     */
+    Ethernet constructPppoedPacket(byte type, MacAddress source, MacAddress destination,
+                                   VlanId cVlan, VlanId sVlan, short sessionId) {
+        Ethernet pppoedPacket = new Ethernet();
+        pppoedPacket.setSourceMACAddress(source);
+        pppoedPacket.setDestinationMACAddress(destination);
+        pppoedPacket.setEtherType(Ethernet.TYPE_PPPOED);
+        pppoedPacket.setVlanID(cVlan.toShort());
+        pppoedPacket.setPriorityCode((byte) CLIENT_C_PBIT);
+        pppoedPacket.setQinQTPID(Ethernet.TYPE_VLAN);
+        pppoedPacket.setQinQVID(sVlan.toShort());
+
+        PPPoED pppoedLayer = new PPPoED();
+        pppoedLayer.setCode(type);
+        pppoedLayer.setSessionId(sessionId);
+        pppoedPacket.setPayload(pppoedLayer);
+
+        return pppoedPacket;
+    }
+
+    public class MockClusterCommunicationService<M> implements ClusterCommunicationService {
+        private Consumer handler;
+        @Override
+        public void addSubscriber(MessageSubject subject,
+                                  ClusterMessageHandler subscriber, ExecutorService executor) {
+        }
+        @Override
+        public <M> void broadcast(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+        }
+        @Override
+        public <M> void broadcastIncludeSelf(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+            handler.accept(message);
+        }
+        @Override
+        public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
+                                                   Function<M, byte[]> encoder, NodeId toNodeId) {
+            return null;
+        }
+        @Override
+        public <M> void multicast(M message, MessageSubject subject,
+                                  Function<M, byte[]> encoder, Set<NodeId> nodeIds) {
+        }
+        @Override
+        public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+                                                          Function<M, byte[]> encoder,
+                                                          Function<byte[], R> decoder, NodeId toNodeId) {
+            return null;
+        }
+        @Override
+        public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+                                                          Function<M, byte[]> encoder, Function<byte[], R> decoder,
+                                                          NodeId toNodeId, Duration timeout) {
+            return null;
+        }
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                         Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
+        }
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                         Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
+        }
+        @Override
+        public <M> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                      Consumer<M> handler, Executor executor) {
+            this.handler = handler;
+        }
+        @Override
+        public void removeSubscriber(MessageSubject subject) {
+        }
+    }
+
+    /**
+     * Creates a random MAC address.
+     *
+     * @return random MAC address object.
+     */
+    MacAddress randomizeMacAddress() {
+        byte[] mac = new byte[6];
+        Random r = new Random();
+        r.nextBytes(mac);
+        return MacAddress.valueOf(mac);
+    }
+}
\ No newline at end of file