diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
index e838a37..39e1cb3 100755
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
@@ -17,9 +17,13 @@
 
 import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
 import static org.onlab.packet.MacAddress.valueOf;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.getIntegerProperty;
 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
 
+import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Dictionary;
@@ -27,9 +31,15 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.commons.io.HexDump;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -48,6 +58,7 @@
 import org.onlab.packet.UDP;
 import org.onlab.packet.VlanId;
 import org.onlab.packet.dhcp.DhcpOption;
+import org.onlab.util.SafeRecurringTask;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
@@ -153,6 +164,11 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowObjectiveService flowObjectiveService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DhcpL2RelayCountersStore dhcpL2RelayCounters;
+
+    protected PublishCountersToKafka publishCountersToKafka;
+
     @Property(name = "option82", boolValue = true,
             label = "Add option 82 to relayed packets")
     protected boolean option82 = true;
@@ -161,6 +177,19 @@
             label = "Ask the DHCP Server to send back replies as L2 broadcast")
     protected boolean enableDhcpBroadcastReplies = false;
 
+    private static final int DEFAULT_RATE = 10;
+    @Property(name = "publishCountersRate", intValue = DEFAULT_RATE,
+            label = "The interval in seconds between calls to publish the DHCP counters to Kafka")
+    protected int publishCountersRate = 0;
+
+    ScheduledFuture<?> refreshTask;
+    ScheduledExecutorService refreshService = Executors.newSingleThreadScheduledExecutor();
+
+    private static final String DEFAULT_DHCP_COUNTERS_TOPIC = "onos_traffic.stats";
+    @Property(name = "dhcpCountersTopic", value = DEFAULT_DHCP_COUNTERS_TOPIC,
+            label = "The topic where to publish the DHCP L2 Relay counters to Kafka")
+    private String dhcpCountersTopic = DEFAULT_DHCP_COUNTERS_TOPIC;
+
     private DhcpRelayPacketProcessor dhcpRelayPacketProcessor =
             new DhcpRelayPacketProcessor();
 
@@ -169,14 +198,14 @@
 
     // connect points to the DHCP server
     Set<ConnectPoint> dhcpConnectPoints;
-    private AtomicReference<ConnectPoint> dhcpServerConnectPoint = new AtomicReference<>();
+    protected AtomicReference<ConnectPoint> dhcpServerConnectPoint = new AtomicReference<>();
     private MacAddress dhcpConnectMac = MacAddress.BROADCAST;
     private ApplicationId appId;
 
     static Map<String, DhcpAllocationInfo> allocationMap = Maps.newConcurrentMap();
-    private boolean modifyClientPktsSrcDstMac = false;
+    protected boolean modifyClientPktsSrcDstMac = false;
     //Whether to use the uplink port of the OLTs to send/receive messages to the DHCP server
-    private boolean useOltUplink = false;
+    protected boolean useOltUplink = false;
 
     private BaseInformationService<SubscriberAndDeviceInformation> subsService;
 
@@ -206,12 +235,21 @@
             modified(context);
         }
 
+        publishCountersToKafka = new PublishCountersToKafka();
+        subsService = sadisService.getSubscriberInfoService();
+        restartPublishCountersTask();
 
         log.info("DHCP-L2-RELAY Started");
     }
 
     @Deactivate
     protected void deactivate() {
+        if (refreshTask != null) {
+            refreshTask.cancel(true);
+        }
+        if (refreshService != null) {
+            refreshService.shutdownNow();
+        }
         cfgService.removeListener(cfgListener);
         factories.forEach(cfgService::unregisterConfigFactory);
         packetService.removeProcessor(dhcpRelayPacketProcessor);
@@ -238,6 +276,62 @@
         if (o != null) {
             enableDhcpBroadcastReplies = o;
         }
+
+        Integer newPublishCountersRate = getIntegerProperty(properties, "publishCountersRate");
+        if (newPublishCountersRate != null) {
+            if (newPublishCountersRate != publishCountersRate && newPublishCountersRate >= 0) {
+                log.info("publishCountersRate modified from {} to {}", publishCountersRate, newPublishCountersRate);
+                publishCountersRate = newPublishCountersRate;
+            } else if (newPublishCountersRate < 0) {
+                log.error("Invalid newPublishCountersRate : {}, defaulting to 0", newPublishCountersRate);
+                publishCountersRate = 0;
+            }
+            restartPublishCountersTask();
+        }
+
+        String newDhcpCountersTopic = get(properties, "dhcpCountersTopic");
+        if (newDhcpCountersTopic != null && !newDhcpCountersTopic.equals(dhcpCountersTopic)) {
+            log.info("Property dhcpCountersTopic modified from {} to {}", dhcpCountersTopic, newDhcpCountersTopic);
+            dhcpCountersTopic = newDhcpCountersTopic;
+        }
+    }
+
+    /**
+     * Cancels any scheduled publish task and, when publishCountersRate > 0, reschedules it at that delay (seconds).
+     */
+    private void restartPublishCountersTask() {
+        if (refreshTask != null) {
+            refreshTask.cancel(true);
+        }
+        if (publishCountersRate > 0) {
+            log.info("Refresh Rate set to {}, publishCountersToKafka will be called every {} seconds",
+                    publishCountersRate, publishCountersRate);
+            refreshTask = refreshService.scheduleWithFixedDelay(SafeRecurringTask.wrap(publishCountersToKafka),
+                    publishCountersRate, publishCountersRate, TimeUnit.SECONDS);
+        } else {
+            log.info("Refresh Rate set to 0, disabling calls to publishCountersToKafka");
+        }
+    }
+
+    /**
+     * Posts a STATS_UPDATE event, tagged with dhcpCountersTopic, for every entry in the counters store.
+     */
+    private class PublishCountersToKafka implements Runnable {
+        public void run() {
+            dhcpL2RelayCounters.getCountersMap().forEach((counterKey, counterValue) -> {
+                // Global counters carry no allocation info and no subscriber id
+                if (counterKey.counterClassKey.equals(DhcpL2RelayCountersIdentifier.GLOBAL_COUNTER)) {
+                    post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, null, null,
+                            new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
+                                    counterValue), dhcpCountersTopic, null));
+                } else { // Per-subscriber counters: the class key doubles as the allocationMap key
+                    DhcpAllocationInfo info = allocationMap.get(counterKey.counterClassKey);
+                    post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, info, null,
+                            new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
+                                    counterValue), dhcpCountersTopic, counterKey.counterClassKey));
+                }
+            });
+        }
     }
 
     /**
@@ -245,7 +339,7 @@
      *
      * @return true if all information we need have been initialized
      */
-    private boolean configured() {
+    protected boolean configured() {
         if (!useOltUplink) {
             return dhcpServerConnectPoint.get() != null;
         }
@@ -605,6 +699,9 @@
                               toSendTo, packet);
                 }
                 packetService.emit(o);
+
+                SubscriberAndDeviceInformation entry = getSubscriberInfoFromClient(context);
+                updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("PACKETS_TO_SERVER"));
             } else {
                 log.error("No connect point to send msg to DHCP Server");
             }
@@ -622,6 +719,44 @@
             return null;
         }
 
+        private void  updateDhcpRelayCountersStore(SubscriberAndDeviceInformation entry,
+                                                   DhcpL2RelayCounters counterType) {
+            // The global counter is incremented unconditionally, even when entry is null
+            dhcpL2RelayCounters.incrementCounter(DhcpL2RelayCountersIdentifier.GLOBAL_COUNTER, counterType);
+            if (entry == null) {
+                log.warn("Counter not updated as subscriber info not found.");
+            } else {
+                // Per-subscriber counter, keyed by the subscriber id
+                dhcpL2RelayCounters.incrementCounter(entry.id(), counterType);
+            }
+        }
+
+        /*
+         * Looks up the subscriber for a client packet via getSubscriber(context); returns null if context is null.
+         */
+        private SubscriberAndDeviceInformation getSubscriberInfoFromClient(PacketContext context) {
+            if (context != null) {
+                return getSubscriber(context);
+            }
+            return null;
+        }
+
+        /*
+         * Looks up the subscriber by the payload's client MAC; null if the payload or its connect point is null.
+         */
+        private SubscriberAndDeviceInformation getSubscriberInfoFromServer(DHCP dhcpPayload) {
+            if (dhcpPayload != null) {
+                MacAddress descMac = valueOf(dhcpPayload.getClientHardwareAddress());
+                ConnectPoint subsCp = getConnectPointOfClient(descMac);
+
+                if (subsCp != null) {
+                    String portId = nasPortId(subsCp);
+                    return subsService.get(portId);
+                }
+            }
+            return null;
+        }
+
         //process the dhcp packet before sending to server
         private void processDhcpPacket(PacketContext context, Ethernet packet,
                                        DHCP dhcpPayload) {
@@ -631,6 +766,18 @@
             }
 
             DHCPPacketType incomingPacketType = getDhcpPacketType(dhcpPayload);
+            if (incomingPacketType == null) {
+                log.warn("DHCP packet type not found. Dump of ethernet pkt in hex format for troubleshooting.");
+                byte[] array = packet.serialize();
+                ByteArrayOutputStream buf = new ByteArrayOutputStream();
+                try {
+                    HexDump.dump(array, 0, buf, 0);
+                    log.trace(buf.toString());
+                } catch (Exception e) { }
+                return;
+            }
+
+            SubscriberAndDeviceInformation entry = null;
 
             log.info("Received DHCP Packet of type {} from {}",
                      incomingPacketType, context.inPacket().receivedFrom());
@@ -642,6 +789,8 @@
                     if (ethernetPacketDiscover != null) {
                         forwardPacket(ethernetPacketDiscover, context);
                     }
+                    entry = getSubscriberInfoFromClient(context);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"));
                     break;
                 case DHCPOFFER:
                     //reply to dhcp client.
@@ -650,6 +799,8 @@
                     if (ethernetPacketOffer != null) {
                         sendReply(ethernetPacketOffer, dhcpPayload);
                     }
+                    entry = getSubscriberInfoFromServer(dhcpPayload);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPOFFER"));
                     break;
                 case DHCPREQUEST:
                     Ethernet ethernetPacketRequest =
@@ -657,6 +808,8 @@
                     if (ethernetPacketRequest != null) {
                         forwardPacket(ethernetPacketRequest, context);
                     }
+                    entry = getSubscriberInfoFromClient(context);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPREQUEST"));
                     break;
                 case DHCPACK:
                     //reply to dhcp client.
@@ -665,6 +818,20 @@
                     if (ethernetPacketAck != null) {
                         sendReply(ethernetPacketAck, dhcpPayload);
                     }
+                    entry = getSubscriberInfoFromServer(dhcpPayload);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPACK"));
+                    break;
+                case DHCPDECLINE:
+                    entry = getSubscriberInfoFromClient(context);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPDECLINE"));
+                    break;
+                case DHCPNAK:
+                    entry = getSubscriberInfoFromServer(dhcpPayload);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPNACK"));
+                    break;
+                case DHCPRELEASE:
+                    entry = getSubscriberInfoFromClient(context);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPRELEASE"));
                     break;
                 default:
                     break;
@@ -708,7 +875,7 @@
                     context.inPacket().receivedFrom(), dhcpPacket.getPacketType(),
                     entry.nasPortId(), clientMac, clientIp);
 
-            allocationMap.put(entry.nasPortId(), info);
+            allocationMap.put(entry.id(), info);
 
             post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.UPDATED, info,
                                       context.inPacket().receivedFrom()));
@@ -779,12 +946,15 @@
                     DhcpAllocationInfo info = new DhcpAllocationInfo(subsCp,
                             dhcpPayload.getPacketType(), circuitId, dstMac, ip);
 
-                    allocationMap.put(sub.nasPortId(), info);
+                    allocationMap.put(sub.id(), info);
 
                     post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.UPDATED, info, subsCp));
                 }
             } // end storing of info
 
+            SubscriberAndDeviceInformation entry = getSubscriberInfoFromServer(dhcpPayload);
+            updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("PACKETS_FROM_SERVER"));
+
             // we leave the srcMac from the original packet
             etherReply.setDestinationMACAddress(dstMac);
             etherReply.setQinQVID(sTag(subsCp).toShort());
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfig.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfig.java
index 9a47ab3..7dafaf4 100755
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfig.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfig.java
@@ -37,8 +37,8 @@
     private static final String MODIFY_SRC_DST_MAC  = "modifyUlPacketsSrcDstMacAddresses";
     private static final String USE_OLT_ULPORT_FOR_PKT_INOUT = "useOltUplinkForServerPktInOut";
 
-    private static final Boolean DEFAULT_MODIFY_SRC_DST_MAC = false;
-    private static final Boolean DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT = false;
+    protected static final Boolean DEFAULT_MODIFY_SRC_DST_MAC = false;
+    protected static final Boolean DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT = false;
 
     @Override
     public boolean isValid() {
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounters.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounters.java
new file mode 100644
index 0000000..71f7adc
--- /dev/null
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounters.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.opencord.dhcpl2relay.impl;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+/**
+ * Enumerates the DHCP L2 relay counter types: one per DHCP message type plus packets to/from the server.
+ */
+public enum DhcpL2RelayCounters {
+    /**
+     * DHCP relay counter of type Discover.
+     */
+    DHCPDISCOVER,
+    /**
+     * DHCP relay counter of type Release.
+     */
+    DHCPRELEASE,
+    /**
+     * DHCP relay counter of type Decline.
+     */
+    DHCPDECLINE,
+    /**
+     * DHCP relay counter of type Request.
+     */
+    DHCPREQUEST,
+    /**
+     * DHCP relay counter of type Offer.
+     */
+    DHCPOFFER,
+    /**
+     * DHCP relay counter of type ACK.
+     */
+    DHCPACK,
+    /**
+     * DHCP relay counter of type NACK.
+     */
+    DHCPNACK,
+    /**
+     * DHCP relay counter of type Packets_to_server.
+     */
+    PACKETS_TO_SERVER,
+    /**
+     * DHCP relay counter of type Packets_from_server.
+     */
+    PACKETS_FROM_SERVER;
+
+    /**
+     * Immutable set listing every counter type declared by this enum.
+     */
+    static final Set<DhcpL2RelayCounters> SUPPORTED_COUNTERS = ImmutableSet.of(DhcpL2RelayCounters.DHCPDISCOVER,
+            DhcpL2RelayCounters.DHCPRELEASE, DhcpL2RelayCounters.DHCPDECLINE,
+            DhcpL2RelayCounters.DHCPREQUEST, DhcpL2RelayCounters.DHCPOFFER,
+            DhcpL2RelayCounters.DHCPACK, DhcpL2RelayCounters.DHCPNACK,
+            DhcpL2RelayCounters.PACKETS_TO_SERVER,
+            DhcpL2RelayCounters.PACKETS_FROM_SERVER);
+    }
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersIdentifier.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersIdentifier.java
new file mode 100644
index 0000000..0bdacdc
--- /dev/null
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersIdentifier.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.dhcpl2relay.impl;
+
+import java.util.Objects;
+
+/**
+ * Immutable key identifying one DHCP relay counter by its counter class and counter type.
+ */
+public final class DhcpL2RelayCountersIdentifier {
+    final String counterClassKey;
+    final Enum<DhcpL2RelayCounters> counterTypeKey;
+    static final String GLOBAL_COUNTER = "global";
+
+    /**
+     * Creates a default global counter identifier for a given counterType.
+     *
+     * @param counterTypeKey Identifies the supported type of DHCP relay counters
+     */
+    public DhcpL2RelayCountersIdentifier(DhcpL2RelayCounters counterTypeKey) {
+        this.counterClassKey = GLOBAL_COUNTER;
+        this.counterTypeKey = counterTypeKey;
+    }
+
+    /**
+     * Creates a counter identifier. A counter is defined by the key pair {@code <counterClass, counterType>},
+     * where counterClass is either global or the subscriber ID and counterType is the supported DHCP message type.
+     *
+     * @param counterClassKey Identifies which class the counter is assigned (global or per subscriber)
+     * @param counterTypeKey Identifies the supported type of DHCP relay counters
+     */
+    public DhcpL2RelayCountersIdentifier(String counterClassKey, DhcpL2RelayCounters counterTypeKey) {
+        this.counterClassKey = counterClassKey;
+        this.counterTypeKey = counterTypeKey;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj instanceof DhcpL2RelayCountersIdentifier) {
+            final DhcpL2RelayCountersIdentifier other = (DhcpL2RelayCountersIdentifier) obj;
+            return Objects.equals(this.counterClassKey, other.counterClassKey)
+                    && Objects.equals(this.counterTypeKey, other.counterTypeKey);
+        }
+
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(counterClassKey, counterTypeKey);
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStore.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStore.java
new file mode 100644
index 0000000..72b450e
--- /dev/null
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStore.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.dhcpl2relay.impl;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Store of DHCP L2 Relay counters. A counter entry is defined by the pair {@code <counterClass, counterType>},
+ * where counterClass is either global or a subscriber ID and counterType is the DHCP message type.
+ */
+public interface DhcpL2RelayCountersStore {
+
+    String NAME = "DHCP_L2_Relay_stats";
+
+    /**
+     * Init counter values for a given counter class.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     */
+    void initCounters(String counterClass);
+
+    /**
+     * Creates or updates DHCP L2 Relay counter.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     */
+    void incrementCounter(String counterClass, DhcpL2RelayCounters counterType);
+
+    /**
+     * Sets the value of a DHCP L2 Relay counter.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     * @param value The value of the counter
+     */
+    void setCounter(String counterClass, DhcpL2RelayCounters counterType, Long value);
+
+    /**
+     * Gets the DHCP L2 Relay counters map.
+     *
+     * @return the DHCP counter map
+     */
+    public Map<DhcpL2RelayCountersIdentifier, AtomicLong> getCountersMap();
+
+    /**
+     * Resets counter values for a given counter class.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     */
+    void resetCounters(String counterClass);
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayStatsCommand.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayStatsCommand.java
new file mode 100644
index 0000000..5140243
--- /dev/null
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayStatsCommand.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.dhcpl2relay.impl;
+
+
+import org.apache.karaf.shell.commands.Argument;
+import org.apache.karaf.shell.commands.Command;
+import org.apache.karaf.shell.commands.Option;
+import org.onosproject.cli.AbstractShellCommand;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * CLI command that displays or resets the DHCP L2 relay counters of one counter class (global or a subscriber).
+ */
+@Command(scope = "onos", name = "dhcpl2relay-stats",
+        description = "Display or Reset the DHCP L2 relay application statistics")
+public class DhcpL2RelayStatsCommand extends AbstractShellCommand {
+    private static final String CONFIRM_PHRASE = "please";
+
+    @Option(name = "-r", aliases = "--reset", description = "Reset the counter[s]\n" +
+            "(WARNING!!!: In case no counter name is explicitly specified, all DHCP L2 Relay counters will be reset).",
+            required = false, multiValued = false)
+    private boolean reset = false;
+
+    @Option(name = "-s", aliases = "--subscriberId", description = "Subscriber Id\n",
+            required = false, multiValued = false)
+    private String subscriberId = null;
+
+    @Option(name = "-p", aliases = "--please", description = "Confirmation phrase",
+            required = false, multiValued = false)
+    String please = null;
+
+    @Argument(index = 0, name = "counter",
+            description = "The counter to display (or reset). In case not specified, all counters\nwill be " +
+                    "displayed (or reset in case the -r option is specified).",
+            required = false, multiValued = false)
+    DhcpL2RelayCounters counter = null;
+
+    @Override
+    protected void execute() {
+        DhcpL2RelayCountersStore dhcpCounters = AbstractShellCommand.get(
+                DhcpL2RelayCountersStore.class);
+
+        if ((subscriberId == null) || (subscriberId.equals("global"))) {
+            // No subscriber given (or "global" requested): operate on the global counter class
+            subscriberId = DhcpL2RelayCountersIdentifier.GLOBAL_COUNTER;
+        }
+
+       if (reset) {
+            if (please == null || !please.equals(CONFIRM_PHRASE)) {
+                print("WARNING: Be aware that you are going to reset the counters. " +
+                        "Enter confirmation phrase to continue.");
+                return;
+            }
+            if (counter == null) {
+                // Reset every counter of the selected counter class
+                dhcpCounters.resetCounters(subscriberId);
+            } else {
+                // Reset the specified counter
+                dhcpCounters.setCounter(subscriberId, counter, (long) 0);
+            }
+        } else {
+           Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpCounters.getCountersMap();
+           if (countersMap.size() > 0) {
+               if (counter == null) {
+                   String jsonString = "";
+                   if (outputJson()) {
+                       jsonString = String.format("{\"%s\":{", DhcpL2RelayCountersStore.NAME);
+                   } else {
+                       print("%s [%s] :", DhcpL2RelayCountersStore.NAME, subscriberId);
+                   }
+                   for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+                        it.hasNext();) {
+                       DhcpL2RelayCounters counterType = it.next();
+                       AtomicLong v = countersMap.get(new DhcpL2RelayCountersIdentifier(subscriberId, counterType));
+                       if (v == null) {
+                           v = new AtomicLong(0);
+                       }
+                       if (outputJson()) {
+                           jsonString += String.format("\"%s\":%d", counterType, v.longValue());
+                           if (it.hasNext()) {
+                               jsonString += ",";
+                           }
+                       } else {
+                           printCounter(counterType, v);
+                       }
+                   }
+                   if (outputJson()) {
+                       jsonString += "}}";
+                       print("%s", jsonString);
+                   }
+               } else {
+                   // Show only the specified counter
+                   AtomicLong v = countersMap.get(new DhcpL2RelayCountersIdentifier(subscriberId, counter));
+                   if (v == null) {
+                       v = new AtomicLong(0);
+                   }
+                   if (outputJson()) {
+                       print("{\"%s\":%d}", counter, v.longValue());
+                   } else {
+                       printCounter(counter, v);
+                   }
+               }
+           } else {
+               print("No DHCP L2 Relay Counters were created yet for counter class [%s]",
+                       subscriberId);
+           }
+       }
+    }
+
+    void printCounter(DhcpL2RelayCounters c, AtomicLong a) {
+        // Non-JSON format: counter name, dot padding out to 50 characters, then the value
+        print("  %s %s %-4d", c,
+                String.join("", Collections.nCopies(50 - c.toString().length(), ".")),
+                a.longValue());
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java
new file mode 100644
index 0000000..455d761
--- /dev/null
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.dhcpl2relay.impl;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.slf4j.LoggerFactory.getLogger;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * In-memory {@link DhcpL2RelayCountersStore} backed by a concurrent map from counter identifier to value.
+ */
+@Component(immediate = true)
+@Service
+public class SimpleDhcpL2RelayCountersStore implements DhcpL2RelayCountersStore {
+    private ApplicationId appId;
+    private final Logger log = getLogger(getClass());
+    private Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    /** Creates the counters map and zero-initializes the global counter class. */
+    @Activate
+    public void activate() {
+        log.info("Activate Dhcp L2 Counters Manager");
+        //appId = coreService.getAppId(DhcpL2Relay.DHCP_L2RELAY_APP);
+        countersMap = new ConcurrentHashMap<>();
+        initCounters(DhcpL2RelayCountersIdentifier.GLOBAL_COUNTER);
+    }
+
+    /** Returns an immutable copy of the current counters map. */
+    public ImmutableMap<DhcpL2RelayCountersIdentifier, AtomicLong> getCountersMap() {
+        return ImmutableMap.copyOf(countersMap);
+    }
+
+    /**
+     * Sets every supported counter of the given counter class to zero, creating missing entries.
+     * @param counterClass class of counters (global, per subscriber)
+     */
+    public void initCounters(String counterClass) {
+        checkNotNull(counterClass, "counter class can't be null");
+        for (DhcpL2RelayCounters counterType : DhcpL2RelayCounters.SUPPORTED_COUNTERS) {
+            countersMap.put(new DhcpL2RelayCountersIdentifier(counterClass, counterType), new AtomicLong(0));
+        }
+    }
+
+    /**
+     * Increments the counter by one, inserting it with value 1 when it is not in the map yet.
+     * Unsupported counter types are logged and ignored.
+     * @param counterClass class of counters (global, per subscriber)
+     * @param counterType name of counter
+     */
+    public void incrementCounter(String counterClass, DhcpL2RelayCounters counterType) {
+        checkNotNull(counterClass, "counter class can't be null");
+        if (DhcpL2RelayCounters.SUPPORTED_COUNTERS.contains(counterType)) {
+            DhcpL2RelayCountersIdentifier counterIdentifier =
+                    new DhcpL2RelayCountersIdentifier(counterClass, counterType);
+            // Replace rather than mutate: earlier getCountersMap() copies still hold the old instance.
+            countersMap.compute(counterIdentifier, (key, counterValue) ->
+                    new AtomicLong(counterValue == null ? 1 : counterValue.get() + 1));
+        } else {
+            log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
+        }
+    }
+
+    /**
+     * Resets to zero the counters of the given counter class that are already in the map.
+     * @param counterClass class of counters (global, per subscriber)
+     */
+    public void resetCounters(String counterClass) {
+        checkNotNull(counterClass, "counter class can't be null");
+        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator(); it.hasNext();) {
+            DhcpL2RelayCountersIdentifier counterIdentifier =
+                    new DhcpL2RelayCountersIdentifier(counterClass, it.next());
+            countersMap.computeIfPresent(counterIdentifier, (key, counterValue) -> new AtomicLong(0));
+        }
+    }
+
+    /**
+     * Sets the counter to the given value, inserting the entry when it is not in the map yet.
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     * @param value counter value
+     */
+    public void setCounter(String counterClass, DhcpL2RelayCounters counterType, Long value) {
+        checkNotNull(counterClass, "counter class can't be null");
+        if (DhcpL2RelayCounters.SUPPORTED_COUNTERS.contains(counterType)) {
+            checkNotNull(value, "counter value can't be null");
+            DhcpL2RelayCountersIdentifier counterIdentifier =
+                    new DhcpL2RelayCountersIdentifier(counterClass, counterType);
+            countersMap.put(counterIdentifier, new AtomicLong(value));
+        } else {
+            log.error("Failed to set counter class {} of type {}", counterClass, counterType);
+        }
+    }
+}
diff --git a/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index b5edcc3..49e8ef7 100644
--- a/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/app/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -19,5 +19,8 @@
         <command>
             <action class="org.opencord.dhcpl2relay.impl.DhcpL2RelayAllocationsCommand"/>
         </command>
+        <command>
+            <action class="org.opencord.dhcpl2relay.impl.DhcpL2RelayStatsCommand"/>
+        </command>
     </command-bundle>
 </blueprint>
\ No newline at end of file
