[VOL-3661] PPPoE IA application - initial commit

Change-Id: Idaf23f8736cba955fe8a3049b8fc9c85b3cd3ab9
Signed-off-by: Gustavo Silva <gsilva@furukawalatam.com>
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..0b6585a
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,10 @@
+*~
+*.class
+.classpath
+.project
+.settings
+.checkstyle
+target
+*.iml
+.idea
+.DS_Store
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..6111801
--- /dev/null
+++ b/README.md
@@ -0,0 +1,10 @@
+PPPoEAgent : PPPoE Intermediate Agent application
+=================================================
+
+### What is PPPoEAgent?
+As described in TR-101 Issue 2 - 3.9.2, the PPPoE Intermediate Agent supports the PPPoE access method and is a function placed on the Access Node in order to insert access loop identification.
+
+### Description
+The PPPoE Intermediate Agent intercepts all upstream PPPoE discovery stage packets, i.e. the PADI, PADR and upstream PADT packets, but does not modify the source or destination MAC address of these PPPoE discovery packets.
+Upon receipt of a PADI or PADR packet sent by the PPPoE client, the Intermediate Agent adds a PPPoE TAG to the packet to be sent upstream. The TAG contains the identification of the access loop on which the PADI or PADR packet was received.
+If a PADI or PADR packet exceeds the Ethernet MTU after adding the access loop identification TAG, the Intermediate Agent must drop the packet, and issue the corresponding PADO or PADS response with a Generic-Error TAG to the sender.
\ No newline at end of file
diff --git a/api/pom.xml b/api/pom.xml
new file mode 100644
index 0000000..96631bb
--- /dev/null
+++ b/api/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2021-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>pppoeagent</artifactId>
+        <groupId>org.opencord</groupId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>pppoeagent-api</artifactId>
+    <packaging>bundle</packaging>
+    <description>PPPoE Intermediate Agent API</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-misc</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.opencord</groupId>
+            <artifactId>sadis-api</artifactId>
+            <version>${sadis.api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-cli</artifactId>
+            <version>${onos.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-source-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/api/src/main/java/org/opencord/pppoeagent/PPPoEDVendorSpecificTag.java b/api/src/main/java/org/opencord/pppoeagent/PPPoEDVendorSpecificTag.java
new file mode 100644
index 0000000..8605637
--- /dev/null
+++ b/api/src/main/java/org/opencord/pppoeagent/PPPoEDVendorSpecificTag.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * Represents the "Vendor-Specific" PPPoED tag.
+ * This class is used by the PPPoE agent in order to read/build the tag that carries the
+ * vendor-id, circuit-id and remote-id information, which are attached to the upstrem packets.
+ */
+public class PPPoEDVendorSpecificTag {
+    public static final int BBF_IANA_VENDOR_ID = 3561;
+    private static final byte CIRCUIT_ID_OPTION = 1;
+    private static final byte REMOTE_ID_OPTION = 2;
+
+    private int vendorId = BBF_IANA_VENDOR_ID;
+    private String circuitId = null;
+    private String remoteId = null;
+
+    /**
+     * Creates an empty Vendor-Specific tag object.
+     *
+     */
+    public PPPoEDVendorSpecificTag() {
+    }
+
+    /**
+     * Creates a new Vendor-Specific tag object.
+     *
+     * @param circuitId circuit-id information
+     * @param remoteId remote-id information
+     */
+    public PPPoEDVendorSpecificTag(String circuitId, String remoteId) {
+        this.circuitId = circuitId;
+        this.remoteId = remoteId;
+    }
+
+    /**
+     * Sets the vendor-id.
+     *
+     * @param value vendor-id to be set.
+     */
+    public void setVendorId(int value) {
+        this.vendorId = value;
+    }
+
+    /**
+     * Gets the vendor-id.
+     *
+     * @return the vendor-id.
+     */
+    public Integer getVendorId() {
+        return this.vendorId;
+    }
+
+    /**
+     * Sets the circuit-id.
+     *
+     * @param value circuit-id to be set.
+     */
+    public void setCircuitId(String value) {
+        this.circuitId = value;
+    }
+
+    /**
+     * Gets the circuit-id.
+     *
+     * @return the circuit-id.
+     */
+    public String getCircuitId() {
+        return this.circuitId;
+    }
+
+    /**
+     * Sets the remote-id.
+     *
+     * @param value remote-id to be set.
+     */
+    public void setRemoteId(String value) {
+        this.remoteId = value;
+    }
+
+    /**
+     * Gets the remote-id.
+     *
+     * @return the remote-id.
+     */
+    public String getRemoteId() {
+        return this.remoteId;
+    }
+
+    /**
+     * Returns the representation of the PPPoED vendor-specific tag as byte array.
+     * @return returns byte array
+     */
+    public byte[] toByteArray() {
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+        byte[] bytes = ByteBuffer.allocate(4).putInt(this.vendorId).array();
+        buf.write(bytes, 0, bytes.length);
+
+        // Add sub option if set
+        if (circuitId != null) {
+            buf.write(CIRCUIT_ID_OPTION);
+            buf.write((byte) circuitId.length());
+            bytes = circuitId.getBytes(StandardCharsets.UTF_8);
+            buf.write(bytes, 0, bytes.length);
+        }
+
+        // Add sub option if set
+        if (remoteId != null) {
+            buf.write(REMOTE_ID_OPTION);
+            buf.write((byte) remoteId.length());
+            bytes = remoteId.getBytes(StandardCharsets.UTF_8);
+            buf.write(bytes, 0, bytes.length);
+        }
+
+        return buf.toByteArray();
+    }
+
+    /**
+     * Returns a PPPoEDVendorSpecificTag object from a byte array.
+     * @param data byte array data to convert to PPPoEDVendorSpecificTag object.
+     * @return vendor specific tag from given byte array.
+     * */
+    public static PPPoEDVendorSpecificTag fromByteArray(byte[] data) {
+        int position = 0;
+        final int vendorIdLength = 4;
+
+        PPPoEDVendorSpecificTag vendorSpecificTag = new PPPoEDVendorSpecificTag();
+
+        if (data.length < vendorIdLength) {
+            return vendorSpecificTag;
+        }
+
+        int vId = ByteBuffer.wrap(data, position, vendorIdLength).getInt();
+        vendorSpecificTag.setVendorId(vId);
+
+        position += vendorIdLength;
+
+        while (data.length > position) {
+            byte code = data[position];
+            position++;
+
+            if (data.length < position) {
+                break;
+            }
+
+            int length = (int) data[position];
+            position++;
+
+            if (data.length < (position + length)) {
+                break;
+            }
+
+            String clvString = new String(Arrays.copyOfRange(data, position, length + position),
+                                          StandardCharsets.UTF_8);
+
+            position += length;
+
+            if (code == CIRCUIT_ID_OPTION) {
+                vendorSpecificTag.setCircuitId(clvString);
+            } else if (code == REMOTE_ID_OPTION) {
+                vendorSpecificTag.setRemoteId(clvString);
+            }
+        }
+
+        return vendorSpecificTag;
+    }
+}
diff --git a/api/src/main/java/org/opencord/pppoeagent/PppoeAgentEvent.java b/api/src/main/java/org/opencord/pppoeagent/PppoeAgentEvent.java
new file mode 100644
index 0000000..ec491b1
--- /dev/null
+++ b/api/src/main/java/org/opencord/pppoeagent/PppoeAgentEvent.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent;
+
+import org.onlab.packet.MacAddress;
+import org.onosproject.event.AbstractEvent;
+import org.onosproject.net.ConnectPoint;
+/**
+ * PppoeAgent event.
+ */
+public class PppoeAgentEvent extends AbstractEvent<PppoeAgentEvent.Type, PppoeSessionInfo> {
+    private final ConnectPoint connectPoint;
+    private final MacAddress subscriberMacAddress;
+    private final String counterName;
+    private final Long counterValue;
+    private final String subscriberId;
+
+    // Session terminates may have an error tag with a 'reason' message, this field is meant to track that info.
+    private final String reason;
+
+    public static final String GLOBAL_COUNTER = "global";
+
+    /**
+     * Type of the event.
+     */
+    public enum Type {
+        /**
+         * PPPoE discovery negotiation started.
+         */
+        START,
+
+        /**
+         * PPPoE server is responding to client packets - session is under negotiation.
+         */
+        NEGOTIATION,
+
+        /**
+         * PPPoE discovery negotiation is complete, session is established.
+         */
+        SESSION_ESTABLISHED,
+
+        /**
+         * Client or server event to indicate end of a session.
+         */
+        TERMINATE,
+
+        /**
+         * PPPoE stats update.
+         */
+        STATS_UPDATE,
+
+        /**
+         * Circuit-id mismatch.
+         */
+        INVALID_CID,
+
+        /**
+         * Default value for unknown event.
+         */
+        UNKNOWN
+    }
+
+
+    /**
+     * Creates a new PPPoE counters event.
+     *
+     * @param type type of the event
+     * @param sessionInfo session info
+     * @param counterName name of specific counter
+     * @param counterValue value of specific counter
+     * @param subscriberMacAddress the subscriber MAC address information
+     * @param subscriberId id of subscriber
+     */
+    public PppoeAgentEvent(Type type, PppoeSessionInfo sessionInfo, String counterName, Long counterValue,
+                           MacAddress subscriberMacAddress, String subscriberId) {
+        super(type, sessionInfo);
+        this.counterName = counterName;
+        this.counterValue = counterValue;
+        this.subscriberMacAddress = subscriberMacAddress;
+        this.subscriberId = subscriberId;
+        this.connectPoint = null;
+        this.reason = null;
+    }
+
+    /**
+     * Creates a new generic PPPoE event.
+     *
+     * @param type type of the event
+     * @param sessionInfo session info
+     * @param connectPoint connect point the client is on
+     * @param subscriberMacAddress the subscriber MAC address information
+     */
+    public PppoeAgentEvent(Type type, PppoeSessionInfo sessionInfo, ConnectPoint connectPoint,
+                           MacAddress subscriberMacAddress) {
+        super(type, sessionInfo);
+        this.connectPoint = connectPoint;
+        this.subscriberMacAddress = subscriberMacAddress;
+        this.counterName = null;
+        this.counterValue = null;
+        this.subscriberId = null;
+        this.reason = null;
+    }
+
+    /**
+     * Creates a new PPPOE event with a reason (PADT packets may have a 'reason' field).
+     *
+     * @param type type of the event
+     * @param sessionInfo session info
+     * @param connectPoint connect point the client is on
+     * @param subscriberMacAddress the subscriber MAC address information
+     * @param reason events such TERMINATE may have reason field
+     */
+    public PppoeAgentEvent(Type type, PppoeSessionInfo sessionInfo, ConnectPoint connectPoint,
+                           MacAddress subscriberMacAddress, String reason) {
+        super(type, sessionInfo);
+        this.connectPoint = connectPoint;
+        this.subscriberMacAddress = subscriberMacAddress;
+        this.reason = reason;
+        this.counterName = null;
+        this.counterValue = null;
+        this.subscriberId = null;
+    }
+
+
+    /**
+     * Gets the PPPoE client connect point.
+     *
+     * @return connect point
+     */
+    public ConnectPoint getConnectPoint() {
+        return connectPoint;
+    }
+
+    /**
+     * Gets the subscriber MAC address.
+     *
+     * @return the MAC address from subscriber
+     */
+    public MacAddress getSubscriberMacAddress() {
+        return subscriberMacAddress;
+    }
+
+    /**
+     * Gets the event reason.
+     *
+     * @return event reason.
+     */
+    public String getReason() {
+        return reason;
+    }
+
+    /**
+     * Gets the counter name.
+     *
+     * @return counter name.
+     */
+    public String getCounterName() {
+        return counterName;
+    }
+
+    /**
+     * Gets the counter value.
+     *
+     * @return counter value.
+     */
+    public Long getCounterValue() {
+        return counterValue;
+    }
+
+    /**
+     * Gets the subscriber identifier information.
+     *
+     * @return the Id from subscriber
+     */
+    public String getSubscriberId() {
+        return subscriberId;
+    }
+
+    @Override
+    public String toString() {
+        return "PppoeAgentEvent{" +
+                "connectPoint=" + connectPoint +
+                ", subscriberMacAddress=" + subscriberMacAddress +
+                ", reason='" + reason + '\'' +
+                ", counterName=" + counterName +
+                ", counterValue=" + counterValue +
+                ", subscriberId='" + subscriberId + '\'' +
+                '}';
+    }
+}
diff --git a/api/src/main/java/org/opencord/pppoeagent/PppoeAgentListener.java b/api/src/main/java/org/opencord/pppoeagent/PppoeAgentListener.java
new file mode 100644
index 0000000..03d3501
--- /dev/null
+++ b/api/src/main/java/org/opencord/pppoeagent/PppoeAgentListener.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent;
+
+import org.onosproject.event.EventListener;
+
+/**
+ * Listener for PPPoE agents events.
+ */
+public interface PppoeAgentListener extends EventListener<PppoeAgentEvent> {
+}
diff --git a/api/src/main/java/org/opencord/pppoeagent/PppoeAgentService.java b/api/src/main/java/org/opencord/pppoeagent/PppoeAgentService.java
new file mode 100644
index 0000000..616607a
--- /dev/null
+++ b/api/src/main/java/org/opencord/pppoeagent/PppoeAgentService.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent;
+
+import java.util.Map;
+import org.onlab.packet.MacAddress;
+import org.onosproject.event.ListenerService;
+
+/**
+ * PPPoE Agent service.
+ */
+public interface PppoeAgentService extends
+        ListenerService<PppoeAgentEvent, PppoeAgentListener> {
+    Map<MacAddress, PppoeSessionInfo> getSessionsMap();
+
+    /**
+    * Removes all PPPoE agent session entries.
+    */
+    void clearSessionsMap();
+}
diff --git a/api/src/main/java/org/opencord/pppoeagent/PppoeAgentStoreDelegate.java b/api/src/main/java/org/opencord/pppoeagent/PppoeAgentStoreDelegate.java
new file mode 100644
index 0000000..3d2f72e
--- /dev/null
+++ b/api/src/main/java/org/opencord/pppoeagent/PppoeAgentStoreDelegate.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent;
+
+import org.onosproject.store.StoreDelegate;
+/**
+ * Store delegate for PPPoE Agent store.
+ */
+public interface PppoeAgentStoreDelegate extends StoreDelegate<PppoeAgentEvent> {
+}
\ No newline at end of file
diff --git a/api/src/main/java/org/opencord/pppoeagent/PppoeSessionInfo.java b/api/src/main/java/org/opencord/pppoeagent/PppoeSessionInfo.java
new file mode 100644
index 0000000..81d7d09
--- /dev/null
+++ b/api/src/main/java/org/opencord/pppoeagent/PppoeSessionInfo.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent;
+
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onosproject.net.ConnectPoint;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+
+/**
+ * PppoeAgent session information.
+ */
+public class PppoeSessionInfo {
+    private ConnectPoint clientCp;
+    private ConnectPoint serverCp;
+    private Byte packetCode;
+    private short sessionId;
+    SubscriberAndDeviceInformation subscriber;
+    MacAddress clientMac;
+
+    /**
+     * Creates a new PPPoE session information object.
+     *
+     * @param clientCp   PPPoE client connect-point.
+     * @param serverCp   PPPoE server connect-point.
+     * @param packetCode The packet code of last PPPOED received message.
+     * @param sessionId  session-id value.
+     * @param subscriber Sadis object of PPPoE client.
+     * @param clientMac  MAC address of PPPoE client.
+     */
+    public PppoeSessionInfo(ConnectPoint clientCp, ConnectPoint serverCp, Byte packetCode, short sessionId,
+                            SubscriberAndDeviceInformation subscriber, MacAddress clientMac) {
+        this.clientCp = clientCp;
+        this.serverCp = serverCp;
+        this.packetCode = packetCode;
+        this.sessionId = sessionId;
+        this.subscriber = subscriber;
+        this.clientMac = clientMac;
+    }
+
+    /**
+     * Creates an empty PPPoE session information object.
+     */
+    public PppoeSessionInfo() {
+    }
+
+    /**
+     * Gets the PPPoE client connect-point.
+     *
+     * @return client connect-point.
+     */
+    public ConnectPoint getClientCp() {
+        return clientCp;
+    }
+
+    /**
+     * Sets the PPPoE client connect-point.
+     *
+     * @param clientCp client connect-point.
+     */
+    public void setClientCp(ConnectPoint clientCp) {
+        this.clientCp = clientCp;
+    }
+
+    /**
+     * Gets the PPPoE server connect-point.
+     *
+     * @return server connect-point.
+     */
+    public ConnectPoint getServerCp() {
+        return serverCp;
+    }
+
+    /**
+     * Sets the PPPoE server connect-point.
+     *
+     * @param serverCp server connect-point.
+     */
+    public void setServerCp(ConnectPoint serverCp) {
+        this.serverCp = serverCp;
+    }
+
+    /**
+     * Gets the PPPoE client SADIS object.
+     *
+     * @return client SADIS object.
+     */
+    public SubscriberAndDeviceInformation getSubscriber() {
+        return subscriber;
+    }
+
+    /**
+     * Sets the PPPoE client SADIS object.
+     *
+     * @param subscriber client SADIS object.
+     */
+    public void setSubscriber(SubscriberAndDeviceInformation subscriber) {
+        this.subscriber = subscriber;
+    }
+
+    /**
+     * Gets the PPPoE client MAC address.
+     *
+     * @return client MAC address.
+     */
+    public MacAddress getClientMac() {
+        return clientMac;
+    }
+
+    /**
+     * Sets the PPPoE client MAC address.
+     *
+     * @param clientMac MAC address.
+     */
+    public void setClientMac(MacAddress clientMac) {
+        this.clientMac = clientMac;
+    }
+
+    /**
+     * Gets the PPPoE session-id.
+     *
+     * @return PPPoE session-id.
+     */
+    public short getSessionId() {
+        return sessionId;
+    }
+
+    /**
+     * Sets the PPPoE session-id.
+     *
+     * @param sessionId PPPoE session-id.
+     */
+    public void setSessionId(short sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    /**
+     * Gets the PPPoED packet code of the last received message.
+     *
+     * @return last received PPPoED code.
+     */
+    public Byte getPacketCode() {
+        return packetCode;
+    }
+
+    /**
+     * Sets the PPPoED packet code from the last received message.
+     *
+     * @param packetCode PPPoED packet code.
+     */
+    public void setPacketCode(byte packetCode) {
+        this.packetCode = packetCode;
+    }
+
+    /**
+     * Gets a string to represent the current session state based on the last received packet code.
+     * This function uses conveniently the name of some PPPoEAgentEvent types to represent the state.
+     *
+     * @return PPPOE session state.
+     */
+    public String getCurrentState() {
+        PPPoED.Type lastReceivedPkt = PPPoED.Type.getTypeByValue(this.packetCode);
+        switch (lastReceivedPkt) {
+            case PADI:
+                return PppoeAgentEvent.Type.START.name();
+            case PADR:
+            case PADO:
+                return PppoeAgentEvent.Type.NEGOTIATION.name();
+            case PADS:
+                return PppoeAgentEvent.Type.SESSION_ESTABLISHED.name();
+            case PADT:
+                // This case might never happen (entry is being removed on PADT messages).
+                return PppoeAgentEvent.Type.TERMINATE.name();
+            default:
+                return PppoeAgentEvent.Type.UNKNOWN.name();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "PppoeSessionInfo{" +
+                "clientCp=" + clientCp +
+                ", serverCp=" + serverCp +
+                ", packetCode=" + packetCode +
+                ", sessionId=" + sessionId +
+                ", subscriber=" + subscriber +
+                ", clientMac=" + clientMac +
+                '}';
+    }
+}
+
diff --git a/api/src/main/java/org/opencord/pppoeagent/package-info.java b/api/src/main/java/org/opencord/pppoeagent/package-info.java
new file mode 100644
index 0000000..22b3633
--- /dev/null
+++ b/api/src/main/java/org/opencord/pppoeagent/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * API for PPPoE agent.
+ */
+package org.opencord.pppoeagent;
diff --git a/api/src/main/java/org/opencord/pppoeagent/util/CircuitIdBuilder.java b/api/src/main/java/org/opencord/pppoeagent/util/CircuitIdBuilder.java
new file mode 100644
index 0000000..f955704
--- /dev/null
+++ b/api/src/main/java/org/opencord/pppoeagent/util/CircuitIdBuilder.java
@@ -0,0 +1,437 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.Range;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Port;
+import org.onosproject.net.device.DeviceService;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.onosproject.net.Device;
+import org.opencord.sadis.UniTagInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Circuit-id builder util.
+ */
+public class CircuitIdBuilder {
+    //#region Circuit-id Builder
+    private CircuitIdConfig circuitIdConfig;
+    private DeviceService deviceService;
+    private BaseInformationService<SubscriberAndDeviceInformation> subsService;
+    private ConnectPoint connectPoint;
+    private String[] exclusiveExpressions;
+    private Map<CircuitIdFieldName, String> customSeparators;
+    private UniTagInformation uniTagInformation;
+
+    private final ArrayList<CircuitIdField> availableFields = new ArrayList<>(
+        Arrays.asList(new CircuitIdAccessNodeIdentifier(),
+                new CircuitIdNetworkTechnology(),
+                new CircuitIdSlot(),
+                new CircuitIdPort(),
+                new CircuitIdOnuSerialNumber(),
+                new CircuitIdUniPortNumber(),
+                new CircuitIdSvId(),
+                new CircuitIdCvId())
+    );
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    /**
+     * Creates a new CircuitId builder with default configuration.
+     */
+    public CircuitIdBuilder() {
+        // Instantiate the circuit-id config Object.
+        circuitIdConfig = new CircuitIdConfig();
+        circuitIdConfig.setSeparator("/");
+        // This is a default list with exclusive expressions to validate the field value.
+        exclusiveExpressions = new String[] {circuitIdConfig.getSeparator()};
+
+        // At the end we set the default field list - in the future these fields will be defined by the operator.
+        circuitIdConfig.setFieldListWithDefault(availableFields);
+        customSeparators = new HashMap<>();
+    }
+
+    /**
+     * Sets the connect-point.
+     *
+     * @param value connect-point.
+     * @return circuit-id builder.
+     */
+    public CircuitIdBuilder setConnectPoint(ConnectPoint value) {
+        this.connectPoint = value;
+        return this;
+    }
+
+    /**
+     * Sets the device service.
+     *
+     * @param value device service.
+     * @return circuit-id builder.
+     */
+    public CircuitIdBuilder setDeviceService(DeviceService value) {
+        this.deviceService = value;
+        return this;
+    }
+
+    /**
+     * Sets the SADIS service.
+     *
+     * @param value SADIS service.
+     * @return circuit-id builder.
+     */
+    public CircuitIdBuilder setSubsService(BaseInformationService<SubscriberAndDeviceInformation> value) {
+        this.subsService = value;
+        return this;
+    }
+
+    /**
+     * Sets the UniTag information.
+     *
+     * @param value UniTag information.
+     * @return circuit-id builder.
+     */
+    public CircuitIdBuilder setUniTagInformation(UniTagInformation value) {
+        this.uniTagInformation = value;
+        return this;
+    }
+
+    /**
+     * Adds a custom separator.
+     *
+     * @param field the circuit-id field associated to the custom separator.
+     * @param separator the custom separator to be added.
+     * @return circuit-id builder.
+     */
+    public CircuitIdBuilder addCustomSeparator(CircuitIdFieldName field, String separator) {
+        if (!circuitIdConfig.getSeparator().equals(separator)) {
+            customSeparators.put(field, separator);
+        } else {
+            log.warn("Not defining custom separator '{}' for {} field since it's equals to the configured separator.",
+                    separator, field.name());
+        }
+
+        return this;
+    }
+
+    /**
+     * Gets the custom separators.
+     *
+     * @return a map with the circuit-id fields and its custom separators.
+     */
+    public Map<CircuitIdFieldName, String> getCustomSeparators() {
+        return customSeparators;
+    }
+
+    /**
+     * Gets the circuit-id configuration object.
+     *
+     * @return the configuration for circuit-id builder.
+     */
+    public CircuitIdConfig getCircuitIdConfig() {
+        return this.circuitIdConfig;
+    }
+
+    /**
+     * Sets the circuit-id configuration.
+     *
+     * @param fieldNameList the list of desired fields.
+     * @return circuit-id builder.
+     */
+    public CircuitIdBuilder setCircuitIdConfig(ArrayList<CircuitIdFieldName> fieldNameList) {
+        circuitIdConfig.setFieldList(fieldNameList, availableFields);
+        return this;
+    }
+
+    /**
+     * Generates the circuit-id based on the provided configuration and default/custom separators.
+     *
+     * @return circuit-id.
+     */
+    public String build() {
+        String circuitId = "";
+
+        // Get the field list.
+        ArrayList<CircuitIdField> fieldList = circuitIdConfig.getFieldList();
+
+        // Check if it's valid.
+        if (fieldList == null || fieldList.size() <= 0) {
+            log.error("Failed to build the circuit Id: there's no entries in the field list.");
+            return "";
+        }
+
+        // This list is filtered to ignore prefix fields.
+        ArrayList<CircuitIdField> filteredFieldList = fieldList;
+
+        // Go throughout the field list to build the string.
+        for (int i = 0; i <  filteredFieldList.size(); i++) {
+            CircuitIdField field = filteredFieldList.get(i);
+
+            String value = "";
+            try {
+                value = field.build();
+            } catch (MissingParameterException | FieldValidationException e) {
+                log.error(e.getMessage());
+                return "";
+            }
+
+            // Concat the obtained value with the rest of id.
+            circuitId = circuitId.concat(value);
+
+            // If this is not the last "for" iteration, isolate with the separator.
+            if (i != (filteredFieldList.size() - 1)) {
+                String separator = customSeparators.containsKey(field.getFieldName()) ?
+                        customSeparators.get(field.getFieldName()) : circuitIdConfig.getSeparator();
+
+                circuitId = circuitId.concat(separator);
+            }
+        }
+        // At the end, it returns the fully built string.
+        return circuitId;
+    }
+    //#endregion
+
+    //#region Field Classes
+    private SubscriberAndDeviceInformation getSadisEntry() {
+        Port p = deviceService.getPort(connectPoint);
+        String subscriber = p.annotations().value(AnnotationKeys.PORT_NAME);
+        return subsService.get(subscriber);
+    }
+
+    private class CircuitIdAccessNodeIdentifier extends CircuitIdField {
+        CircuitIdAccessNodeIdentifier() {
+            this.setFieldName(CircuitIdFieldName.AcessNodeIdentifier)
+                    .setIsNumber(false)
+                    .setExclusiveExpressions(exclusiveExpressions);
+        }
+
+        @Override
+        String build() throws MissingParameterException,
+                              FieldValidationException {
+            // This field must be the serial number of the OLT which the packet came from.
+            // So we try to recover this information using the device Id of the connect point.
+            Device device = deviceService.getDevice(connectPoint.deviceId());
+
+            if (device == null) {
+                String errorMsg = String.format("Device not found at device service: %s.", connectPoint.deviceId());
+                throw new MissingParameterException(errorMsg);
+            }
+
+            String accessNodeId = device.serialNumber();
+
+            if (isValid(accessNodeId)) {
+                return accessNodeId;
+            } else {
+                throw new FieldValidationException(this, accessNodeId);
+            }
+        }
+    }
+
+    private static class CircuitIdSlot extends CircuitIdField {
+        CircuitIdSlot() {
+            this.setFieldName(CircuitIdFieldName.Slot)
+                    .setIsNumber(true)
+                    .setMaxLength(2)
+                    .setRange(Range.between(0L, 99L));
+        }
+
+        @Override
+        String build() {
+            // At first, this will be hard-coded as "0". But this may change in future implementation.
+            return "0";
+        }
+    }
+
+    private class CircuitIdPort extends CircuitIdField {
+        CircuitIdPort() {
+            this.setFieldName(CircuitIdFieldName.Port)
+                    .setIsNumber(true)
+                    .setMaxLength(3)
+                    .setRange(Range.between(1L, 256L));
+        }
+
+        @Override
+        String build() throws MissingParameterException,
+                              FieldValidationException {
+            String port = "";
+
+            // If there's no connect-point we can't build this field.
+            if (connectPoint == null) {
+                String errorMsg = "ConnectPoint not passed to circuit-id builder - can't build PORT field.";
+                throw new MissingParameterException(errorMsg);
+            }
+
+            long connectPointPort = connectPoint.port().toLong();
+            int ponPort = ((int) connectPointPort >> 12) + 1;
+
+            port = String.valueOf(ponPort);
+            if (isValid(port)) {
+                return port;
+            } else {
+                throw new FieldValidationException(this, port);
+            }
+        }
+    }
+
+    private class CircuitIdOnuSerialNumber extends CircuitIdField {
+        CircuitIdOnuSerialNumber() {
+            this.setFieldName(CircuitIdFieldName.OnuSerialNumber)
+                    .setIsNumber(false)
+                    .setExclusiveExpressions(exclusiveExpressions);
+        }
+
+        @Override
+        String build() throws MissingParameterException,
+                              FieldValidationException {
+
+            if (deviceService == null) {
+                String errorMsg = "Device service not passed to circuit-id builder - can't build ONU SN field.";
+                throw new MissingParameterException(errorMsg);
+            }
+
+            Port p = deviceService.getPort(connectPoint);
+            String onuSn = p.annotations().value(AnnotationKeys.PORT_NAME);
+
+            // The reason of the following block is that in some cases, the UNI port number can be
+            // appended in the ONU serial number. So it's required to remove it.
+            CircuitIdUniPortNumber uniPortNumberField = new CircuitIdUniPortNumber();
+            String uniPortNumber = uniPortNumberField.build();
+
+            if (uniPortNumberField.isValid(uniPortNumber)) {
+                // Build the suffix it shouldn't have based on the UNI port number.
+                uniPortNumber = "-" + (Integer.parseInt(uniPortNumber));
+
+                // Checks if the serial number contains this suffix.
+                if (onuSn.substring(onuSn.length() - uniPortNumber.length()).equals(uniPortNumber)) {
+                    // Remove it if this is the case.
+                    onuSn = onuSn.substring(0, onuSn.indexOf(uniPortNumber));
+                }
+
+            } else {
+                log.debug("Failed to get the UNI port number, the ONU serial number could be wrong;");
+            }
+
+
+            if (isValid(onuSn)) {
+                return onuSn;
+            } else {
+                throw new FieldValidationException(this, onuSn);
+            }
+        }
+    }
+
+    private class CircuitIdUniPortNumber extends CircuitIdField {
+        CircuitIdUniPortNumber() {
+            this.setFieldName(CircuitIdFieldName.UniPortNumber)
+                    .setIsNumber(true)
+                    .setMaxLength(2)
+                    .setRange(Range.between(1L, 99L));
+        }
+
+        @Override
+        String build() throws MissingParameterException,
+                              FieldValidationException {
+            // If there's no connect-point we can't build this field.
+            if (connectPoint == null) {
+                String errorMsg = "ConnectPoint not passed to circuit-id builder - can't build UNI PORT NUMBER field.";
+                throw new MissingParameterException(errorMsg);
+            }
+
+            long connectPointPort = connectPoint.port().toLong();
+            int uniPortNumber = ((int) connectPointPort & 0xF) + 1;
+
+            String uniPortString = String.valueOf(uniPortNumber);
+
+            if (isValid(uniPortString)) {
+                return uniPortString;
+            } else {
+                throw new FieldValidationException(this, uniPortString);
+            }
+        }
+    }
+
+    private class CircuitIdSvId extends CircuitIdField {
+        CircuitIdSvId() {
+            this.setFieldName(CircuitIdFieldName.SvID)
+                    .setIsNumber(true)
+                    .setMaxLength(4)
+                    .setRange(Range.between(0L, 4095L));
+        }
+
+        @Override
+        String build() throws MissingParameterException,
+                              FieldValidationException {
+            if (uniTagInformation == null) {
+                String errorMsg = String.format("UNI TAG info not found for %s while looking for S-TAG.", connectPoint);
+                throw new MissingParameterException(errorMsg);
+            }
+
+            String sVid = uniTagInformation.getPonSTag().toString();
+            if (isValid(sVid)) {
+                return sVid;
+            } else {
+                throw new FieldValidationException(this, sVid);
+            }
+        }
+    }
+
+     private class CircuitIdCvId extends CircuitIdField {
+        CircuitIdCvId() {
+            this.setFieldName(CircuitIdFieldName.CvID)
+                    .setIsNumber(true)
+                    .setMaxLength(4)
+                    .setRange(Range.between(0L, 4095L));
+        }
+
+        @Override
+        String build() throws MissingParameterException,
+                              FieldValidationException {
+            if (uniTagInformation == null) {
+                String errorMsg = String.format("UNI TAG info not found for %s looking for C-TAG", connectPoint);
+                throw new MissingParameterException(errorMsg);
+            }
+
+            String cVid = uniTagInformation.getPonCTag().toString();
+            if (isValid(cVid)) {
+                return cVid;
+            } else {
+                throw new FieldValidationException(this, cVid);
+            }
+        }
+    }
+
+    private class CircuitIdNetworkTechnology extends CircuitIdField {
+        CircuitIdNetworkTechnology() {
+            this.setFieldName(CircuitIdFieldName.NetworkTechnology)
+                    .setIsNumber(false)
+                    .setExclusiveExpressions(exclusiveExpressions);
+        }
+
+        // For now this is fixed.
+        @Override
+        String build() {
+            return "xpon";
+        }
+    }
+    //#endregion
+}
\ No newline at end of file
diff --git a/api/src/main/java/org/opencord/pppoeagent/util/CircuitIdConfig.java b/api/src/main/java/org/opencord/pppoeagent/util/CircuitIdConfig.java
new file mode 100644
index 0000000..3efc369
--- /dev/null
+++ b/api/src/main/java/org/opencord/pppoeagent/util/CircuitIdConfig.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Configuration options for circuit-id builder.
+ */
+public class CircuitIdConfig {
+    private String separator;
+    private ArrayList<CircuitIdField> fieldList;
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // This list defines the default configuration for circuit-id.
+    // Circuit-id will follow the same order of this list.
+    private final ArrayList<CircuitIdFieldName> defaultFieldSelection = new ArrayList<>(
+            Arrays.asList(CircuitIdFieldName.AcessNodeIdentifier,
+                    CircuitIdFieldName.NetworkTechnology,
+                    CircuitIdFieldName.Slot,
+                    CircuitIdFieldName.Port,
+                    CircuitIdFieldName.OnuSerialNumber,
+                    CircuitIdFieldName.UniPortNumber,
+                    CircuitIdFieldName.SvID,
+                    CircuitIdFieldName.CvID)
+    );
+
+    /**
+     * Gets the default separator.
+     *
+     * @return separator.
+     */
+    public String getSeparator() {
+        return separator;
+    }
+
+    /**
+     * Sets the default separator.
+     *
+     * @param  value separator.
+     * @return circuit-id configuration object.
+     */
+    public CircuitIdConfig setSeparator(String value) {
+        this.separator = value;
+        return this;
+    }
+
+    /**
+     * Gets the circuit-id field list.
+     *
+     * @return circuit-id field list.
+     */
+    public ArrayList<CircuitIdField> getFieldList() {
+        return fieldList;
+    }
+
+    private CircuitIdConfig setFieldList(ArrayList<CircuitIdField> value) {
+        this.fieldList = value;
+        return this;
+    }
+
+    /**
+     * Sets the field list considering a list of supported fields.
+     *
+     * @param fieldSelection   desired field list.
+     * @param availableFields  available field list.
+     * @return circuit-id config object.
+     */
+    public CircuitIdConfig setFieldList(ArrayList<CircuitIdFieldName> fieldSelection,
+                                        ArrayList<CircuitIdField> availableFields) {
+        // Find out the fields based on the field selection for this config.
+        ArrayList<CircuitIdField> value = fieldSelection
+                .stream()
+                .map(fieldName -> availableFields
+                        .stream()
+                        .filter(field -> field.getFieldName()
+                                .equals(fieldName))
+                        .findFirst()
+                        .orElse(null))
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        boolean foundAllFields = value.stream().noneMatch(Objects::isNull);
+
+        // Ignores if it found all fields.
+        if (!foundAllFields) {
+            // Otherwise, it log and remove null entries.
+            log.warn("Some desired fields are not available.");
+            value.removeAll(Collections.singleton(null));
+        }
+
+        setFieldList(value);
+        return this;
+    }
+
+    /**
+     * Sets the field list with the default values considering a list of available fields.
+     *
+     * @param availableFields available field list.
+     * @return circuit-id config object.
+     */
+    public CircuitIdConfig setFieldListWithDefault(ArrayList<CircuitIdField> availableFields) {
+        return setFieldList(this.getDefaultFieldSelection(), availableFields);
+    }
+
+    private ArrayList<CircuitIdFieldName> getDefaultFieldSelection() {
+        return defaultFieldSelection;
+    }
+}
\ No newline at end of file
diff --git a/api/src/main/java/org/opencord/pppoeagent/util/CircuitIdField.java b/api/src/main/java/org/opencord/pppoeagent/util/CircuitIdField.java
new file mode 100644
index 0000000..f44aaf3
--- /dev/null
+++ b/api/src/main/java/org/opencord/pppoeagent/util/CircuitIdField.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.util;
+
+import org.apache.commons.lang3.Range;
+
+import java.util.Arrays;
+
+/**
+ *  Circuit-id field abstract model.
+ *  This is a model that shouldn't be instantiated, for new fields please create a class that extends this one.
+ */
+public abstract class CircuitIdField {
+    // Definitions to perform general validation.
+    private Integer maxLength;
+    private boolean isRangeLike;
+    private Range<Long> range;
+    private CircuitIdFieldName fieldName;
+    private boolean isNumber;
+
+    // This is used to avoid conflicts in string values,
+    // Example: when a field value contains the same char as the separator.
+    private String[] exclusiveExpressions;
+
+    CircuitIdFieldName getFieldName() {
+        return this.fieldName;
+    }
+
+    CircuitIdField setFieldName(CircuitIdFieldName value) {
+        this.fieldName = value;
+        return this;
+    }
+
+    Integer getMaxLength() {
+        return this.maxLength;
+    }
+
+    CircuitIdField setMaxLength(int value) {
+        this.maxLength = value;
+        return this;
+    }
+
+    Range<Long> getRange() {
+        return isRangeLike ? range : null;
+    }
+
+    CircuitIdField setRange(Range<Long> value) {
+        this.isRangeLike = value != null;
+        this.range = value;
+        return this;
+    }
+
+    CircuitIdField setIsNumber(boolean value) {
+        this.isNumber = value;
+        return this;
+    }
+
+    CircuitIdField setExclusiveExpressions(String[] value) {
+        this.exclusiveExpressions = value;
+        return this;
+    }
+
+    // It's recommended to call this method after building the field value, this will
+    // perform general validations based on the definitions of the field (range, maxLength, isNumber, (...))
+    boolean isValid(String value) {
+        // This is the flag is the output of the assertions.
+        boolean challenge = true;
+
+        if (value == null) {
+            return false;
+        }
+
+        // This code block is only in the case we're validating a number:
+        long longValue = 0L;
+        if (isNumber) {
+            try {
+                // We try to parse the value.
+                longValue = Long.parseLong(value);
+                // If there's a range defined.
+                if (isRangeLike) {
+                    // We verify if the value is at the defined range.
+                    challenge &= range.contains(longValue);
+                }
+            } catch (NumberFormatException e) {
+                // If something goes wrong in the conversion, it means the number is invalid.
+                return false;
+            }
+        } else if (exclusiveExpressions != null) {
+            // When exclusive expressions are defined, it verifies if the value contains one of the expressions.
+            // If this is the case, it's an invalid value.
+            challenge = Arrays.stream(exclusiveExpressions)
+                    .noneMatch(exp -> value.contains(exp));
+        }
+
+        // At the end, it verifies if the value respects the max length. But only if the max length is defined.
+        if (maxLength != null) {
+            challenge &= value.length() <= maxLength;
+        }
+        return challenge;
+    }
+
+    // Each field should implement its own build method.
+    String build() throws MissingParameterException, FieldValidationException {
+        return "";
+    }
+}
\ No newline at end of file
diff --git a/api/src/main/java/org/opencord/pppoeagent/util/CircuitIdFieldName.java b/api/src/main/java/org/opencord/pppoeagent/util/CircuitIdFieldName.java
new file mode 100644
index 0000000..14cbab9
--- /dev/null
+++ b/api/src/main/java/org/opencord/pppoeagent/util/CircuitIdFieldName.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.util;
+
+/**
+ * Represents the field names for circuit-id.
+ */
+public enum CircuitIdFieldName {
+    AcessNodeIdentifier(1),
+    Slot(2),
+    Port(3),
+    OnuSerialNumber(4),
+    UniPortNumber(5),
+    SvID(6),
+    CvID(7),
+    NetworkTechnology(8);
+    private final int value;
+    CircuitIdFieldName(int value) {
+        this.value = value;
+    }
+}
\ No newline at end of file
diff --git a/api/src/main/java/org/opencord/pppoeagent/util/FieldValidationException.java b/api/src/main/java/org/opencord/pppoeagent/util/FieldValidationException.java
new file mode 100644
index 0000000..8754e74
--- /dev/null
+++ b/api/src/main/java/org/opencord/pppoeagent/util/FieldValidationException.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.util;
+
+/**
+ * Exception meant to be thrown when circuit-id builder builds a field which the result
+ * is not valid according to the validation criteria.
+ */
+public class FieldValidationException extends Exception {
+    public FieldValidationException(CircuitIdField field, String value) {
+        super(String.format("Failed to build the circuit ID. %s field is not valid: %s",
+                field.getFieldName().name(), value));
+    }
+
+
+}
\ No newline at end of file
diff --git a/api/src/main/java/org/opencord/pppoeagent/util/MissingParameterException.java b/api/src/main/java/org/opencord/pppoeagent/util/MissingParameterException.java
new file mode 100644
index 0000000..b6f8ec5
--- /dev/null
+++ b/api/src/main/java/org/opencord/pppoeagent/util/MissingParameterException.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.util;
+
+/**
+ * Exception meant to be thrown when circuit-id builder tries to build a
+ * field but there are missing parameterization.
+ */
+public class MissingParameterException extends Exception {
+    public MissingParameterException(String message) {
+        super(message);
+    }
+}
\ No newline at end of file
diff --git a/api/src/main/java/org/opencord/pppoeagent/util/package-info.java b/api/src/main/java/org/opencord/pppoeagent/util/package-info.java
new file mode 100644
index 0000000..6e02984
--- /dev/null
+++ b/api/src/main/java/org/opencord/pppoeagent/util/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * PPPoE agent util api.
+ */
+package org.opencord.pppoeagent.util;
diff --git a/app/app.xml b/app/app.xml
new file mode 100644
index 0000000..d441b88
--- /dev/null
+++ b/app/app.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2021-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<app name="org.opencord.pppoeagent" origin="Open Networking Foundation" version="${project.version}"
+     category="vOlt" url="http://onosproject.org" title="PPPoE Agent"
+     featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
+     features="${project.artifactId}" apps="org.opencord.sadis">
+    <description>${project.description}</description>
+    <artifact>mvn:${project.groupId}/${project.artifactId}/${project.version}</artifact>
+    <artifact>mvn:${project.groupId}/pppoeagent-api/${project.version}</artifact>
+</app>
diff --git a/app/features.xml b/app/features.xml
new file mode 100644
index 0000000..7b8b9a3
--- /dev/null
+++ b/app/features.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ Copyright 2021-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<features xmlns="http://karaf.apache.org/xmlns/features/v1.2.0" name="${project.artifactId}-${project.version}">
+    <feature name="${project.artifactId}" version="${project.version}"
+             description="${project.description}">
+        <feature>onos-api</feature>
+        <bundle>mvn:${project.groupId}/pppoeagent-api/${project.version}</bundle>
+        <bundle>mvn:${project.groupId}/${project.artifactId}/${project.version}</bundle>
+    </feature>
+</features>
diff --git a/app/pom.xml b/app/pom.xml
new file mode 100644
index 0000000..187a9b7
--- /dev/null
+++ b/app/pom.xml
@@ -0,0 +1,190 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2021-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <parent>
+        <groupId>org.opencord</groupId>
+        <artifactId>pppoeagent</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>pppoeagent-app</artifactId>
+
+    <packaging>bundle</packaging>
+    <description>PPPoE Intermediate Agent application</description>
+
+    <properties>
+        <web.context>/onos/pppoeagent</web.context>
+        <api.version>1.0.0-SNAPSHOT</api.version>
+        <api.title>PPPoE Agent REST API</api.title>
+        <api.description>REST API to query PPPoE agent sessions information.</api.description>
+        <api.package>org.opencord.pppoeagent.rest</api.package>
+        <onos.app.requires>
+            org.opencord.sadis
+        </onos.app.requires>
+        <onos.app.name>org.opencord.pppoeagent</onos.app.name>
+        <onos.app.title>PPPoE Intermediate Agent App</onos.app.title>
+        <onos.app.category>vOLT</onos.app.category>
+        <onos.app.url>http://opencord.org</onos.app.url>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <onos.app.readme>PPPoE Intermediate Agent</onos.app.readme>
+        <onos.app.origin>Furukawa Electric LatAm</onos.app.origin>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-rest</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.opencord</groupId>
+            <artifactId>pppoeagent-api</artifactId>
+            <version>1.0.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-junit</artifactId>
+            <version>${onos.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-cli</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+            <version>${onos.version}</version>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+        </dependency>
+
+        <dependency>
+            <groupId>org.opencord</groupId>
+            <artifactId>sadis-api</artifactId>
+            <version>${sadis.api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-common</artifactId>
+            <version>${onos.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-misc</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+            <version>${onos.version}</version>
+            <scope>test</scope>
+            <classifier>tests</classifier>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-osgi</artifactId>
+            <version>${onos.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.karaf.shell</groupId>
+            <artifactId>org.apache.karaf.shell.core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.karaf.shell</groupId>
+            <artifactId>org.apache.karaf.shell.console</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>javax.ws.rs</groupId>
+            <artifactId>javax.ws.rs-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.onosproject</groupId>
+                <artifactId>onos-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <configuration>
+                    <instructions>
+                        <_wab>src/main/webapp/</_wab>
+                        <Include-Resource>
+                            WEB-INF/classes/apidoc/swagger.json=target/swagger.json,
+                            {maven-resources}
+                        </Include-Resource>
+                        <Import-Package>
+                            *,org.glassfish.jersey.servlet
+                        </Import-Package>
+                        <Web-ContextPath>${web.context}</Web-ContextPath>
+                        <Karaf-Commands>org.opencord.pppoeagent.cli</Karaf-Commands>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentShowUsersCommand.java b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentShowUsersCommand.java
new file mode 100644
index 0000000..a67ffca
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentShowUsersCommand.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.cli;
+
+import org.opencord.pppoeagent.PppoeAgentService;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Port;
+import org.onosproject.net.device.DeviceService;
+
+/**
+ *  Shows the PPPoE sessions/users learned by the agent.
+ */
+@Service
+@Command(scope = "pppoe", name = "pppoe-users",
+        description = "Shows the PPPoE users")
+public class PppoeAgentShowUsersCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "mac", description = "MAC address related to PPPoE session.",
+            required = false, multiValued = false)
+    private String macStr = null;
+
+    @Override
+    protected void doExecute() {
+        MacAddress macAddress = null;
+        if (macStr != null && !macStr.isEmpty()) {
+            try {
+                macAddress = MacAddress.valueOf(macStr);
+            } catch (IllegalArgumentException e) {
+                log.error(e.getMessage());
+                return;
+            }
+        }
+
+        DeviceService deviceService = AbstractShellCommand.get(DeviceService.class);
+        PppoeAgentService pppoeAgentService = AbstractShellCommand.get(PppoeAgentService.class);
+
+        if (macAddress != null) {
+            PppoeSessionInfo singleInfo = pppoeAgentService.getSessionsMap().get(macAddress);
+            if (singleInfo != null) {
+                Port devicePort = deviceService.getPort(singleInfo.getClientCp());
+                printPppoeInfo(macAddress, singleInfo, devicePort);
+            } else {
+                print("No session information found for provided MAC address %s", macAddress.toString());
+            }
+        } else {
+            pppoeAgentService.getSessionsMap().forEach((mac, sessionInfo) -> {
+                final Port devicePortFinal = deviceService.getPort(sessionInfo.getClientCp());
+                printPppoeInfo(mac, sessionInfo, devicePortFinal);
+            });
+        }
+    }
+
+    private void printPppoeInfo(MacAddress macAddr, PppoeSessionInfo sessionInfo, Port devicePort) {
+        PPPoED.Type lastReceivedPkt = PPPoED.Type.getTypeByValue(sessionInfo.getPacketCode());
+        ConnectPoint cp = sessionInfo.getClientCp();
+        String subscriberId = devicePort != null ? devicePort.annotations().value(AnnotationKeys.PORT_NAME) :
+                "UNKNOWN";
+
+        print("MacAddress=%s,SessionId=%s,CurrentState=%s,LastReceivedPacket=%s,DeviceId=%s,PortNumber=%s," +
+                        "SubscriberId=%s",
+                macAddr.toString(), String.valueOf(sessionInfo.getSessionId()),
+                sessionInfo.getCurrentState(), lastReceivedPkt.name(),
+                cp.deviceId().toString(), cp.port().toString(), subscriberId);
+    }
+}
diff --git a/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentStatsCommand.java b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentStatsCommand.java
new file mode 100644
index 0000000..885bf36
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentStatsCommand.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.cli;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import java.util.Collections;
+import java.util.Map;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersIdentifier;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersStore;
+import org.opencord.pppoeagent.impl.PppoeAgentCounterNames;
+
+/**
+ * Display/Reset the PPPoE Agent application statistics.
+ */
+@Service
+@Command(scope = "pppoe", name = "pppoeagent-stats",
+        description = "Display or Reset the PPPoE Agent application statistics.")
+public class PppoeAgentStatsCommand extends AbstractShellCommand {
+    private static final String CONFIRM_PHRASE = "please";
+
+    @Option(name = "-r", aliases = "--reset", description = "Resets a specific counter.\n",
+            required = false, multiValued = false)
+    PppoeAgentCounterNames reset = null;
+
+    @Option(name = "-R", aliases = "--reset-all", description = "Resets all counters.\n",
+            required = false, multiValued = false)
+    boolean resetAll = false;
+
+    @Option(name = "-s", aliases = "--subscriberId", description = "Subscriber Id.\n",
+            required = false, multiValued = false)
+    String subscriberId = null;
+
+    @Option(name = "-p", aliases = "--please", description = "Confirmation phrase.",
+            required = false, multiValued = false)
+    String please = null;
+
+    @Argument(index = 0, name = "counter",
+            description = "The counter to display. In case not specified, all counters will be displayed.",
+            required = false, multiValued = false)
+    PppoeAgentCounterNames counter = null;
+
+    @Override
+    protected void doExecute() {
+        PppoeAgentCountersStore pppoeCounters = AbstractShellCommand.get(PppoeAgentCountersStore.class);
+
+        if ((subscriberId == null) || (subscriberId.equals("global"))) {
+            // All subscriber Ids
+            subscriberId = PppoeAgentEvent.GLOBAL_COUNTER;
+        }
+
+        if (resetAll || reset != null) {
+            if (please == null || !please.equals(CONFIRM_PHRASE)) {
+                print("WARNING: Be aware that you are going to reset the counters. " +
+                        "Enter confirmation phrase to continue.");
+                return;
+            }
+            if (resetAll) {
+                // Reset all counters.
+                pppoeCounters.resetCounters(subscriberId);
+            } else {
+                // Reset the specified counter.
+                pppoeCounters.setCounter(subscriberId, reset, (long) 0);
+            }
+        } else {
+            Map<PppoeAgentCountersIdentifier, Long> countersMap = pppoeCounters.getCounters().counters();
+            if (countersMap.size() > 0) {
+                if (counter == null) {
+                    String jsonString = "";
+                    if (outputJson()) {
+                        jsonString = String.format("{\"%s\":{", pppoeCounters.NAME);
+                    } else {
+                        print("%s [%s] :", pppoeCounters.NAME, subscriberId);
+                    }
+                    PppoeAgentCounterNames[] counters = PppoeAgentCounterNames.values();
+                    for (int i = 0; i < counters.length; i++) {
+                        PppoeAgentCounterNames counterType = counters[i];
+                        Long value = countersMap.get(new PppoeAgentCountersIdentifier(subscriberId, counterType));
+                        if (value == null) {
+                            value = 0L;
+                        }
+                        if (outputJson()) {
+                            jsonString += String.format("\"%s\":%d", counterType, value);
+                            if (i < counters.length - 1) {
+                                jsonString += ",";
+                            }
+                        } else {
+                            printCounter(counterType, value);
+                        }
+                    }
+                    if (outputJson()) {
+                        jsonString += "}}";
+                        print("%s", jsonString);
+                    }
+                } else {
+                    // Show only the specified counter
+                    Long value = countersMap.get(new PppoeAgentCountersIdentifier(subscriberId, counter));
+                    if (value == null) {
+                        value = 0L;
+                    }
+                    if (outputJson()) {
+                        print("{\"%s\":%d}", counter, value);
+                    } else {
+                        printCounter(counter, value);
+                    }
+                }
+            } else {
+                print("No PPPoE Agent Counters were created yet for counter class [%s]",
+                        PppoeAgentEvent.GLOBAL_COUNTER);
+            }
+        }
+    }
+
+    void printCounter(PppoeAgentCounterNames c, long value) {
+        // print in non-JSON format
+        print("  %s %s %-4d", c,
+                String.join("", Collections.nCopies(50 - c.toString().length(), ".")), value);
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/cli/package-info.java b/app/src/main/java/org/opencord/pppoeagent/cli/package-info.java
new file mode 100644
index 0000000..515908f
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/cli/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * CLI commands for the PPPoE agent.
+ */
+package org.opencord.pppoeagent.cli;
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/pppoeagent/impl/OsgiPropertyConstants.java
new file mode 100644
index 0000000..65c8310
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/OsgiPropertyConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+/**
+ * Constants for default values of configurable properties.
+ */
+public final class OsgiPropertyConstants {
+
+    private OsgiPropertyConstants() {
+    }
+
+    public static final String ENABLE_CIRCUIT_ID_VALIDATION = "enableCircuitIdValidation";
+    public static final boolean ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT = true;
+
+    public static final String PPPOE_COUNTERS_TOPIC = "pppoeCountersTopic";
+    public static final String PPPOE_COUNTERS_TOPIC_DEFAULT = "onos.pppoe.stats.kpis";
+
+    public static final String PUBLISH_COUNTERS_RATE = "publishCountersRate";
+    public static final int PUBLISH_COUNTERS_RATE_DEFAULT = 10;
+
+    public static final String PPPOE_MAX_MTU = "pppoeMaxMtu";
+    public static final int PPPOE_MAX_MTU_DEFAULT = 1500;
+
+    public static final String PACKET_PROCESSOR_THREADS = "packetProcessorThreads";
+    public static final int PACKET_PROCESSOR_THREADS_DEFAULT = 10;
+
+    public static final String SYNC_COUNTERS_RATE = "syncCountersRate";
+    public static final int SYNC_COUNTERS_RATE_DEFAULT = 5;
+}
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgent.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgent.java
new file mode 100644
index 0000000..67c71da
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgent.java
@@ -0,0 +1,929 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+
+import com.google.common.collect.Sets;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentListener;
+import org.opencord.pppoeagent.PppoeAgentService;
+import org.opencord.pppoeagent.PppoeAgentStoreDelegate;
+import org.opencord.pppoeagent.PPPoEDVendorSpecificTag;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Serializer;
+import com.google.common.collect.ImmutableMap;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+
+import org.onlab.util.KryoNamespace;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.packet.PPPoEDTag;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipListener;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
+import org.opencord.pppoeagent.util.CircuitIdBuilder;
+import org.opencord.pppoeagent.util.CircuitIdFieldName;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import org.onosproject.store.service.Versioned;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADI;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADO;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADR;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADS;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADT;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_AC_SYSTEM_ERROR;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_GENERIC_ERROR;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_SERVICE_NAME_ERROR;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_VENDOR_SPECIFIC;
+
+import static org.onlab.util.Tools.getIntegerProperty;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PPPOE_MAX_MTU;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PPPOE_MAX_MTU_DEFAULT;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.ENABLE_CIRCUIT_ID_VALIDATION;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PACKET_PROCESSOR_THREADS;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PACKET_PROCESSOR_THREADS_DEFAULT;
+
+/**
+ * PPPoE Intermediate Agent application.
+ */
+@Component(immediate = true,
+property = {
+        PPPOE_MAX_MTU + ":Integer=" + PPPOE_MAX_MTU_DEFAULT,
+        ENABLE_CIRCUIT_ID_VALIDATION + ":Boolean=" + ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT,
+        PACKET_PROCESSOR_THREADS + ":Integer=" + PACKET_PROCESSOR_THREADS_DEFAULT,
+})
+public class PppoeAgent
+        extends AbstractListenerManager<PppoeAgentEvent, PppoeAgentListener>
+        implements PppoeAgentService {
+    private static final String APP_NAME = "org.opencord.pppoeagent";
+    private static final short QINQ_VID_NONE = (short) -1;
+
+    private final InternalConfigListener cfgListener = new InternalConfigListener();
+    private final Set<ConfigFactory> factories = ImmutableSet.of(
+            new ConfigFactory<>(APP_SUBJECT_FACTORY, PppoeAgentConfig.class, "pppoeagent") {
+                @Override
+                public PppoeAgentConfig createConfig() {
+                    return new PppoeAgentConfig();
+                }
+            }
+    );
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected NetworkConfigRegistry cfgService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected PacketService packetService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService componentConfigService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected SadisService sadisService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected PppoeAgentCountersStore pppoeAgentCounters;
+
+    // OSGi Properties
+    protected int pppoeMaxMtu = PPPOE_MAX_MTU_DEFAULT;
+    protected boolean enableCircuitIdValidation = ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT;
+    protected int packetProcessorThreads = PACKET_PROCESSOR_THREADS_DEFAULT;
+
+    private ApplicationId appId;
+    private InnerDeviceListener deviceListener = new InnerDeviceListener();
+    private InnerMastershipListener changeListener = new InnerMastershipListener();
+    private PppoeAgentPacketProcessor pppoeAgentPacketProcessor = new PppoeAgentPacketProcessor();
+    protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+    private PppoeAgentStoreDelegate delegate = new InnerPppoeAgentStoreDelegate();
+
+    Set<ConnectPoint> pppoeConnectPoints;
+    protected AtomicReference<ConnectPoint> pppoeServerConnectPoint = new AtomicReference<>();
+    protected boolean useOltUplink = false;
+
+    static ConsistentMap<MacAddress, PppoeSessionInfo> sessionsMap;
+
+    @Override
+    public Map<MacAddress, PppoeSessionInfo> getSessionsMap() {
+        return ImmutableMap.copyOf(sessionsMap.asJavaMap());
+    }
+
+    @Override
+    public void clearSessionsMap() {
+        sessionsMap.clear();
+    }
+
+    private final ArrayList<CircuitIdFieldName> circuitIdfields = new ArrayList<>(Arrays.asList(
+            CircuitIdFieldName.AcessNodeIdentifier,
+            CircuitIdFieldName.Slot,
+            CircuitIdFieldName.Port,
+            CircuitIdFieldName.OnuSerialNumber));
+
+    protected ExecutorService packetProcessorExecutor;
+
+    @Activate
+    protected void activate(ComponentContext context) {
+        eventDispatcher.addSink(PppoeAgentEvent.class, listenerRegistry);
+
+        appId = coreService.registerApplication(APP_NAME);
+        cfgService.addListener(cfgListener);
+        componentConfigService.registerProperties(getClass());
+
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(PppoeSessionInfo.class)
+                .register(MacAddress.class)
+                .register(SubscriberAndDeviceInformation.class)
+                .register(UniTagInformation.class)
+                .register(ConnectPoint.class)
+                .build();
+
+        sessionsMap = storageService.<MacAddress, PppoeSessionInfo>consistentMapBuilder()
+                .withName("pppoeagent-sessions")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build();
+
+        factories.forEach(cfgService::registerConfigFactory);
+        deviceService.addListener(deviceListener);
+        subsService = sadisService.getSubscriberInfoService();
+        mastershipService.addListener(changeListener);
+        pppoeAgentCounters.setDelegate(delegate);
+        updateConfig();
+        packetService.addProcessor(pppoeAgentPacketProcessor, PacketProcessor.director(0));
+        if (context != null) {
+            modified(context);
+        }
+        log.info("PPPoE Intermediate Agent Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        cfgService.removeListener(cfgListener);
+        factories.forEach(cfgService::unregisterConfigFactory);
+        packetService.removeProcessor(pppoeAgentPacketProcessor);
+        packetProcessorExecutor.shutdown();
+        componentConfigService.unregisterProperties(getClass(), false);
+        deviceService.removeListener(deviceListener);
+        eventDispatcher.removeSink(PppoeAgentEvent.class);
+        pppoeAgentCounters.unsetDelegate(delegate);
+        log.info("PPPoE Intermediate Agent Stopped");
+    }
+
+    private void updateConfig() {
+        PppoeAgentConfig cfg = cfgService.getConfig(appId, PppoeAgentConfig.class);
+        if (cfg == null) {
+            log.warn("PPPoE server info not available");
+            return;
+        }
+
+        synchronized (this) {
+            pppoeConnectPoints = Sets.newConcurrentHashSet(cfg.getPppoeServerConnectPoint());
+        }
+        useOltUplink = cfg.getUseOltUplinkForServerPktInOut();
+    }
+
+    /**
+     * Returns whether the passed port is the uplink port of the olt device.
+     */
+    private boolean isUplinkPortOfOlt(DeviceId dId, Port p) {
+        log.debug("isUplinkPortOfOlt: DeviceId: {} Port: {}", dId, p);
+        if (!mastershipService.isLocalMaster(dId)) {
+            return false;
+        }
+
+        Device d = deviceService.getDevice(dId);
+        SubscriberAndDeviceInformation deviceInfo = subsService.get(d.serialNumber());
+        if (deviceInfo != null) {
+            return (deviceInfo.uplinkPort() == p.number().toLong());
+        }
+
+        return false;
+    }
+
+    /**
+     * Returns the connectPoint which is the uplink port of the OLT.
+     */
+    private ConnectPoint getUplinkConnectPointOfOlt(DeviceId dId) {
+        Device d = deviceService.getDevice(dId);
+        SubscriberAndDeviceInformation deviceInfo = subsService.get(d.serialNumber());
+        log.debug("getUplinkConnectPointOfOlt DeviceId: {} devInfo: {}", dId, deviceInfo);
+        if (deviceInfo != null) {
+            PortNumber pNum = PortNumber.portNumber(deviceInfo.uplinkPort());
+            Port port = deviceService.getPort(d.id(), pNum);
+            if (port != null) {
+                return new ConnectPoint(d.id(), pNum);
+            }
+        }
+        return null;
+    }
+
+    @Modified
+    protected void modified(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+
+        Integer newPppoeMaxMtu = getIntegerProperty(properties, PPPOE_MAX_MTU);
+        if (newPppoeMaxMtu != null) {
+            if (newPppoeMaxMtu != pppoeMaxMtu && newPppoeMaxMtu >= 0) {
+                log.info("PPPPOE MTU modified from {} to {}", pppoeMaxMtu, newPppoeMaxMtu);
+                pppoeMaxMtu = newPppoeMaxMtu;
+            } else if (newPppoeMaxMtu < 0) {
+                log.error("Invalid newPppoeMaxMtu : {}, defaulting to 1492", newPppoeMaxMtu);
+                pppoeMaxMtu = PPPOE_MAX_MTU_DEFAULT;
+            }
+        }
+
+        Boolean newEnableCircuitIdValidation = Tools.isPropertyEnabled(properties, ENABLE_CIRCUIT_ID_VALIDATION);
+        if (newEnableCircuitIdValidation != null) {
+            if (enableCircuitIdValidation != newEnableCircuitIdValidation) {
+                log.info("Property enableCircuitIdValidation modified from {} to {}",
+                        enableCircuitIdValidation, newEnableCircuitIdValidation);
+                enableCircuitIdValidation = newEnableCircuitIdValidation;
+            }
+        }
+
+        String s = Tools.get(properties, PACKET_PROCESSOR_THREADS);
+
+        int oldpacketProcessorThreads = packetProcessorThreads;
+        packetProcessorThreads = Strings.isNullOrEmpty(s) ? oldpacketProcessorThreads
+                : Integer.parseInt(s.trim());
+        if (packetProcessorExecutor == null || oldpacketProcessorThreads != packetProcessorThreads) {
+            if (packetProcessorExecutor != null) {
+                packetProcessorExecutor.shutdown();
+            }
+            packetProcessorExecutor = newFixedThreadPool(packetProcessorThreads,
+                    groupedThreads("onos/pppoe",
+                            "pppoe-packet-%d", log));
+        }
+    }
+
+    /**
+     * Selects a connect point through an available device for which it is the master.
+     */
+    private void selectServerConnectPoint() {
+        synchronized (this) {
+            pppoeServerConnectPoint.set(null);
+            if (pppoeConnectPoints != null) {
+                // find a connect point through a device for which we are master
+                for (ConnectPoint cp: pppoeConnectPoints) {
+                    if (mastershipService.isLocalMaster(cp.deviceId())) {
+                        if (deviceService.isAvailable(cp.deviceId())) {
+                            pppoeServerConnectPoint.set(cp);
+                        }
+                        log.info("PPPOE connectPoint selected is {}", cp);
+                        break;
+                    }
+                }
+            }
+            log.info("PPPOE Server connectPoint is {}", pppoeServerConnectPoint.get());
+            if (pppoeServerConnectPoint.get() == null) {
+                log.error("Master of none, can't relay PPPOE messages to server");
+            }
+        }
+    }
+
+    private SubscriberAndDeviceInformation getSubscriber(ConnectPoint cp) {
+        Port p = deviceService.getPort(cp);
+        String subscriberId = p.annotations().value(AnnotationKeys.PORT_NAME);
+        return subsService.get(subscriberId);
+    }
+
+    private SubscriberAndDeviceInformation getDevice(PacketContext context) {
+        String serialNo = deviceService.getDevice(context.inPacket().
+                receivedFrom().deviceId()).serialNumber();
+
+        return subsService.get(serialNo);
+    }
+
+    private UniTagInformation getUnitagInformationFromPacketContext(PacketContext context,
+                                                                    SubscriberAndDeviceInformation sub) {
+        return sub.uniTagList()
+                .stream()
+                .filter(u -> u.getPonCTag().toShort() == context.inPacket().parsed().getVlanID())
+                .findFirst()
+                .orElse(null);
+    }
+
+    private boolean removeSessionsByConnectPoint(ConnectPoint cp) {
+        boolean removed = false;
+        for (MacAddress key : sessionsMap.keySet()) {
+            PppoeSessionInfo entry = sessionsMap.asJavaMap().get(key);
+            if (entry.getClientCp().equals(cp)) {
+                sessionsMap.remove(key);
+                removed = true;
+            }
+        }
+        return removed;
+    }
+
+    private boolean removeSessionsByDevice(DeviceId devid) {
+        boolean removed = false;
+        for (MacAddress key : sessionsMap.keySet()) {
+            PppoeSessionInfo entry = sessionsMap.asJavaMap().get(key);
+            if (entry.getClientCp().deviceId().equals(devid)) {
+                sessionsMap.remove(key);
+                removed = true;
+            }
+        }
+        return removed;
+    }
+
+    private class PppoeAgentPacketProcessor implements PacketProcessor {
+        @Override
+        public void process(PacketContext context) {
+            packetProcessorExecutor.execute(() -> {
+                processInternal(context);
+            });
+        }
+
+        private void processInternal(PacketContext context) {
+            Ethernet packet = (Ethernet) context.inPacket().parsed().clone();
+            if (packet.getEtherType() == Ethernet.TYPE_PPPOED) {
+                processPppoedPacket(context, packet);
+            }
+        }
+
+        private void processPppoedPacket(PacketContext context, Ethernet packet) {
+            PPPoED pppoed = (PPPoED) packet.getPayload();
+            if (pppoed == null) {
+                log.warn("PPPoED payload is null");
+                return;
+            }
+
+            final byte pppoedCode = pppoed.getCode();
+            final short sessionId = pppoed.getSessionId();
+            final MacAddress clientMacAddress;
+            final ConnectPoint packetCp = context.inPacket().receivedFrom();
+            boolean serverMessage = false;
+
+            // Get the client MAC address
+            switch (pppoedCode) {
+                case PPPOED_CODE_PADT: {
+                    if (sessionsMap.containsKey(packet.getDestinationMAC())) {
+                        clientMacAddress = packet.getDestinationMAC();
+                        serverMessage = true;
+                    } else if (sessionsMap.containsKey(packet.getSourceMAC())) {
+                        clientMacAddress = packet.getSourceMAC();
+                    } else {
+                        // In the unlikely case of receiving a PADT without an existing session
+                        log.warn("PADT received for unknown session. Dropping packet.");
+                        return;
+                    }
+                    break;
+                }
+                case PPPOED_CODE_PADI:
+                case PPPOED_CODE_PADR: {
+                    clientMacAddress = packet.getSourceMAC();
+                    break;
+                }
+                default: {
+                    clientMacAddress = packet.getDestinationMAC();
+                    serverMessage = true;
+                    break;
+                }
+            }
+
+            SubscriberAndDeviceInformation subsInfo;
+            if (serverMessage) {
+                if (!sessionsMap.containsKey(clientMacAddress)) {
+                    log.error("PPPoED message received from server without an existing session. Message not relayed.");
+                    return;
+                } else {
+                    PppoeSessionInfo sessInfo = sessionsMap.get(clientMacAddress).value();
+                    subsInfo = getSubscriber(sessInfo.getClientCp());
+                }
+            } else {
+                subsInfo = getSubscriber(packetCp);
+            }
+
+            if (subsInfo == null) {
+                log.error("No Sadis info for subscriber on connect point {}. Message not relayed.", packetCp);
+                return;
+            }
+
+            log.trace("{} received from {} at {} with client mac: {}",
+                    PPPoED.Type.getTypeByValue(pppoedCode).toString(),
+                    serverMessage ? "server" : "client", packetCp, clientMacAddress);
+
+            if (log.isTraceEnabled()) {
+                log.trace("PPPoED message received from {} at {} {}",
+                        serverMessage ? "server" : "client", packetCp, packet);
+            }
+
+            // In case of PADI, force the removal of the previous session entry
+            if ((pppoedCode == PPPOED_CODE_PADI) && (sessionsMap.containsKey(clientMacAddress))) {
+                log.trace("PADI received from MAC: {} with existing session data. Removing the existing data.",
+                        clientMacAddress.toString());
+                sessionsMap.remove(clientMacAddress);
+            }
+
+            // Fill the session map entry
+            PppoeSessionInfo sessionInfo;
+            if (!sessionsMap.containsKey(clientMacAddress)) {
+                if (!serverMessage)  {
+                    ConnectPoint serverCp = getServerConnectPoint(packetCp.deviceId());
+                    SubscriberAndDeviceInformation subscriber = getSubscriber(packetCp);
+                    sessionInfo = new PppoeSessionInfo(packetCp, serverCp, pppoedCode,
+                            sessionId, subscriber, clientMacAddress);
+                    sessionsMap.put(clientMacAddress, sessionInfo);
+                } else {
+                    // This case was already covered.
+                    return;
+                }
+            } else {
+                sessionInfo = sessionsMap.get(clientMacAddress).value();
+            }
+
+            switch (pppoedCode) {
+                case PPPOED_CODE_PADI:
+                case PPPOED_CODE_PADR:
+                    updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                            pppoedCode == PPPOED_CODE_PADI ? PppoeAgentCounterNames.PADI : PppoeAgentCounterNames.PADR);
+
+                    Ethernet padir = processPacketFromClient(context, packet, pppoed, sessionInfo, clientMacAddress);
+                    if (padir != null) {
+                        if (padir.serialize().length <= pppoeMaxMtu) {
+                            forwardPacketToServer(padir, sessionInfo);
+                        } else {
+                            log.debug("MTU message size: {} exceeded configured pppoeMaxMtu: {}. Dropping Packet.",
+                                    padir.serialize().length, pppoeMaxMtu);
+                            forwardPacketToClient(errorToClient(packet, pppoed, "MTU message size exceeded"),
+                                                                sessionInfo, clientMacAddress);
+                            updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                                          PppoeAgentCounterNames.MTU_EXCEEDED);
+                        }
+                    }
+                    break;
+                case PPPOED_CODE_PADO:
+                case PPPOED_CODE_PADS:
+                    updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                            pppoedCode == PPPOED_CODE_PADO ? PppoeAgentCounterNames.PADO : PppoeAgentCounterNames.PADS);
+                    Ethernet pados = processPacketFromServer(packet, pppoed, sessionInfo, clientMacAddress);
+                    if (pados != null) {
+                        forwardPacketToClient(pados, sessionInfo, clientMacAddress);
+                    }
+                    break;
+                case PPPOED_CODE_PADT:
+                    if (serverMessage) {
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                                      PppoeAgentCounterNames.PADT_FROM_SERVER);
+                        forwardPacketToClient(packet, sessionInfo, clientMacAddress);
+                    } else {
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                                      PppoeAgentCounterNames.PADT_FROM_CLIENT);
+                        forwardPacketToServer(packet, sessionInfo);
+                    }
+
+                    String reason = "";
+                    PPPoEDTag genericErrorTag = pppoed.getTags()
+                            .stream()
+                            .filter(tag -> tag.getType() == PPPOED_TAG_GENERIC_ERROR)
+                            .findFirst()
+                            .orElse(null);
+
+                    if (genericErrorTag != null) {
+                        reason = new String(genericErrorTag.getValue(), StandardCharsets.UTF_8);
+                    }
+                    log.debug("PADT sessionId:{}  client MAC:{}  Terminate reason:{}.",
+                            Integer.toHexString(sessionId & 0xFFFF), clientMacAddress, reason);
+
+                    boolean knownSessionId = sessionInfo.getSessionId() == sessionId;
+                    if (knownSessionId) {
+                        PppoeSessionInfo removedSessionInfo = Versioned
+                                .valueOrNull(sessionsMap.remove(clientMacAddress));
+                        if (removedSessionInfo != null) {
+                            post(new PppoeAgentEvent(PppoeAgentEvent.Type.TERMINATE, removedSessionInfo,
+                                                     packetCp, clientMacAddress, reason));
+                        }
+                    } else {
+                        log.warn("PADT received for a known MAC address but for unknown session.");
+                    }
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        private Ethernet processPacketFromClient(PacketContext context, Ethernet ethernetPacket, PPPoED pppoed,
+                                                 PppoeSessionInfo sessionInfo, MacAddress clientMacAddress) {
+            byte pppoedCode = pppoed.getCode();
+
+            sessionInfo.setPacketCode(pppoedCode);
+            sessionsMap.put(clientMacAddress, sessionInfo);
+
+            // Update Counters
+            for (PPPoEDTag tag : pppoed.getTags()) {
+                if (tag.getType() == PPPOED_TAG_GENERIC_ERROR) {
+                    updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                            PppoeAgentCounterNames.GENERIC_ERROR_FROM_CLIENT);
+                    break;
+                }
+            }
+
+            Ethernet ethFwd = ethernetPacket;
+
+            // If this is a PADI packet, there'll be a START event.
+            if (pppoedCode == PPPOED_CODE_PADI) {
+                post(new PppoeAgentEvent(PppoeAgentEvent.Type.START, sessionInfo, sessionInfo.getClientCp(),
+                        clientMacAddress));
+            }
+
+            // Creates the vendor specific tag.
+            String circuitId = getCircuitId(sessionInfo.getClientCp());
+            if (circuitId == null) {
+                log.error("Failed to build circuid-id for client on connect point {}. Dropping packet.",
+                        sessionInfo.getClientCp());
+                return null;
+            }
+
+            // Checks whether the circuit-id is valid, if it's not it drops the packet.
+            if (!isCircuitIdValid(circuitId, sessionInfo.getSubscriber())) {
+                log.warn("Invalid circuit ID, dropping packet.");
+                PppoeAgentEvent invalidCidEvent = new PppoeAgentEvent(PppoeAgentEvent.Type.INVALID_CID, sessionInfo,
+                        context.inPacket().receivedFrom(), clientMacAddress);
+                post(invalidCidEvent);
+                return null;
+            }
+
+            String remoteId = sessionInfo.getSubscriber().remoteId();
+            byte[] vendorSpecificTag = new PPPoEDVendorSpecificTag(circuitId, remoteId).toByteArray();
+
+            // According to TR-101, R-149 (by Broadband Forum), agent must REPLACE vendor-specific tag that may come
+            // from client message with its own tag.
+            // The following block ensures that agent removes any previous vendor-specific tag.
+            List<PPPoEDTag> originalTags = pppoed.getTags();
+            if (originalTags != null) {
+                PPPoEDTag originalVendorSpecificTag = originalTags.stream()
+                        .filter(tag -> tag.getType() == PPPOED_TAG_VENDOR_SPECIFIC)
+                        .findFirst()
+                        .orElse(null);
+
+                if (originalVendorSpecificTag != null) {
+                    int tagToRemoveLength = originalVendorSpecificTag.getLength();
+                    originalTags.removeIf(tag -> tag.getType() == PPPOED_TAG_VENDOR_SPECIFIC);
+                    pppoed.setPayloadLength((short) (pppoed.getPayloadLength() - tagToRemoveLength));
+                }
+            }
+
+            pppoed.setTag(PPPOED_TAG_VENDOR_SPECIFIC, vendorSpecificTag);
+
+            ethFwd.setPayload(pppoed);
+            ethFwd.setQinQTPID(Ethernet.TYPE_VLAN);
+
+            UniTagInformation uniTagInformation = getUnitagInformationFromPacketContext(context,
+                    sessionInfo.getSubscriber());
+            if (uniTagInformation == null) {
+                log.warn("Missing service information for connectPoint {} / cTag {}",
+                        context.inPacket().receivedFrom(),  context.inPacket().parsed().getVlanID());
+                return null;
+            }
+            ethFwd.setQinQVID(uniTagInformation.getPonSTag().toShort());
+            ethFwd.setPad(true);
+            return ethFwd;
+        }
+
+        private Ethernet processPacketFromServer(Ethernet ethernetPacket, PPPoED pppoed,
+                                                 PppoeSessionInfo sessionInfo, MacAddress clientMacAddress) {
+            // Update counters
+            List<PPPoEDTag> tags = pppoed.getTags();
+            for (PPPoEDTag tag : tags) {
+                switch (tag.getType()) {
+                    case PPPOED_TAG_GENERIC_ERROR:
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                PppoeAgentCounterNames.GENERIC_ERROR_FROM_SERVER);
+                        break;
+                    case PPPOED_TAG_SERVICE_NAME_ERROR:
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                PppoeAgentCounterNames.SERVICE_NAME_ERROR);
+                        break;
+                    case PPPOED_TAG_AC_SYSTEM_ERROR:
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                PppoeAgentCounterNames.AC_SYSTEM_ERROR);
+                        break;
+                    default:
+                        break;
+                }
+            }
+
+            byte pppoedCode = pppoed.getCode();
+
+            if (pppoedCode == PPPOED_CODE_PADS) {
+                log.debug("PADS sessionId:{}  client MAC:{}", Integer.toHexString(pppoed.getSessionId() & 0xFFFF),
+                        clientMacAddress);
+                sessionInfo.setSessionId(pppoed.getSessionId());
+            }
+            sessionInfo.setPacketCode(pppoedCode);
+            sessionsMap.put(clientMacAddress, sessionInfo);
+
+            PppoeAgentEvent.Type eventType = pppoedCode == PPPOED_CODE_PADS ?
+                    PppoeAgentEvent.Type.SESSION_ESTABLISHED :
+                    PppoeAgentEvent.Type.NEGOTIATION;
+
+            post(new PppoeAgentEvent(eventType, sessionInfo, sessionInfo.getClientCp(), clientMacAddress));
+
+            ethernetPacket.setQinQVID(QINQ_VID_NONE);
+            ethernetPacket.setPad(true);
+            return ethernetPacket;
+        }
+
+        private void updatePppoeAgentCountersStore(SubscriberAndDeviceInformation sub,
+                                                   PppoeAgentCounterNames counterType) {
+            // Update global counter stats
+            pppoeAgentCounters.incrementCounter(PppoeAgentEvent.GLOBAL_COUNTER, counterType);
+            if (sub == null) {
+                log.warn("Counter not updated as subscriber info not found.");
+            } else {
+                // Update subscriber counter stats
+                pppoeAgentCounters.incrementCounter(sub.id(), counterType);
+            }
+        }
+
+        private String getCircuitId(ConnectPoint cp) {
+            return new CircuitIdBuilder()
+                    .setConnectPoint(cp)
+                    .setDeviceService(deviceService)
+                    .setSubsService(subsService)
+                    .setCircuitIdConfig(circuitIdfields)
+                    .addCustomSeparator(CircuitIdFieldName.AcessNodeIdentifier, " ")
+                    .addCustomSeparator(CircuitIdFieldName.Port, ":")
+                    .build();
+        }
+
+        protected ConnectPoint getServerConnectPoint(DeviceId deviceId) {
+            ConnectPoint serverCp;
+            if (!useOltUplink) {
+                serverCp = pppoeServerConnectPoint.get();
+            } else {
+                serverCp = getUplinkConnectPointOfOlt(deviceId);
+            }
+            return serverCp;
+        }
+
+        private boolean isCircuitIdValid(String cId, SubscriberAndDeviceInformation entry) {
+            if (!enableCircuitIdValidation) {
+                log.debug("Circuit ID validation is disabled.");
+                return true;
+            }
+
+            if (entry == null) {
+                log.error("SubscriberAndDeviceInformation cannot be null.");
+                return false;
+            }
+
+            if (entry.circuitId() == null || entry.circuitId().isEmpty()) {
+                log.debug("Circuit ID not configured in SADIS entry. No check is done.");
+                return true;
+            } else {
+                if (cId.equals(entry.circuitId())) {
+                    log.info("Circuit ID in packet: {} matched the configured entry on SADIS.", cId);
+                    return true;
+                } else {
+                    log.warn("Circuit ID in packet: {} did not match the configured entry on SADIS: {}.",
+                            cId, entry.circuitId());
+                    return false;
+                }
+            }
+        }
+
+        private void forwardPacketToServer(Ethernet packet, PppoeSessionInfo sessionInfo) {
+            ConnectPoint toSendTo = sessionInfo.getServerCp();
+            if (toSendTo != null) {
+                log.info("Sending PPPOE packet to server at {}", toSendTo);
+                TrafficTreatment t = DefaultTrafficTreatment.builder().setOutput(toSendTo.port()).build();
+                OutboundPacket o = new DefaultOutboundPacket(toSendTo.deviceId(), t,
+                        ByteBuffer.wrap(packet.serialize()));
+                if (log.isTraceEnabled()) {
+                    log.trace("Relaying packet to pppoe server at {} {}", toSendTo, packet);
+                }
+                packetService.emit(o);
+
+                updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                        PppoeAgentCounterNames.PPPOED_PACKETS_TO_SERVER);
+            } else {
+                log.error("No connect point to send msg to PPPOE Server");
+            }
+        }
+
+        private void forwardPacketToClient(Ethernet packet, PppoeSessionInfo sessionInfo, MacAddress clientMacAddress) {
+            ConnectPoint subCp = sessionInfo.getClientCp();
+            if (subCp == null) {
+                log.error("Dropping PPPOE packet, can't find a connectpoint for MAC {}", clientMacAddress);
+                return;
+            }
+
+            log.info("Sending PPPOE packet to client at {}", subCp);
+            TrafficTreatment t = DefaultTrafficTreatment.builder()
+                    .setOutput(subCp.port()).build();
+            OutboundPacket o = new DefaultOutboundPacket(
+                    subCp.deviceId(), t, ByteBuffer.wrap(packet.serialize()));
+            if (log.isTraceEnabled()) {
+                log.trace("Relaying packet to pppoe client at {} {}", subCp, packet);
+            }
+
+            packetService.emit(o);
+
+            updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                    PppoeAgentCounterNames.PPPOED_PACKETS_FROM_SERVER);
+        }
+
+        private Ethernet errorToClient(Ethernet packet, PPPoED p, String errString) {
+            PPPoED err = new PPPoED();
+            err.setVersion(p.getVersion());
+            err.setType(p.getType());
+            switch (p.getCode()) {
+                case PPPOED_CODE_PADI:
+                    err.setCode(PPPOED_CODE_PADO);
+                    break;
+                case PPPOED_CODE_PADR:
+                    err.setCode(PPPOED_CODE_PADS);
+                    break;
+                default:
+                    break;
+            }
+            err.setCode(p.getCode());
+            err.setSessionId(p.getSessionId());
+            err.setTag(PPPOED_TAG_GENERIC_ERROR, errString.getBytes(StandardCharsets.UTF_8));
+
+            Ethernet ethPacket = new Ethernet();
+            ethPacket.setPayload(err);
+            ethPacket.setSourceMACAddress(packet.getDestinationMACAddress());
+            ethPacket.setDestinationMACAddress(packet.getSourceMACAddress());
+            ethPacket.setQinQVID(QINQ_VID_NONE);
+            ethPacket.setPad(true);
+
+            return ethPacket;
+        }
+    }
+
+    private class InternalConfigListener implements NetworkConfigListener {
+        @Override
+        public void event(NetworkConfigEvent event) {
+
+            if ((event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
+                    event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED) &&
+                    event.configClass().equals(PppoeAgentConfig.class)) {
+                updateConfig();
+                log.info("Reconfigured");
+            }
+        }
+    }
+
+    /**
+     * Handles Mastership changes for the devices which connect to the PPPOE server.
+     */
+    private class InnerMastershipListener implements MastershipListener {
+        @Override
+        public void event(MastershipEvent event) {
+            if (!useOltUplink) {
+                if (pppoeServerConnectPoint.get() != null &&
+                        pppoeServerConnectPoint.get().deviceId().equals(event.subject())) {
+                    log.trace("Mastership Event recevived for {}", event.subject());
+                    // mastership of the device for our connect point has changed, reselect
+                    selectServerConnectPoint();
+                }
+            }
+        }
+    }
+
+    private class InnerDeviceListener implements DeviceListener {
+        @Override
+        public void event(DeviceEvent event) {
+            DeviceId devId = event.subject().id();
+
+            if (log.isTraceEnabled() && !event.type().equals(DeviceEvent.Type.PORT_STATS_UPDATED)) {
+                log.trace("Device Event received for {} event {}", event.subject(), event.type());
+            }
+
+            // Handle events from any other device
+            switch (event.type()) {
+                case PORT_UPDATED:
+                    if (!event.port().isEnabled()) {
+                        ConnectPoint cp = new ConnectPoint(devId, event.port().number());
+                        removeSessionsByConnectPoint(cp);
+                    }
+                    break;
+                case PORT_REMOVED:
+                    // Remove all entries related to this port from sessions map
+                    ConnectPoint cp = new ConnectPoint(devId, event.port().number());
+                    removeSessionsByConnectPoint(cp);
+                    break;
+                case DEVICE_REMOVED:
+                    // Remove all entries related to this device from sessions map
+                    removeSessionsByDevice(devId);
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    private class InnerPppoeAgentStoreDelegate implements PppoeAgentStoreDelegate {
+        @Override
+        public void notify(PppoeAgentEvent event) {
+            if (event.type().equals(PppoeAgentEvent.Type.STATS_UPDATE)) {
+                PppoeAgentEvent toPost = event;
+                if (event.getSubscriberId() != null) {
+                    // infuse the event with the allocation info before posting
+                    PppoeSessionInfo info = Versioned.valueOrNull(
+                            sessionsMap.stream().filter(entry -> event.getSubscriberId()
+                                    .equals(entry.getValue().value().getSubscriber().id()))
+                                    .map(Map.Entry::getValue)
+                                    .findFirst()
+                                    .orElse(null));
+                    if (info == null) {
+                        log.debug("Not handling STATS_UPDATE event for session that no longer exists. {}.", event);
+                        return;
+                    }
+
+                    toPost = new PppoeAgentEvent(event.type(), info, event.getCounterName(), event.getCounterValue(),
+                                                 info.getClientMac(), event.getSubscriberId());
+                }
+                post(toPost);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentConfig.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentConfig.java
new file mode 100644
index 0000000..5d14c76
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.ImmutableSet;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.config.Config;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class PppoeAgentConfig extends Config<ApplicationId> {
+    private static final String PPPOE_CONNECT_POINTS = "pppoeServerConnectPoints";
+    private static final String USE_OLT_ULPORT_FOR_PKT_INOUT = "useOltUplinkForServerPktInOut";
+
+    protected static final Boolean DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT = true;
+
+    @Override
+    public boolean isValid() {
+        return hasOnlyFields(PPPOE_CONNECT_POINTS, USE_OLT_ULPORT_FOR_PKT_INOUT);
+    }
+
+    /**
+     * Returns whether the app would use the uplink port of OLT for sending/receving
+     * messages to/from the server.
+     *
+     * @return true if OLT uplink port is to be used, false otherwise
+     */
+    public boolean getUseOltUplinkForServerPktInOut() {
+        if (object == null) {
+            return DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT;
+        }
+        if (!object.has(USE_OLT_ULPORT_FOR_PKT_INOUT)) {
+            return DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT;
+        }
+        return object.path(USE_OLT_ULPORT_FOR_PKT_INOUT).asBoolean();
+    }
+
+    /**
+     * Returns the pppoe server connect points.
+     *
+     * @return pppoe server connect points
+     */
+    public Set<ConnectPoint> getPppoeServerConnectPoint() {
+        if (object == null) {
+            return new HashSet<ConnectPoint>();
+        }
+
+        if (!object.has(PPPOE_CONNECT_POINTS)) {
+            return ImmutableSet.of();
+        }
+
+        ImmutableSet.Builder<ConnectPoint> builder = ImmutableSet.builder();
+        ArrayNode arrayNode = (ArrayNode) object.path(PPPOE_CONNECT_POINTS);
+        for (JsonNode jsonNode : arrayNode) {
+            String portName = jsonNode.asText(null);
+            if (portName == null) {
+                return null;
+            }
+            try {
+                builder.add(ConnectPoint.deviceConnectPoint(portName));
+            } catch (IllegalArgumentException e) {
+                return null;
+            }
+        }
+        return builder.build();
+    }
+
+
+}
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCounterNames.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCounterNames.java
new file mode 100644
index 0000000..0d58078
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCounterNames.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+/**
+ * Represents PPPoE agent counters type.
+ */
+public enum PppoeAgentCounterNames {
+    /**
+     * Number of PADI messages received from client.
+     */
+    PADI,
+    /**
+     * Number of PADO messages received from server.
+     */
+    PADO,
+    /**
+     * Number of PADR messages received from client.
+     */
+    PADR,
+    /**
+     * Number of PADS messages received from server.
+     */
+    PADS,
+    /**
+     * Number of PADT messages received from server.
+     */
+    PADT_FROM_SERVER,
+    /**
+     * Number of PADT messages received from client.
+     */
+    PADT_FROM_CLIENT,
+    /**
+     * Number of PPPoED messages sent to server.
+     */
+    PPPOED_PACKETS_TO_SERVER,
+    /**
+     * Number  of PPPoED messages received from server.
+     */
+    PPPOED_PACKETS_FROM_SERVER,
+    /**
+     * Number of MTU Exceeded errors generated by the PPPoED agent.
+     */
+    MTU_EXCEEDED,
+    /**
+     * Number of Generic Errors received from server.
+     */
+    GENERIC_ERROR_FROM_SERVER,
+    /**
+     * Number of Generic Errors received from client.
+     */
+    GENERIC_ERROR_FROM_CLIENT,
+    /**
+     * Number of ServiceName Errors received from server.
+     */
+    SERVICE_NAME_ERROR,
+    /**
+     * Number of AC-System Errors received from server.
+     */
+    AC_SYSTEM_ERROR;
+
+    /**
+     * Supported types of PPPoED agent counters.
+     */
+    public  static final Set<PppoeAgentCounterNames> SUPPORTED_COUNTERS = ImmutableSet.of(
+            PppoeAgentCounterNames.PADI, PppoeAgentCounterNames.PADO,
+            PppoeAgentCounterNames.PADR, PppoeAgentCounterNames.PADS,
+            PppoeAgentCounterNames.PADT_FROM_SERVER, PppoeAgentCounterNames.PADT_FROM_CLIENT,
+            PppoeAgentCounterNames.PPPOED_PACKETS_TO_SERVER, PppoeAgentCounterNames.PPPOED_PACKETS_FROM_SERVER,
+            PppoeAgentCounterNames.MTU_EXCEEDED, PppoeAgentCounterNames.GENERIC_ERROR_FROM_SERVER,
+            PppoeAgentCounterNames.GENERIC_ERROR_FROM_CLIENT, PppoeAgentCounterNames.SERVICE_NAME_ERROR,
+            PppoeAgentCounterNames.AC_SYSTEM_ERROR);
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersIdentifier.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersIdentifier.java
new file mode 100644
index 0000000..cca163f
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersIdentifier.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+
+import java.util.Objects;
+
+/**
+ * Represents PPPoED agent counters identifier.
+ */
+public final class PppoeAgentCountersIdentifier {
+    final String counterClassKey;
+    final Enum<PppoeAgentCounterNames> counterTypeKey;
+
+    /**
+     * Creates a default global counter identifier for a given counterType.
+     *
+     * @param counterTypeKey Identifies the supported type of pppoe agent counters
+     */
+    public PppoeAgentCountersIdentifier(PppoeAgentCounterNames counterTypeKey) {
+        this.counterClassKey = PppoeAgentEvent.GLOBAL_COUNTER;
+        this.counterTypeKey = counterTypeKey;
+    }
+
+    /**
+     * Creates a counter identifier. A counter is defined by the key pair [counterClass, counterType],
+     * where counterClass can be global or the subscriber ID and counterType is the supported pppoe counter.
+     *
+     * @param counterClassKey Identifies which class the counter is assigned (global or per subscriber)
+     * @param counterTypeKey Identifies the supported type of pppoed relay counters
+     */
+    public PppoeAgentCountersIdentifier(String counterClassKey, PppoeAgentCounterNames counterTypeKey) {
+        this.counterClassKey = counterClassKey;
+        this.counterTypeKey = counterTypeKey;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj instanceof PppoeAgentCountersIdentifier) {
+            final PppoeAgentCountersIdentifier other = (PppoeAgentCountersIdentifier) obj;
+            return Objects.equals(this.counterClassKey, other.counterClassKey)
+                    && Objects.equals(this.counterTypeKey, other.counterTypeKey);
+        }
+
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(counterClassKey, counterTypeKey);
+    }
+
+    @Override
+    public String toString() {
+        return "PppoeAgentCountersIdentifier{" +
+                "counterClassKey='" + counterClassKey + '\'' +
+                ", counterTypeKey=" + counterTypeKey +
+                '}';
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersStore.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersStore.java
new file mode 100644
index 0000000..5b4ddfd
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersStore.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+import org.onosproject.store.Store;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentStoreDelegate;
+
+/**
+ * Represents a stored Pppoe Agent Counters. A counter entry is defined by the pair [counterClass, counterType],
+ * where counterClass can be maybe global or subscriber ID and counterType is the pppoe counter.
+ */
+public interface PppoeAgentCountersStore extends Store<PppoeAgentEvent, PppoeAgentStoreDelegate> {
+
+    String NAME = "PPPOE_Agent_stats";
+
+    /**
+     * Creates or updates PPPOE Agent counter.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     */
+    void incrementCounter(String counterClass, PppoeAgentCounterNames counterType);
+
+    /**
+     * Sets the value of a PPPOE Agent counter.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     * @param value The value of the counter
+     */
+    void setCounter(String counterClass, PppoeAgentCounterNames counterType, Long value);
+
+    /**
+     * Gets the current PPPoE Agent counter values.
+     *
+     * @return PPPoE Agent counter values
+     */
+    PppoeAgentStatistics getCounters();
+
+    /**
+     * Resets counter values for a given counter class.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     */
+    void resetCounters(String counterClass);
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentStatistics.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentStatistics.java
new file mode 100644
index 0000000..dce1ac7
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentStatistics.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Snapshot of PPPoE Agent statistics.
+ */
+public class PppoeAgentStatistics {
+    private final ImmutableMap<PppoeAgentCountersIdentifier, Long> counters;
+    private PppoeAgentStatistics(ImmutableMap<PppoeAgentCountersIdentifier, Long> counters) {
+        this.counters = counters;
+    }
+    /**
+     * Creates a new empty statistics instance.
+     */
+    public PppoeAgentStatistics() {
+        counters = ImmutableMap.of();
+    }
+    /**
+     * Gets the value of the counter with the given ID. Defaults to 0 if counter is not present.
+     *
+     * @param id counter ID
+     * @return counter value
+     */
+    public long get(PppoeAgentCountersIdentifier id) {
+        return counters.getOrDefault(id, 0L);
+    }
+    /**
+     * Gets the map of counters.
+     *
+     * @return map of counters
+     */
+    public Map<PppoeAgentCountersIdentifier, Long> counters() {
+        return counters;
+    }
+    /**
+     * Creates a new statistics instance with the given counter values.
+     *
+     * @param counters counters
+     * @return statistics
+     */
+    public static PppoeAgentStatistics withCounters(Map<PppoeAgentCountersIdentifier, Long> counters) {
+        ImmutableMap.Builder<PppoeAgentCountersIdentifier, Long> builder = ImmutableMap.builder();
+        counters.forEach(builder::put);
+        return new PppoeAgentStatistics(builder.build());
+    }
+    /**
+     * Adds the given statistics instance to this one (sums the common counters) and returns
+     * a new instance containing the result.
+     *
+     * @param other other instance
+     * @return result
+     */
+    public PppoeAgentStatistics add(PppoeAgentStatistics other) {
+        ImmutableMap.Builder<PppoeAgentCountersIdentifier, Long> builder = ImmutableMap.builder();
+        Set<PppoeAgentCountersIdentifier> keys = Sets.newHashSet(other.counters.keySet());
+        counters.forEach((id, value) -> {
+            builder.put(id, value + other.counters.getOrDefault(id, 0L));
+            keys.remove(id);
+        });
+        keys.forEach(i -> builder.put(i, other.counters.get(i)));
+        return new PppoeAgentStatistics(builder.build());
+    }
+    @Override
+    public String toString() {
+        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this.getClass());
+        counters.forEach((id, v) -> helper.add(id.toString(), v));
+        return helper.toString();
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/SimplePppoeAgentCountersStore.java b/app/src/main/java/org/opencord/pppoeagent/impl/SimplePppoeAgentCountersStore.java
new file mode 100644
index 0000000..8be88b7
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/SimplePppoeAgentCountersStore.java
@@ -0,0 +1,305 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SafeRecurringTask;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentStoreDelegate;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
+import org.slf4j.Logger;
+import java.nio.charset.StandardCharsets;
+
+import java.util.Dictionary;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE_DEFAULT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * PPPoE Agent Counters Manager Component.
+ */
+@Component(immediate = true,
+        property = {
+                PUBLISH_COUNTERS_RATE + ":Integer=" + PUBLISH_COUNTERS_RATE_DEFAULT,
+                SYNC_COUNTERS_RATE + ":Integer=" + SYNC_COUNTERS_RATE_DEFAULT,
+        }
+)
+public class SimplePppoeAgentCountersStore extends AbstractStore<PppoeAgentEvent, PppoeAgentStoreDelegate>
+        implements PppoeAgentCountersStore {
+    private static final String PPPOE_STATISTICS_LEADERSHIP = "pppoeagent-statistics";
+    private static final MessageSubject RESET_SUBJECT = new MessageSubject("pppoeagent-statistics-reset");
+
+    private final Logger log = getLogger(getClass());
+    private ConcurrentMap<PppoeAgentCountersIdentifier, Long> countersMap;
+
+    private EventuallyConsistentMap<NodeId, PppoeAgentStatistics> statistics;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService componentConfigService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterCommunicationService clusterCommunicationService;
+    protected int publishCountersRate = PUBLISH_COUNTERS_RATE_DEFAULT;
+    protected int syncCountersRate = SYNC_COUNTERS_RATE_DEFAULT;
+    KryoNamespace serializer = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(PppoeAgentStatistics.class)
+            .register(PppoeAgentCountersIdentifier.class)
+            .register(PppoeAgentCounterNames.class)
+            .register(ClusterMessage.class)
+            .register(MessageSubject.class)
+            .build();
+    private ScheduledExecutorService executor;
+    private ScheduledFuture<?> publisherTask;
+    private ScheduledFuture<?> syncTask;
+    private AtomicBoolean dirty = new AtomicBoolean(true);
+
+    @Activate
+    public void activate(ComponentContext context) {
+        log.info("Activate PPPoE Agent Counters Manager");
+        countersMap = new ConcurrentHashMap<>();
+        componentConfigService.registerProperties(getClass());
+        modified(context);
+        statistics = storageService.<NodeId, PppoeAgentStatistics>eventuallyConsistentMapBuilder()
+                .withName("pppoeagent-statistics")
+                .withSerializer(serializer)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .build();
+        // Initialize counter values for the global counters
+        initCounters(PppoeAgentEvent.GLOBAL_COUNTER, statistics.get(clusterService.getLocalNode().id()));
+        syncStats();
+        leadershipService.runForLeadership(PPPOE_STATISTICS_LEADERSHIP);
+        executor = Executors.newScheduledThreadPool(1);
+        clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(serializer)::decode,
+                this::resetLocal, executor);
+        startSyncTask();
+        startPublishTask();
+    }
+
+    @Deactivate
+    public void deactivate() {
+        clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
+        leadershipService.withdraw(PPPOE_STATISTICS_LEADERSHIP);
+        stopPublishTask();
+        stopSyncTask();
+        executor.shutdownNow();
+        componentConfigService.unregisterProperties(getClass(), false);
+    }
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<String, Object> properties = context.getProperties();
+        String s = Tools.get(properties, PUBLISH_COUNTERS_RATE);
+        int oldPublishCountersRate = publishCountersRate;
+        publishCountersRate = Strings.isNullOrEmpty(s) ? PUBLISH_COUNTERS_RATE_DEFAULT
+                : Integer.parseInt(s.trim());
+        if (oldPublishCountersRate != publishCountersRate) {
+            stopPublishTask();
+            startPublishTask();
+        }
+        s = Tools.get(properties, SYNC_COUNTERS_RATE);
+        int oldSyncCountersRate = syncCountersRate;
+        syncCountersRate = Strings.isNullOrEmpty(s) ? SYNC_COUNTERS_RATE_DEFAULT
+                : Integer.parseInt(s.trim());
+        if (oldSyncCountersRate != syncCountersRate) {
+            stopSyncTask();
+            startSyncTask();
+        }
+    }
+    private ScheduledFuture<?> startTask(Runnable r, int rate) {
+        return executor.scheduleAtFixedRate(SafeRecurringTask.wrap(r),
+                0, rate, TimeUnit.SECONDS);
+    }
+    private void stopTask(ScheduledFuture<?> task) {
+        task.cancel(true);
+    }
+    private void startSyncTask() {
+        syncTask = startTask(this::syncStats, syncCountersRate);
+    }
+    private void stopSyncTask() {
+        stopTask(syncTask);
+    }
+    private void startPublishTask() {
+        publisherTask = startTask(this::publishStats, publishCountersRate);
+    }
+    private void stopPublishTask() {
+        stopTask(publisherTask);
+    }
+
+    ImmutableMap<PppoeAgentCountersIdentifier, Long> getCountersMap() {
+        return ImmutableMap.copyOf(countersMap);
+    }
+
+    public PppoeAgentStatistics getCounters() {
+        return aggregate();
+    }
+
+    /**
+     * Initialize the supported counters map for the given counter class.
+     * @param counterClass class of counters (global, per subscriber)
+     * @param existingStats existing values to intialise the counters to
+     */
+    public void initCounters(String counterClass, PppoeAgentStatistics existingStats) {
+        checkNotNull(counterClass, "counter class can't be null");
+        for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.SUPPORTED_COUNTERS) {
+            PppoeAgentCountersIdentifier id = new PppoeAgentCountersIdentifier(counterClass, counterType);
+            countersMap.put(id, existingStats == null ? 0L : existingStats.get(id));
+        }
+    }
+
+    /**
+     * Inserts the counter entry if it is not already in the set otherwise increment the existing counter entry.
+     * @param counterClass class of counters (global, per subscriber)
+     * @param counterType name of counter
+     */
+    public void incrementCounter(String counterClass, PppoeAgentCounterNames counterType) {
+        checkNotNull(counterClass, "counter class can't be null");
+        if (PppoeAgentCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
+            PppoeAgentCountersIdentifier counterIdentifier =
+                    new PppoeAgentCountersIdentifier(counterClass, counterType);
+            countersMap.compute(counterIdentifier, (key, counterValue) ->
+                    (counterValue != null) ? counterValue + 1 : 1L);
+        } else {
+            log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
+        }
+        dirty.set(true);
+    }
+
+    /**
+     * Reset the counters map for the given counter class.
+     * @param counterClass class of counters (global, per subscriber)
+     */
+    public void resetCounters(String counterClass) {
+        byte[] payload = counterClass.getBytes(StandardCharsets.UTF_8);
+        ClusterMessage reset = new ClusterMessage(clusterService.getLocalNode().id(), RESET_SUBJECT, payload);
+        clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT, Serializer.using(serializer)::encode);
+    }
+    private void resetLocal(ClusterMessage m) {
+        String counterClass = new String(m.payload(), StandardCharsets.UTF_8);
+        checkNotNull(counterClass, "counter class can't be null");
+        for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.SUPPORTED_COUNTERS) {
+            PppoeAgentCountersIdentifier counterIdentifier =
+                    new PppoeAgentCountersIdentifier(counterClass, counterType);
+            countersMap.computeIfPresent(counterIdentifier, (key, counterValue) -> 0L);
+        }
+        dirty.set(true);
+        syncStats();
+    }
+
+    /**
+     * Inserts the counter entry if it is not already in the set otherwise update the existing counter entry.
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     * @param value counter value
+     */
+    public void setCounter(String counterClass, PppoeAgentCounterNames counterType, Long value) {
+        checkNotNull(counterClass, "counter class can't be null");
+        if (PppoeAgentCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
+            PppoeAgentCountersIdentifier counterIdentifier =
+                    new PppoeAgentCountersIdentifier(counterClass, counterType);
+            countersMap.put(counterIdentifier, value);
+        } else {
+            log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
+        }
+        dirty.set(true);
+        syncStats();
+    }
+
+    private PppoeAgentStatistics aggregate() {
+        return statistics.values().stream()
+                .reduce(new PppoeAgentStatistics(), PppoeAgentStatistics::add);
+    }
+    /**
+     * Creates a snapshot of the current in-memory statistics.
+     *
+     * @return snapshot of statistics
+     */
+    private PppoeAgentStatistics snapshot() {
+        return PppoeAgentStatistics.withCounters(countersMap);
+    }
+    /**
+     * Syncs in-memory stats to the eventually consistent map.
+     */
+    private void syncStats() {
+        if (dirty.get()) {
+            statistics.put(clusterService.getLocalNode().id(), snapshot());
+            dirty.set(false);
+        }
+    }
+    private void publishStats() {
+        // Only publish events if we are the cluster leader for PPPoE Agent stats
+        if (!Objects.equals(leadershipService.getLeader(PPPOE_STATISTICS_LEADERSHIP),
+                clusterService.getLocalNode().id())) {
+            return;
+        }
+        if (aggregate().counters() != null) {
+            aggregate().counters().forEach((counterKey, counterValue) -> {
+                // Subscriber-specific counters have the subscriber ID set
+                String subscriberId = null;
+                if (!counterKey.counterClassKey.equals(PppoeAgentEvent.GLOBAL_COUNTER)) {
+                    subscriberId = counterKey.counterClassKey;
+                }
+                if (delegate != null) {
+                    delegate.notify(new PppoeAgentEvent(PppoeAgentEvent.Type.STATS_UPDATE, null,
+                                                        counterKey.counterTypeKey.toString(), counterValue,
+                                       null, subscriberId));
+                }
+            });
+        } else {
+            log.debug("Ignoring 'publishStats' request since counters are null.");
+        }
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/package-info.java b/app/src/main/java/org/opencord/pppoeagent/impl/package-info.java
new file mode 100644
index 0000000..84ceb21
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * PPPoE agent implementation.
+ */
+package org.opencord.pppoeagent.impl;
diff --git a/app/src/main/java/org/opencord/pppoeagent/rest/PppoeAgentWebResource.java b/app/src/main/java/org/opencord/pppoeagent/rest/PppoeAgentWebResource.java
new file mode 100644
index 0000000..56d5598
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/rest/PppoeAgentWebResource.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.rest;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.Map;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.util.ItemNotFoundException;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.Port;
+import org.onosproject.rest.AbstractWebResource;
+
+import org.opencord.pppoeagent.impl.PppoeAgentCounterNames;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersStore;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersIdentifier;
+import org.opencord.pppoeagent.impl.PppoeAgentStatistics;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentService;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
+
+/**
+ * PppoeAgent web resource.
+ */
+@Path("pppoeagent-app")
+public class PppoeAgentWebResource extends AbstractWebResource {
+    private final ObjectNode root = mapper().createObjectNode();
+    private final ArrayNode node = root.putArray("entry");
+    private static final String SESSION_NOT_FOUND = "Session not found";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    DeviceService deviceService = AbstractShellCommand.get(DeviceService.class);
+
+    /**
+     * Get session info object.
+     *
+     * @param mac Session MAC address
+     *
+     * @return 200 OK
+     */
+    @GET
+    @Path("/session/{mac}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getSubscriber(@PathParam("mac") String mac) {
+        MacAddress macAddress = MacAddress.valueOf(mac);
+        PppoeAgentService pppoeAgent = get(PppoeAgentService.class);
+        PppoeSessionInfo entry = pppoeAgent.getSessionsMap().get(macAddress);
+        if (entry == null) {
+            throw new ItemNotFoundException(SESSION_NOT_FOUND);
+        }
+
+        try {
+            node.add(encodePppoeSessionInfo(entry, macAddress));
+            return ok(mapper().writeValueAsString(root)).build();
+        } catch (IllegalArgumentException e) {
+            log.error("Error while fetching PPPoE session info for MAC {} through REST API: {}", mac, e.getMessage());
+            return Response.status(INTERNAL_SERVER_ERROR).build();
+        } catch (JsonProcessingException e) {
+            log.error("Error assembling JSON response for PPPoE session info request for MAC {} " +
+                    "through REST API: {}", mac, e.getMessage());
+            return Response.status(INTERNAL_SERVER_ERROR).build();
+        }
+    }
+
+    /**
+     * Get all session info objects.
+     *
+     * @return 200 OK
+     */
+    @GET
+    @Path("/session")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getSubscribers() {
+        try {
+            PppoeAgentService pppoeAgent = get(PppoeAgentService.class);
+            pppoeAgent.getSessionsMap().forEach((mac, entry) -> {
+                node.add(encodePppoeSessionInfo(entry, mac));
+            });
+
+            return ok(mapper().writeValueAsString(root)).build();
+        } catch (Exception e) {
+            log.error("Error while fetching PPPoE sessions information through REST API: {}", e.getMessage());
+            return Response.status(INTERNAL_SERVER_ERROR).build();
+        }
+    }
+
+    private ObjectNode encodePppoeSessionInfo(PppoeSessionInfo entry, MacAddress macAddress) {
+        ConnectPoint cp = entry.getClientCp();
+        Port devicePort = deviceService.getPort(cp);
+        String portLabel = "uni-" + ((cp.port().toLong() & 0xF) + 1);
+        String subscriberId = devicePort != null ? devicePort.annotations().value(AnnotationKeys.PORT_NAME) :
+                "UNKNOWN";
+
+        return mapper().createObjectNode()
+                .put("macAddress", macAddress.toString())
+                .put("sessionId", entry.getSessionId())
+                .put("currentState", entry.getCurrentState())
+                .put("lastReceivedPacket", PPPoED.Type.getTypeByValue(entry.getPacketCode()).name())
+                .put("deviceId", cp.deviceId().toString())
+                .put("portNumber", cp.port().toString())
+                .put("portLabel", portLabel)
+                .put("subscriberId", subscriberId);
+    }
+
+    /**
+     * Gets PPPoE Agent counters for global context.
+     *
+     * @return 200 OK
+     */
+    @GET
+    @Path("/stats")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getPppoeStats() {
+        return getStats(PppoeAgentEvent.GLOBAL_COUNTER);
+    }
+
+    /**
+     * Gets PPPoE Agent counters for specific subscriber.
+     *
+     * @param subscriberId Id of subscriber.
+     *
+     * @return 200 OK
+     */
+    @GET
+    @Path("/stats/{subscriberId}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getPppoeSubscriberStats(@PathParam("subscriberId") String subscriberId) {
+        return getStats(subscriberId);
+    }
+    private Response getStats(String key) {
+        PppoeAgentCountersStore pppoeCounters = get(PppoeAgentCountersStore.class);
+        try {
+            PppoeAgentStatistics pppoeStatistics = pppoeCounters.getCounters();
+            JsonNode completeNode = buildPppoeCounterNodeObject(key, pppoeStatistics.counters());
+            return ok(mapper().writeValueAsString(completeNode)).build();
+        } catch (JsonProcessingException e) {
+            log.error("Error while fetching PPPoE agent counter stats through REST API: {}", e.getMessage());
+            return Response.status(INTERNAL_SERVER_ERROR).build();
+        }
+    }
+
+    private JsonNode buildPppoeCounterNodeObject(String key, Map<PppoeAgentCountersIdentifier, Long> countersMap) {
+        ObjectNode entryNode = mapper().createObjectNode();
+        for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.SUPPORTED_COUNTERS) {
+            Long value = countersMap.get(new PppoeAgentCountersIdentifier(key, counterType));
+            if (value == null) {
+                continue;
+            }
+            entryNode = entryNode.put(counterType.name(), String.valueOf(value));
+        }
+        return mapper().createObjectNode().set(key, entryNode);
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/rest/package-info.java b/app/src/main/java/org/opencord/pppoeagent/rest/package-info.java
new file mode 100644
index 0000000..70a18d3
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/rest/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Rest interface for PppoeAgent.
+ */
+package org.opencord.pppoeagent.rest;
diff --git a/app/src/main/webapp/WEB-INF/web.xml b/app/src/main/webapp/WEB-INF/web.xml
new file mode 100644
index 0000000..9bb27a0
--- /dev/null
+++ b/app/src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2021-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
+         xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+         id="ONOS" version="2.5">
+    <display-name>PPPoE Agent REST API v1.0</display-name>
+
+    <servlet>
+        <servlet-name>JAX-RS Service</servlet-name>
+        <servlet-class>org.glassfish.jersey.servlet.ServletContainer</servlet-class>
+        <init-param>
+            <param-name>jersey.config.server.provider.classnames</param-name>
+            <param-value>
+                org.opencord.pppoeagent.rest.PppoeAgentWebResource
+            </param-value>
+        </init-param>
+        <load-on-startup>1</load-on-startup>
+    </servlet>
+
+    <servlet-mapping>
+        <servlet-name>JAX-RS Service</servlet-name>
+        <url-pattern>/*</url-pattern>
+    </servlet-mapping>
+</web-app>
diff --git a/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTest.java b/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTest.java
new file mode 100644
index 0000000..6298acf
--- /dev/null
+++ b/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTest.java
@@ -0,0 +1,421 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.onlab.junit.TestUtils;
+import org.onlab.osgi.ComponentContextAdapter;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.packet.PPPoEDTag;
+import org.onlab.packet.VlanId;
+
+import org.onosproject.net.Device;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.TestStorageService;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PPPoEDVendorSpecificTag;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+
+import static org.easymock.EasyMock.createMock;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+public class PppoeAgentTest extends PppoeAgentTestBase {
+    private PppoeAgent pppoeAgent;
+    private SimplePppoeAgentCountersStore store;
+
+    ComponentConfigService mockConfigService =
+            createMock(ComponentConfigService.class);
+
+    /**
+     * Sets up the services required by the PPPoE agent app.
+     */
+    @Before
+    public void setUp() {
+        pppoeAgent = new PppoeAgent();
+        pppoeAgent.cfgService = new MockNetworkConfigRegistry();
+        pppoeAgent.coreService = new MockCoreServiceAdapter();
+        pppoeAgent.packetService = new MockPacketService();
+        pppoeAgent.componentConfigService = mockConfigService;
+        pppoeAgent.deviceService = new MockDeviceService();
+        pppoeAgent.sadisService = new MockSadisService();
+        pppoeAgent.subsService = pppoeAgent.sadisService.getSubscriberInfoService();
+        pppoeAgent.mastershipService = new MockMastershipService();
+        pppoeAgent.storageService = new TestStorageService();
+        pppoeAgent.pppoeAgentCounters = this.store;
+
+        store = new SimplePppoeAgentCountersStore();
+        store.storageService = new TestStorageService();
+        store.clusterService = new ClusterServiceAdapter();
+        store.leadershipService = new LeadershipServiceAdapter();
+        store.clusterCommunicationService = new MockClusterCommunicationService<>();
+        store.componentConfigService = mockConfigService;
+        TestUtils.setField(store, "eventDispatcher", new MockEventDispatcher());
+        store.activate(new MockComponentContext());
+
+        pppoeAgent.pppoeAgentCounters = this.store;
+
+        TestUtils.setField(pppoeAgent, "eventDispatcher", new MockEventDispatcher());
+        TestUtils.setField(pppoeAgent, "packetProcessorExecutor", MoreExecutors.newDirectExecutorService());
+
+        pppoeAgent.activate(new ComponentContextAdapter());
+    }
+
+    /**
+     * Tears down the PPPoE agent app.
+     */
+    @After
+    public void tearDown() {
+        pppoeAgent.deactivate();
+    }
+
+    @Test
+    public void testPppoePadi() {
+        testPppoeUpstreamPacket(PPPoED.PPPOED_CODE_PADI);
+    }
+
+    @Test
+    public void testPppoePado() {
+        testPppoeDownstreamPacket(PPPoED.PPPOED_CODE_PADO);
+    }
+
+    @Test
+    public void testPppoePadr() {
+        testPppoeUpstreamPacket(PPPoED.PPPOED_CODE_PADR);
+    }
+
+    @Test
+    public void testPppoePads() {
+        testPppoeDownstreamPacket(PPPoED.PPPOED_CODE_PADS);
+    }
+
+    @Test
+    public void testPppoePadt() {
+        // To simulate a successful PADT from subscriber a session entry is needed.
+        PppoeSessionInfo sessionInfo = new PppoeSessionInfo(DEFAULT_CONNECT_POINT, SERVER_CONNECT_POINT,
+                                                            PPPoED.PPPOED_CODE_PADS, (short) 1,
+                                                            pppoeAgent.subsService.get(CLIENT_NAS_PORT_ID),
+                                                            CLIENT_MAC);
+        putInfoOnSessionMap(CLIENT_MAC, sessionInfo);
+        assertTrue(pppoeAgent.getSessionsMap().containsKey(CLIENT_MAC));
+
+        Ethernet padt = constructPppoedPacket(PPPoED.PPPOED_CODE_PADT, CLIENT_MAC, MacAddress.BROADCAST,
+                                              CLIENT_C_TAG, CLIENT_S_TAG, (short) 1);
+        sendPacket(padt, DEFAULT_CONNECT_POINT);
+        Ethernet processedPadt = (Ethernet) getPacket();
+        assertNotNull(processedPadt);
+        assertEquals(padt, processedPadt);
+        assertFalse(pppoeAgent.getSessionsMap().containsKey(CLIENT_MAC));
+        PppoeAgentEvent e = getEvent();
+        assertNotNull(e);
+        assertEquals(PppoeAgentEvent.Type.TERMINATE, e.type());
+        assertEquals(DEFAULT_CONNECT_POINT, e.getConnectPoint());
+        assertEquals(CLIENT_MAC, e.getSubscriberMacAddress());
+
+        // Simulating PADT from server.
+        putInfoOnSessionMap(CLIENT_MAC, sessionInfo);
+        assertTrue(pppoeAgent.getSessionsMap().containsKey(CLIENT_MAC));
+        padt = constructPppoedPacket(PPPoED.PPPOED_CODE_PADT, SERVER_MAC, CLIENT_MAC,
+                                     CLIENT_C_TAG, CLIENT_S_TAG, (short) 1);
+        sendPacket(padt, SERVER_CONNECT_POINT);
+        processedPadt = (Ethernet) getPacket();
+        assertNotNull(processedPadt);
+        assertEquals(padt, processedPadt);
+        assertFalse(pppoeAgent.getSessionsMap().containsKey(CLIENT_MAC));
+        e = getEvent();
+        assertNotNull(e);
+        assertEquals(PppoeAgentEvent.Type.TERMINATE, e.type());
+        assertEquals(SERVER_CONNECT_POINT, e.getConnectPoint());
+        assertEquals(CLIENT_MAC, e.getSubscriberMacAddress());
+
+        // Simulating PADT from client for unknown session (the packet must not be processed)..
+        padt = constructPppoedPacket(PPPoED.PPPOED_CODE_PADT, CLIENT_MAC, MacAddress.BROADCAST,
+                                     CLIENT_C_TAG, CLIENT_S_TAG, (short) 1);
+        sendPacket(padt, DEFAULT_CONNECT_POINT);
+        processedPadt = (Ethernet) getPacket();
+        assertNull(processedPadt);
+
+        // Simulating PADT from server for unknown session (the packet must not be processed).
+        padt = constructPppoedPacket(PPPoED.PPPOED_CODE_PADT, SERVER_MAC, CLIENT_MAC,
+                                     CLIENT_C_TAG, CLIENT_S_TAG, (short) 1);
+        sendPacket(padt, SERVER_CONNECT_POINT);
+        processedPadt = (Ethernet) getPacket();
+        assertNull(processedPadt);
+    }
+
+    @Test
+    public void testPppoeCircuitIdValidation() {
+        Ethernet packet = constructPppoedPacket(PPPoED.PPPOED_CODE_PADI, CLIENT_MAC, MacAddress.BROADCAST,
+                                                CLIENT_C_TAG, CLIENT_S_TAG, (short) 0);
+        // Send packet with a different port number, so there will be a circuit-id mismatch.
+        sendPacket(packet, new ConnectPoint(DEVICE_ID, PortNumber.portNumber(4096L)));
+        Ethernet processedPacket = (Ethernet) getPacket();
+        assertNull(processedPacket);
+        PppoeAgentEvent e = getEvent();
+        assertNotNull(e);
+        assertEquals(PppoeAgentEvent.Type.INVALID_CID, e.type());
+
+        // Now send it from the default connect point, which should generate the valid circuit-id.
+        sendPacket(packet, DEFAULT_CONNECT_POINT);
+        processedPacket = (Ethernet) getPacket();
+        assertNotNull(processedPacket);
+        PPPoED pppoeLayer = (PPPoED) processedPacket.getPayload();
+        assertNotNull(pppoeLayer);
+        PPPoEDTag tag = pppoeLayer.getTag(PPPoEDTag.PPPOED_TAG_VENDOR_SPECIFIC);
+        assertNotNull(tag);
+        PPPoEDVendorSpecificTag vendorSpecificTag = PPPoEDVendorSpecificTag.fromByteArray(tag.getValue());
+        assertNotNull(vendorSpecificTag);
+
+        // Checks if the configured circuit-id matches with the built one.
+        assertEquals(CLIENT_CIRCUIT_ID, vendorSpecificTag.getCircuitId());
+    }
+
+    @Test
+    public void testPppoeCounters() {
+        short sessionId = (short) 0;
+        Ethernet padi = constructPppoedPacket(PPPoED.PPPOED_CODE_PADI, CLIENT_MAC, MacAddress.BROADCAST,
+                                              CLIENT_C_TAG, CLIENT_S_TAG, sessionId);
+        Ethernet pado = constructPppoedPacket(PPPoED.PPPOED_CODE_PADO, SERVER_MAC, CLIENT_MAC,
+                                              CLIENT_C_TAG, CLIENT_S_TAG, sessionId);
+        Ethernet padr = constructPppoedPacket(PPPoED.PPPOED_CODE_PADR, CLIENT_MAC, MacAddress.BROADCAST,
+                                              CLIENT_C_TAG, CLIENT_S_TAG, sessionId);
+        sessionId++;
+        Ethernet pads = constructPppoedPacket(PPPoED.PPPOED_CODE_PADS, SERVER_MAC, CLIENT_MAC,
+                                              CLIENT_C_TAG, CLIENT_S_TAG, sessionId);
+
+        List.of(new CounterTester(PppoeAgentCounterNames.PADI, 6, padi, DEFAULT_CONNECT_POINT),
+                new CounterTester(PppoeAgentCounterNames.PADO, 2, pado, SERVER_CONNECT_POINT),
+                new CounterTester(PppoeAgentCounterNames.PADR, 5, padr, DEFAULT_CONNECT_POINT),
+                new CounterTester(PppoeAgentCounterNames.PADS, 3, pads, SERVER_CONNECT_POINT),
+                new CounterTester(PppoeAgentCounterNames.PPPOED_PACKETS_FROM_SERVER, 5, null, null),
+                new CounterTester(PppoeAgentCounterNames.PPPOED_PACKETS_TO_SERVER, 11, null, null),
+                new CounterTester(PppoeAgentCounterNames.AC_SYSTEM_ERROR, 0, null, null),
+                new CounterTester(PppoeAgentCounterNames.GENERIC_ERROR_FROM_CLIENT, 0, null, null),
+                new CounterTester(PppoeAgentCounterNames.GENERIC_ERROR_FROM_SERVER, 0, null, null),
+                new CounterTester(PppoeAgentCounterNames.MTU_EXCEEDED, 0, null, null),
+                new CounterTester(PppoeAgentCounterNames.SERVICE_NAME_ERROR, 0, null, null))
+        .forEach(CounterTester::test);
+    }
+
+    @Test
+    public void testSessionsMap() {
+        assertEquals(0, pppoeAgent.getSessionsMap().size());
+        Ethernet packet = constructPppoedPacket(PPPoED.PPPOED_CODE_PADI, CLIENT_MAC, MacAddress.BROADCAST,
+                                                CLIENT_C_TAG, CLIENT_S_TAG, (short) 0);
+        sendPacket(packet, DEFAULT_CONNECT_POINT);
+        assertEquals(1, pppoeAgent.getSessionsMap().size());
+
+        int randomPacketsNumber = 15;
+        sendMultiplePadi(randomPacketsNumber);
+        assertEquals(randomPacketsNumber + 1, pppoeAgent.getSessionsMap().size());
+        PppoeSessionInfo sessionInfo = pppoeAgent.getSessionsMap().get(CLIENT_MAC);
+        assertSessionInfo(sessionInfo, PPPoED.PPPOED_CODE_PADI, (short) 0);
+
+        packet = constructPppoedPacket(PPPoED.PPPOED_CODE_PADT, CLIENT_MAC, MacAddress.BROADCAST,
+                                       CLIENT_C_TAG, CLIENT_S_TAG, (short) 0);
+        sendPacket(packet, DEFAULT_CONNECT_POINT);
+
+        assertEquals(randomPacketsNumber, pppoeAgent.getSessionsMap().size());
+    }
+
+    @Test
+    public void testDeviceEvents() {
+        // Guarantee map is empty.
+        assertEquals(0, pppoeAgent.getSessionsMap().size());
+
+        // Fill sessionsMap by sending 10 PADI packets for random mac addresses.
+        int numPackets = 10;
+        sendMultiplePadi(numPackets);
+        assertEquals(numPackets, pppoeAgent.getSessionsMap().size());
+
+        // Generate PORT_REMOVED event and inject into the device listener.
+        DeviceListener deviceListener = TestUtils.getField(pppoeAgent, "deviceListener");
+        Device device = pppoeAgent.deviceService.getDevice(DEVICE_ID);
+        DeviceEvent deviceEvent = new DeviceEvent(DeviceEvent.Type.PORT_REMOVED,
+                                                  device,
+                                                  new MockPort(PortNumber.portNumber(1L)));
+        deviceListener.event(deviceEvent);
+
+        // Check if session map is empty again.
+        assertEquals(0, pppoeAgent.getSessionsMap().size());
+
+        // Perform the same test but for PORT_UPDATED event.
+        sendMultiplePadi(numPackets);
+        assertEquals(numPackets, pppoeAgent.getSessionsMap().size());
+        deviceEvent = new DeviceEvent(DeviceEvent.Type.PORT_UPDATED,
+                                      device,
+                                      new MockPort(PortNumber.portNumber(1L), false));
+        deviceListener.event(deviceEvent);
+        assertEquals(0, pppoeAgent.getSessionsMap().size());
+
+        // Same test for DEVICE_REMOVED.
+        sendMultiplePadi(numPackets);
+        assertEquals(numPackets, pppoeAgent.getSessionsMap().size());
+        deviceEvent = new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, device, null);
+        deviceListener.event(deviceEvent);
+        assertEquals(0, pppoeAgent.getSessionsMap().size());
+    }
+
+    private void sendMultiplePadi(int num) {
+        for (int i = 0; i < num; i++) {
+            MacAddress macAddress;
+            // A trick to guarantee the Mac address won't repeat (this case may never occur).
+            do {
+                macAddress = randomizeMacAddress();
+            } while (pppoeAgent.getSessionsMap().containsKey(macAddress));
+
+            Ethernet packet = constructPppoedPacket(PPPoED.PPPOED_CODE_PADI, macAddress, MacAddress.BROADCAST,
+                    CLIENT_C_TAG, CLIENT_S_TAG, (short) 0);
+            sendPacket(packet, DEFAULT_CONNECT_POINT);
+        }
+    }
+
+    private void testPppoeUpstreamPacket(byte packetCode) {
+        Ethernet packet = constructPppoedPacket(packetCode, CLIENT_MAC, MacAddress.BROADCAST,
+                                                CLIENT_C_TAG, CLIENT_S_TAG, (short) 0);
+        sendPacket(packet, DEFAULT_CONNECT_POINT);
+
+        Ethernet processedPacket = (Ethernet) getPacket();
+        assertNotNull(processedPacket);
+
+        PPPoED pppoedLayer = (PPPoED) processedPacket.getPayload();
+        assertNotNull(pppoedLayer);
+
+        List<PPPoEDTag> pppoedTagList = pppoedLayer.getTags();
+        assertEquals(1, pppoedTagList.size());
+
+        PPPoEDTag ppPoEDTag = pppoedTagList.get(0);
+        assertEquals(PPPoEDTag.PPPOED_TAG_VENDOR_SPECIFIC, ppPoEDTag.getType());
+
+        PPPoEDVendorSpecificTag vendorSpecificTag = PPPoEDVendorSpecificTag.fromByteArray(ppPoEDTag.getValue());
+        assertEquals(CLIENT_CIRCUIT_ID, vendorSpecificTag.getCircuitId());
+        assertEquals(CLIENT_REMOTE_ID, vendorSpecificTag.getRemoteId());
+        assertEquals(Integer.valueOf(PPPoEDVendorSpecificTag.BBF_IANA_VENDOR_ID), vendorSpecificTag.getVendorId());
+
+        // The only difference between the original and processed is the tag list,
+        // so after removing it the packets must be equal.
+        pppoedLayer.setPayload(null);
+        pppoedLayer.setPayloadLength((short) 0);
+        processedPacket.setPayload(pppoedLayer);
+        assertEquals(packet, processedPacket);
+
+        PppoeSessionInfo sessionInfo = pppoeAgent.getSessionsMap().get(CLIENT_MAC);
+        assertNotNull(sessionInfo);
+        assertSessionInfo(sessionInfo, packetCode, (short) 0);
+    }
+
+    private void testPppoeDownstreamPacket(byte packetCode) {
+        // Simulating a session entry of a previous packet.
+        byte previousPacketCode = packetCode == PPPoED.PPPOED_CODE_PADO ? PPPoED.PPPOED_CODE_PADI :
+                                                                         PPPoED.PPPOED_CODE_PADR;
+
+        SubscriberAndDeviceInformation deviceInfo = pppoeAgent.subsService.get(CLIENT_NAS_PORT_ID);
+
+        putInfoOnSessionMap(CLIENT_MAC, new PppoeSessionInfo(DEFAULT_CONNECT_POINT, SERVER_CONNECT_POINT,
+                                                             previousPacketCode, (short) 0, deviceInfo, CLIENT_MAC));
+
+        short sessionId = (short) (packetCode == PPPoED.PPPOED_CODE_PADS ? 1 : 0);
+        Ethernet packet = constructPppoedPacket(packetCode, SERVER_MAC, CLIENT_MAC,
+                                                CLIENT_C_TAG, CLIENT_S_TAG, sessionId);
+        sendPacket(packet, SERVER_CONNECT_POINT);
+
+        Ethernet processedPacket = (Ethernet) getPacket();
+        assertNotNull(processedPacket);
+
+        // sTag is removed before sending the packet to client.
+        packet.setQinQVID(VlanId.UNTAGGED);
+        assertEquals(packet, processedPacket);
+
+        PppoeSessionInfo sessionInfo = pppoeAgent.getSessionsMap().get(CLIENT_MAC);
+        assertNotNull(sessionInfo);
+        assertSessionInfo(sessionInfo, packetCode, sessionId);
+    }
+
+    private void assertSessionInfo(PppoeSessionInfo sessionInfo, Byte packetCode, short sessionId) {
+        assertEquals(packetCode, sessionInfo.getPacketCode());
+        assertEquals(sessionId, sessionInfo.getSessionId());
+        assertEquals(DEFAULT_CONNECT_POINT, sessionInfo.getClientCp());
+        assertEquals(CLIENT_MAC, sessionInfo.getClientMac());
+    }
+
+    private void putInfoOnSessionMap(MacAddress key, PppoeSessionInfo sessionInfo) {
+        ConsistentMap<MacAddress, PppoeSessionInfo> sessionsMap = TestUtils.getField(pppoeAgent, "sessionsMap");
+        sessionsMap.put(key, sessionInfo);
+    }
+
+    class CounterTester {
+        CounterTester(String subscriber, PppoeAgentCounterNames counter,
+                      long expectedValue, Ethernet packetModel,
+                      ConnectPoint cp) {
+            this.subscriber = subscriber;
+            this.counter = counter;
+            this.expectedValue = expectedValue;
+            this.packetModel = packetModel;
+            this.cp = cp;
+        }
+
+        CounterTester(PppoeAgentCounterNames counter, long expectedValue,
+                      Ethernet packetModel, ConnectPoint cp) {
+            this(PppoeAgentEvent.GLOBAL_COUNTER, counter, expectedValue, packetModel, cp);
+        }
+
+        String subscriber;
+        PppoeAgentCounterNames counter;
+        long expectedValue;
+        Ethernet packetModel;
+        ConnectPoint cp;
+
+        void sendModel() {
+            for (int i = 0; i < expectedValue; i++) {
+                sendPacket(packetModel, cp);
+            }
+        }
+
+        void assertCounterValue() {
+            long actualValue = store.getCountersMap()
+                    .get(new PppoeAgentCountersIdentifier(subscriber, counter));
+            assertEquals(expectedValue, actualValue);
+        }
+
+        void test() {
+            if (packetModel != null && cp != null) {
+                sendModel();
+            }
+            assertCounterValue();
+        }
+    }
+}
diff --git a/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTestBase.java b/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTestBase.java
new file mode 100644
index 0000000..59dc216
--- /dev/null
+++ b/app/src/test/java/org/opencord/pppoeagent/impl/PppoeAgentTestBase.java
@@ -0,0 +1,751 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.time.Duration;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.onlab.packet.BasePacket;
+import org.onlab.packet.ChassisId;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.packet.VlanId;
+
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.event.DefaultEventSinkRegistry;
+import org.onosproject.event.Event;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.EventSink;
+import org.onosproject.mastership.MastershipServiceAdapter;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.config.Config;
+import org.onosproject.net.config.NetworkConfigRegistryAdapter;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.Device;
+import org.onosproject.net.device.DeviceServiceAdapter;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Element;
+import org.onosproject.net.packet.DefaultInboundPacket;
+import org.onosproject.net.packet.DefaultPacketContext;
+import org.onosproject.net.packet.InboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketServiceAdapter;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.sadis.BandwidthProfileInformation;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
+
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.ComponentInstance;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Common methods for PpppoeAgent app tests.
+ */
+public class PppoeAgentTestBase {
+    static final int UPLINK_PORT = 5;
+    static final String OLT_DEV_ID = "of:00000000000000aa";
+    static final String OLT_SERIAL_NUMBER = "OLT123456789";
+
+    static final VlanId CLIENT_C_TAG = VlanId.vlanId((short) 999);
+    static final VlanId CLIENT_C_TAG_2 = VlanId.vlanId((short) 998);
+    static final VlanId CLIENT_S_TAG = VlanId.vlanId((short) 111);
+    static final String CLIENT_ID_1 = "SUBSCRIBER_ID_1";
+    static final short CLIENT_C_PBIT = 6;
+    static final String CLIENT_NAS_PORT_ID = "ONU123456789";
+    static final String CLIENT_REMOTE_ID = "remote0";
+
+    static final MacAddress CLIENT_MAC = MacAddress.valueOf("B8:26:D4:09:E5:D1");
+    static final MacAddress SERVER_MAC = MacAddress.valueOf("74:86:7A:FB:07:86");
+    static final MacAddress OLT_MAC_ADDRESS = MacAddress.valueOf("01:02:03:04:05:06");
+
+    static final ConnectPoint DEFAULT_CONNECT_POINT = ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1);
+    static final String CLIENT_CIRCUIT_ID = String.format("%s 0/%s:%s", OLT_SERIAL_NUMBER,
+                                                         (DEFAULT_CONNECT_POINT.port().toLong() >> 12) + 1,
+                                                          CLIENT_NAS_PORT_ID);
+
+    static final DeviceId DEVICE_ID = DeviceId.deviceId(OLT_DEV_ID);
+    static final String SCHEME_NAME = "pppoeagent";
+
+    static final ConnectPoint SERVER_CONNECT_POINT = ConnectPoint.deviceConnectPoint("of:00000000000000aa/5");
+
+    static final DefaultAnnotations DEVICE_ANNOTATIONS = DefaultAnnotations.builder()
+            .set(AnnotationKeys.PROTOCOL, SCHEME_NAME.toUpperCase()).build();
+
+    List<BasePacket> savedPackets = new LinkedList<>();
+    PacketProcessor packetProcessor;
+
+    /**
+     * Saves the given packet onto the saved packets list.
+     *
+     * @param packet packet to save
+     */
+    void savePacket(BasePacket packet) {
+        savedPackets.add(packet);
+    }
+
+    /**
+     * Gets and removes the packet in the 1st position of savedPackets list.
+     *
+     * @return the packet in 1st position of savedPackets list.
+     */
+    BasePacket getPacket() {
+        return savedPackets.size() > 0 ? savedPackets.remove(0) : null;
+    }
+
+    /**
+     * Gets the last generated event.
+     *
+     * @return the last generated pppoe agent event.
+     */
+    PppoeAgentEvent getEvent() {
+        List<Event> savedEvents = MockEventDispatcher.eventList;
+        return savedEvents.size() > 0 ? (PppoeAgentEvent) savedEvents.get(savedEvents.size() - 1) : null;
+    }
+
+    /**
+     * Sends an Ethernet packet to the process method of the Packet Processor.
+     *
+     * @param pkt Ethernet packet
+     * @param cp ConnectPoint cp
+     */
+    void sendPacket(Ethernet pkt, ConnectPoint cp) {
+        final ByteBuffer byteBuffer = ByteBuffer.wrap(pkt.serialize());
+        InboundPacket inPacket = new DefaultInboundPacket(cp, pkt, byteBuffer);
+
+        PacketContext context = new MockPacketContext(127L, inPacket, null, false);
+        packetProcessor.process(context);
+    }
+
+    /**
+     * Mocks core service adaptor that provides an appId.
+     */
+    class MockCoreServiceAdapter extends CoreServiceAdapter {
+
+        @Override
+        public ApplicationId registerApplication(String name) {
+            return new DefaultApplicationId(10, name);
+        }
+    }
+
+    /**
+     * Mocks device service to provide the of device object.
+     */
+    class MockDeviceService extends DeviceServiceAdapter {
+
+        private ProviderId providerId = new ProviderId("of", "foo");
+        private final Device device1 = new PppoeAgentTestBase.MockDevice(providerId, DEVICE_ID, Device.Type.SWITCH,
+                "foo.inc", "0", "0", OLT_SERIAL_NUMBER, new ChassisId(),
+                DEVICE_ANNOTATIONS);
+
+        @Override
+        public Device getDevice(DeviceId devId) {
+            return device1;
+        }
+
+        @Override
+        public Port getPort(ConnectPoint cp) {
+            return new PppoeAgentTestBase.MockPort(cp.port());
+        }
+
+        @Override
+        public Port getPort(DeviceId deviceId, PortNumber portNumber) {
+            return new PppoeAgentTestBase.MockPort(portNumber);
+        }
+
+        @Override
+        public boolean isAvailable(DeviceId d) {
+            return true;
+        }
+    }
+
+    /**
+     * Mocks device object.
+     */
+    class MockDevice extends DefaultDevice {
+
+        public MockDevice(ProviderId providerId, DeviceId id, Type type,
+                          String manufacturer, String hwVersion, String swVersion,
+                          String serialNumber, ChassisId chassisId, Annotations... annotations) {
+            super(providerId, id, type, manufacturer, hwVersion, swVersion, serialNumber,
+                    chassisId, annotations);
+        }
+    }
+
+    /**
+     * Mocks port object.
+     */
+    class  MockPort implements Port {
+        private PortNumber portNumber;
+        private boolean isEnabled = true;
+        MockPort(PortNumber portNumber) {
+            this.portNumber = portNumber;
+        }
+
+        MockPort(PortNumber portNumber, boolean isEnabled) {
+            this.portNumber = portNumber;
+            this.isEnabled = isEnabled;
+        }
+
+        @Override
+        public boolean isEnabled() {
+            return this.isEnabled;
+        }
+        @Override
+        public long portSpeed() {
+            return 1000;
+        }
+        @Override
+        public Element element() {
+            return null;
+        }
+        @Override
+        public PortNumber number() {
+            return this.portNumber;
+        }
+        @Override
+        public Annotations annotations() {
+            return new MockAnnotations();
+        }
+        @Override
+        public Type type() {
+            return Port.Type.FIBER;
+        }
+
+        private class MockAnnotations implements Annotations {
+
+            @Override
+            public String value(String val) {
+                return CLIENT_NAS_PORT_ID;
+            }
+            @Override
+            public Set<String> keys() {
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Mocks mastership service to enable local-master operations.
+     */
+    class MockMastershipService extends MastershipServiceAdapter {
+        @Override
+        public boolean isLocalMaster(DeviceId d) {
+            return true;
+        }
+    }
+
+    /**
+     * Keeps a reference to the PacketProcessor and saves the OutboundPackets.
+     */
+    class MockPacketService extends PacketServiceAdapter {
+
+        @Override
+        public void addProcessor(PacketProcessor processor, int priority) {
+            packetProcessor = processor;
+        }
+
+        @Override
+        public void emit(OutboundPacket packet) {
+            try {
+                Ethernet eth = Ethernet.deserializer().deserialize(packet.data().array(),
+                        0, packet.data().array().length);
+                savePacket(eth);
+            } catch (Exception e) {
+                fail(e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Mocks Sadis service.
+     */
+    class MockSadisService implements SadisService {
+        @Override
+        public BaseInformationService<SubscriberAndDeviceInformation> getSubscriberInfoService() {
+            return new PppoeAgentTestBase.MockSubService();
+        }
+
+        @Override
+        public BaseInformationService<BandwidthProfileInformation> getBandwidthProfileService() {
+            return null;
+        }
+    }
+
+    /**
+     * Mocks Sadis content with a device and a subscriber entry.
+     */
+    class MockSubService implements BaseInformationService<SubscriberAndDeviceInformation> {
+
+        PppoeAgentTestBase.MockSubscriberAndDeviceInformation device =
+                new PppoeAgentTestBase.MockSubscriberAndDeviceInformation(OLT_DEV_ID, VlanId.NONE, VlanId.NONE,
+                                                                          VlanId.NONE, null, null,
+                                                                          OLT_MAC_ADDRESS,
+                                                                          Ip4Address.valueOf("10.10.10.10"),
+                                                                          UPLINK_PORT, null);
+
+        PppoeAgentTestBase.MockSubscriberAndDeviceInformation sub =
+                new PppoeAgentTestBase.MockSubscriberAndDeviceInformation(CLIENT_ID_1, CLIENT_C_TAG, CLIENT_C_TAG_2,
+                                                                          CLIENT_S_TAG, CLIENT_NAS_PORT_ID,
+                                                                          CLIENT_CIRCUIT_ID, null, null,
+                                                                -1, CLIENT_REMOTE_ID);
+
+        @Override
+        public SubscriberAndDeviceInformation get(String id) {
+            if (id.equals(OLT_SERIAL_NUMBER)) {
+                return device;
+            } else {
+                return  sub;
+            }
+        }
+
+        @Override
+        public void invalidateAll() {}
+        @Override
+        public void invalidateId(String id) {}
+        @Override
+        public SubscriberAndDeviceInformation getfromCache(String id) {
+            return null;
+        }
+    }
+
+    /**
+     * Mock Sadis object to populate service.
+     */
+    class MockSubscriberAndDeviceInformation extends SubscriberAndDeviceInformation {
+
+        MockSubscriberAndDeviceInformation(String id, VlanId cTag, VlanId cTag2,
+                                           VlanId sTag, String nasPortId,
+                                           String circuitId, MacAddress hardId,
+                                           Ip4Address ipAddress, int uplinkPort,
+                                           String remoteId) {
+            this.setHardwareIdentifier(hardId);
+            this.setId(id);
+            this.setIPAddress(ipAddress);
+            this.setNasPortId(nasPortId);
+            this.setCircuitId(circuitId);
+            this.setUplinkPort(uplinkPort);
+            this.setRemoteId(remoteId);
+
+            List<UniTagInformation> uniTagInformationList = new ArrayList<>();
+
+            UniTagInformation uniTagInformation = new UniTagInformation.Builder()
+                    .setPonCTag(cTag)
+                    .setPonSTag(sTag)
+                    .setUsPonCTagPriority(CLIENT_C_PBIT)
+                    .build();
+
+            UniTagInformation uniTagInformation2 = new UniTagInformation.Builder()
+                    .setPonCTag(cTag2)
+                    .setPonSTag(sTag)
+                    .setUsPonCTagPriority(CLIENT_C_PBIT)
+                    .build();
+
+            uniTagInformationList.add(uniTagInformation);
+            uniTagInformationList.add(uniTagInformation2);
+            this.setUniTagList(uniTagInformationList);
+        }
+    }
+
+    /**
+     * Mocks component context.
+     */
+    class MockComponentContext implements ComponentContext {
+        @Override
+        public Dictionary<String, Object> getProperties() {
+            Dictionary<String, Object> cfgDict = new Hashtable<String, Object>();
+            return cfgDict;
+        }
+
+        @Override
+        public Object locateService(String name) {
+            return null;
+        }
+
+        @Override
+        public Object locateService(String name, ServiceReference reference) {
+            return null;
+        }
+
+        @Override
+        public Object[] locateServices(String name) {
+            return null;
+        }
+
+        @Override
+        public BundleContext getBundleContext() {
+            return null;
+        }
+
+        @Override
+        public Bundle getUsingBundle() {
+            return null;
+        }
+
+        @Override
+        public ComponentInstance getComponentInstance() {
+            return null;
+        }
+
+        @Override
+        public void enableComponent(String name) {
+        }
+
+        @Override
+        public void disableComponent(String name) {
+        }
+
+        @Override
+        public ServiceReference getServiceReference() {
+            return null;
+        }
+    }
+
+    /**
+     * Mocks the network config registry.
+     */
+    @SuppressWarnings("unchecked")
+    class MockNetworkConfigRegistry
+            extends NetworkConfigRegistryAdapter {
+        @Override
+        public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
+            PppoeAgentConfig pppoeAgentConfig = new MockPppoeAgentConfig();
+            return (C) pppoeAgentConfig;
+        }
+    }
+
+    /**
+     * Mocks the network config registry.
+     */
+    class MockPppoeAgentConfig extends PppoeAgentConfig {
+        @Override
+        public boolean getUseOltUplinkForServerPktInOut() {
+            return true;
+        }
+    }
+
+    /**
+     * Mocks the DefaultPacketContext.
+     */
+    final class MockPacketContext extends DefaultPacketContext {
+
+        private MockPacketContext(long time, InboundPacket inPkt,
+                                  OutboundPacket outPkt, boolean block) {
+            super(time, inPkt, outPkt, block);
+        }
+
+        @Override
+        public void send() {
+            // We don't send anything out.
+        }
+    }
+
+    /**
+     * Creates a mock for event delivery service.
+     */
+    static class MockEventDispatcher extends DefaultEventSinkRegistry
+            implements EventDeliveryService {
+        static List<Event> eventList = new LinkedList<>();
+        @Override
+        @SuppressWarnings("unchecked")
+        public synchronized void post(Event event) {
+            EventSink sink = getSink(event.getClass());
+            checkState(sink != null, "No sink for event %s", event);
+            sink.process(event);
+            eventList.add(event);
+        }
+
+        @Override
+        public void setDispatchTimeLimit(long millis) {
+        }
+
+        @Override
+        public long getDispatchTimeLimit() {
+            return 0;
+        }
+    }
+
+    /**
+     * Creates a mock object for a scheduled executor service.
+     */
+    class MockExecutor implements ScheduledExecutorService {
+        private ScheduledExecutorService executor;
+
+        MockExecutor(ScheduledExecutorService executor) {
+            this.executor = executor;
+        }
+
+        String lastMethodCalled = "";
+        long lastInitialDelay;
+        long lastDelay;
+        TimeUnit lastUnit;
+
+        public void assertLastMethodCalled(String method, long initialDelay, long delay, TimeUnit unit) {
+            assertEquals(method, lastMethodCalled);
+            assertEquals(initialDelay, lastInitialDelay);
+            assertEquals(delay, lastDelay);
+            assertEquals(unit, lastUnit);
+        }
+
+        @Override
+        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+            lastMethodCalled = "scheduleRunnable";
+            lastDelay = delay;
+            lastUnit = unit;
+            return null;
+        }
+
+        @Override
+        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+            lastMethodCalled = "scheduleCallable";
+            lastDelay = delay;
+            lastUnit = unit;
+            return null;
+        }
+
+        @Override
+        public ScheduledFuture<?> scheduleAtFixedRate(
+                Runnable command, long initialDelay, long period, TimeUnit unit) {
+            lastMethodCalled = "scheduleAtFixedRate";
+            lastInitialDelay = initialDelay;
+            lastDelay = period;
+            lastUnit = unit;
+            return null;
+        }
+
+        @Override
+        public ScheduledFuture<?> scheduleWithFixedDelay(
+                Runnable command, long initialDelay, long delay, TimeUnit unit) {
+            lastMethodCalled = "scheduleWithFixedDelay";
+            lastInitialDelay = initialDelay;
+            lastDelay = delay;
+            lastUnit = unit;
+            command.run();
+            return null;
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+                throws InterruptedException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(
+                Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                throws InterruptedException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+                throws ExecutionException, InterruptedException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                throws ExecutionException, InterruptedException, TimeoutException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isTerminated() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void shutdown() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return null;
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Creates a PPPoED message encapsulated by an Ethernet object.
+     *
+     * @param type        PPoED message type.
+     * @param source      source address.
+     * @param destination destination address of the message.
+     * @param cVlan       inner vlan.
+     * @param sVlan       outer vlan.
+     * @param sessionId   session-id number.
+     *
+     * @return Ethernet packet with PPPoED payload.
+     */
+    Ethernet constructPppoedPacket(byte type, MacAddress source, MacAddress destination,
+                                   VlanId cVlan, VlanId sVlan, short sessionId) {
+        Ethernet pppoedPacket = new Ethernet();
+        pppoedPacket.setSourceMACAddress(source);
+        pppoedPacket.setDestinationMACAddress(destination);
+        pppoedPacket.setEtherType(Ethernet.TYPE_PPPOED);
+        pppoedPacket.setVlanID(cVlan.toShort());
+        pppoedPacket.setPriorityCode((byte) CLIENT_C_PBIT);
+        pppoedPacket.setQinQTPID(Ethernet.TYPE_VLAN);
+        pppoedPacket.setQinQVID(sVlan.toShort());
+
+        PPPoED pppoedLayer = new PPPoED();
+        pppoedLayer.setCode(type);
+        pppoedLayer.setSessionId(sessionId);
+        pppoedPacket.setPayload(pppoedLayer);
+
+        return pppoedPacket;
+    }
+
+    public class MockClusterCommunicationService<M> implements ClusterCommunicationService {
+        private Consumer handler;
+        @Override
+        public void addSubscriber(MessageSubject subject,
+                                  ClusterMessageHandler subscriber, ExecutorService executor) {
+        }
+        @Override
+        public <M> void broadcast(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+        }
+        @Override
+        public <M> void broadcastIncludeSelf(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+            handler.accept(message);
+        }
+        @Override
+        public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
+                                                   Function<M, byte[]> encoder, NodeId toNodeId) {
+            return null;
+        }
+        @Override
+        public <M> void multicast(M message, MessageSubject subject,
+                                  Function<M, byte[]> encoder, Set<NodeId> nodeIds) {
+        }
+        @Override
+        public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+                                                          Function<M, byte[]> encoder,
+                                                          Function<byte[], R> decoder, NodeId toNodeId) {
+            return null;
+        }
+        @Override
+        public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+                                                          Function<M, byte[]> encoder, Function<byte[], R> decoder,
+                                                          NodeId toNodeId, Duration timeout) {
+            return null;
+        }
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                         Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
+        }
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                         Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
+        }
+        @Override
+        public <M> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                      Consumer<M> handler, Executor executor) {
+            this.handler = handler;
+        }
+        @Override
+        public void removeSubscriber(MessageSubject subject) {
+        }
+    }
+
+    /**
+     * Creates a random MAC address.
+     *
+     * @return random MAC address object.
+     */
+    MacAddress randomizeMacAddress() {
+        byte[] mac = new byte[6];
+        Random r = new Random();
+        r.nextBytes(mac);
+        return MacAddress.valueOf(mac);
+    }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..6fe95e7
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2021-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.onosproject</groupId>
+        <artifactId>onos-dependencies</artifactId>
+        <version>2.2.8-b3</version>
+    </parent>
+
+    <groupId>org.opencord</groupId>
+    <artifactId>pppoeagent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <description>PPPoE Intermediate Agent</description>
+
+    <properties>
+        <sadis.api.version>5.2.0</sadis.api.version>
+    </properties>
+
+    <modules>
+        <module>app</module>
+        <module>api</module>
+    </modules>
+
+</project>