[SEBA-144] Operational Status DHCP L2 Relay.

Cherry-picked from dhcpl2relay-1.6

Change-Id: I58aab083152793af36dfd34fa91cd78385fd2ed7
Signed-off-by: Marcos Aurelio Carrero <mcarrero@furukawalatam.com>
diff --git a/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayEvent.java b/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayEvent.java
index 364197d..596fb6c 100644
--- a/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayEvent.java
+++ b/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayEvent.java
@@ -19,13 +19,24 @@
 import org.onosproject.event.AbstractEvent;
 import org.onosproject.net.ConnectPoint;
 
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * Dhcp L2 relay event.
  */
 public class DhcpL2RelayEvent extends AbstractEvent<DhcpL2RelayEvent.Type, DhcpAllocationInfo> {
 
+    public static final String GLOBAL_COUNTER = "global";
+
     private final ConnectPoint connectPoint;
 
+    private final Map.Entry<String, AtomicLong> countersEntry;
+
+    private final String dhcpCountersTopic;
+
+    private final String subscriberId;
+
     /**
      * Type of the event.
      */
@@ -38,7 +49,32 @@
         /**
          * DHCP lease was removed.
          */
-        REMOVED
+        REMOVED,
+
+        /**
+         * DHCP stats update.
+         */
+        STATS_UPDATE
+    }
+
+    /**
+     * Creates a new event used for STATS.
+     *
+     * @param type type of the event
+     * @param allocationInfo DHCP allocation info
+     * @param connectPoint connect point the client is on
+     * @param countersEntry an entry that represents the counters (used for STATS events)
+     * @param dhcpCountersTopic Kafka topic where the dhcp counters are published
+     * @param subscriberId the subscriber identifier information
+     */
+    public DhcpL2RelayEvent(Type type, DhcpAllocationInfo allocationInfo, ConnectPoint connectPoint,
+                            Map.Entry<String, AtomicLong> countersEntry,
+                            String dhcpCountersTopic, String subscriberId) {
+        super(type, allocationInfo);
+        this.connectPoint = connectPoint;
+        this.countersEntry = countersEntry;
+        this.dhcpCountersTopic = dhcpCountersTopic;
+        this.subscriberId = subscriberId;
     }
 
     /**
@@ -51,6 +87,9 @@
     public DhcpL2RelayEvent(Type type, DhcpAllocationInfo allocationInfo, ConnectPoint connectPoint) {
         super(type, allocationInfo);
         this.connectPoint = connectPoint;
+        this.countersEntry = null;
+        this.dhcpCountersTopic = null;
+        this.subscriberId = null;
     }
 
     /**
@@ -61,4 +100,31 @@
     public ConnectPoint connectPoint() {
         return connectPoint;
     }
+
+    /**
+     * Gets the counters map entry.
+     *
+     * @return counters map entry
+     */
+    public Map.Entry<String, AtomicLong> getCountersEntry() {
+        return countersEntry;
+    }
+
+    /**
+     * Gets the Kafka topic where the dhcp counters are published.
+     *
+     * @return the dhcp kafka topic
+     */
+    public String getDhcpCountersTopic() {
+        return dhcpCountersTopic;
+    }
+
+    /**
+     * Gets the subscriber identifier information.
+     *
+     * @return the Id from subscriber
+     */
+    public String getSubscriberId() {
+        return subscriberId;
+    }
 }
diff --git a/app/pom.xml b/app/pom.xml
index 338b11a..162450a 100644
--- a/app/pom.xml
+++ b/app/pom.xml
@@ -27,7 +27,7 @@
     <artifactId>dhcpl2relay-app</artifactId>
 
     <packaging>bundle</packaging>
-    <description>DHCP L2 Realy application for CORD</description>
+    <description>DHCP L2 Relay application for CORD</description>
 
     <properties>
         <onos.app.name>org.opencord.dhcpl2relay</onos.app.name>
@@ -94,6 +94,12 @@
 
         <dependency>
             <groupId>org.apache.karaf.shell</groupId>
+            <artifactId>org.apache.karaf.shell.core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.karaf.shell</groupId>
             <artifactId>org.apache.karaf.shell.console</artifactId>
             <scope>provided</scope>
         </dependency>
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayStatsCommand.java b/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayStatsCommand.java
new file mode 100644
index 0000000..0f4901f
--- /dev/null
+++ b/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayStatsCommand.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.dhcpl2relay.cli;
+
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.karaf.shell.api.action.Option;
+import org.onosproject.cli.AbstractShellCommand;
+import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import org.opencord.dhcpl2relay.impl.DhcpL2RelayCounters;
+import org.opencord.dhcpl2relay.impl.DhcpL2RelayCountersIdentifier;
+import org.opencord.dhcpl2relay.impl.DhcpL2RelayCountersStore;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Display/Reset the DHCP L2 relay application statistics.
+ */
+@Service
+@Command(scope = "onos", name = "dhcpl2relay-stats",
+        description = "Display or Reset the DHCP L2 relay application statistics")
+public class DhcpL2RelayStatsCommand extends AbstractShellCommand {
+    private static final String CONFIRM_PHRASE = "please";
+
+    @Option(name = "-r", aliases = "--reset", description = "Reset the counter[s]\n" +
+            "(WARNING!!!: In case no counter name is explicitly specified, all DHCP L2 Relay counters will be reset).",
+            required = false, multiValued = false)
+    private boolean reset = false;
+
+    @Option(name = "-s", aliases = "--subscriberId", description = "Subscriber Id\n",
+            required = false, multiValued = false)
+    private String subscriberId = null;
+
+    @Option(name = "-p", aliases = "--please", description = "Confirmation phrase",
+            required = false, multiValued = false)
+    String please = null;
+
+    @Argument(index = 0, name = "counter",
+            description = "The counter to display (or reset). In case not specified, all counters\nwill be " +
+                    "displayed (or reset in case the -r option is specified).",
+            required = false, multiValued = false)
+    DhcpL2RelayCounters counter = null;
+
+    @Override
+    protected void doExecute() {
+        DhcpL2RelayCountersStore dhcpCounters = AbstractShellCommand.get(
+                DhcpL2RelayCountersStore.class);
+
+        if ((subscriberId == null) || (subscriberId.equals("global"))) {
+            // All subscriber Ids
+            subscriberId = DhcpL2RelayEvent.GLOBAL_COUNTER;
+        }
+
+       if (reset) {
+            if (please == null || !please.equals(CONFIRM_PHRASE)) {
+                print("WARNING: Be aware that you are going to reset the counters. " +
+                        "Enter confirmation phrase to continue.");
+                return;
+            }
+            if (counter == null) {
+                // Reset all global counters
+                dhcpCounters.resetCounters(subscriberId);
+            } else {
+                // Reset the specified counter
+                dhcpCounters.setCounter(subscriberId, counter, (long) 0);
+            }
+        } else {
+           Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpCounters.getCountersMap();
+           if (countersMap.size() > 0) {
+               if (counter == null) {
+                   String jsonString = "";
+                   if (outputJson()) {
+                       jsonString = String.format("{\"%s\":{", dhcpCounters.NAME);
+                   } else {
+                       print("%s [%s] :", dhcpCounters.NAME, subscriberId);
+                   }
+                   DhcpL2RelayCounters[] counters = DhcpL2RelayCounters.values();
+                   for (int i = 0; i < counters.length; i++) {
+                       DhcpL2RelayCounters counterType = counters[i];
+                       AtomicLong v = countersMap.get(new DhcpL2RelayCountersIdentifier(subscriberId, counterType));
+                       if (v == null) {
+                           v = new AtomicLong(0);
+                       }
+                       if (outputJson()) {
+                           jsonString += String.format("\"%s\":%d", counterType, v.longValue());
+                           if (i < counters.length - 1) {
+                               jsonString += ",";
+                           }
+                       } else {
+                           printCounter(counterType, v);
+                       }
+                   }
+                   if (outputJson()) {
+                       jsonString += "}}";
+                       print("%s", jsonString);
+                   }
+               } else {
+                   // Show only the specified counter
+                   AtomicLong v = countersMap.get(new DhcpL2RelayCountersIdentifier(subscriberId, counter));
+                   if (v == null) {
+                       v = new AtomicLong(0);
+                   }
+                   if (outputJson()) {
+                       print("{\"%s\":%d}", counter, v.longValue());
+                   } else {
+                       printCounter(counter, v);
+                   }
+               }
+           } else {
+               print("No DHCP L2 Relay Counters were created yet for counter class [%s]",
+                       DhcpL2RelayEvent.GLOBAL_COUNTER);
+           }
+       }
+    }
+
+    void printCounter(DhcpL2RelayCounters c, AtomicLong a) {
+        // print in non-JSON format
+        print("  %s %s %-4d", c,
+                String.join("", Collections.nCopies(50 - c.toString().length(), ".")),
+                a.longValue());
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
index c7e4913..411d763 100755
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
@@ -19,6 +19,31 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
+import static org.onlab.packet.MacAddress.valueOf;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.getIntegerProperty;
+import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.HexDump;
 import org.onlab.packet.DHCP;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IPv4;
@@ -28,6 +53,7 @@
 import org.onlab.packet.UDP;
 import org.onlab.packet.VlanId;
 import org.onlab.packet.dhcp.DhcpOption;
+import org.onlab.util.SafeRecurringTask;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
@@ -80,24 +106,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Dictionary;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
-import static org.onlab.packet.MacAddress.valueOf;
-import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.*;
 
 /**
  * DHCP Relay Agent Application Component.
@@ -106,6 +115,8 @@
 property = {
         OPTION_82 + ":Boolean=" + OPTION_82_DEFAULT,
         ENABLE_DHCP_BROADCAST_REPLIES + ":Boolean=" + ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT,
+        PUBLISH_COUNTERS_RATE + ":Integer=" + PUBLISH_COUNTERS_RATE_DEFAULT,
+        DHCP_COUNTERS_TOPIC + ":String=" + DHCP_COUNTERS_TOPIC_DEFAULT
 })
 public class DhcpL2Relay
         extends AbstractListenerManager<DhcpL2RelayEvent, DhcpL2RelayListener>
@@ -156,12 +167,24 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected FlowObjectiveService flowObjectiveService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DhcpL2RelayCountersStore dhcpL2RelayCounters;
+
+    // OSGi Properties
     /** Add option 82 to relayed packets. */
     protected boolean option82 = OPTION_82_DEFAULT;
-
     /** Ask the DHCP Server to send back replies as L2 broadcast. */
     protected boolean enableDhcpBroadcastReplies = ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
 
+    protected int publishCountersRate = PUBLISH_COUNTERS_RATE_DEFAULT;
+    private String dhcpCountersTopic = DHCP_COUNTERS_TOPIC_DEFAULT;
+
+
+    protected PublishCountersToKafka publishCountersToKafka;
+
+    ScheduledFuture<?> refreshTask;
+    ScheduledExecutorService refreshService = Executors.newSingleThreadScheduledExecutor();
+
     private DhcpRelayPacketProcessor dhcpRelayPacketProcessor =
             new DhcpRelayPacketProcessor();
 
@@ -170,14 +193,14 @@
 
     // connect points to the DHCP server
     Set<ConnectPoint> dhcpConnectPoints;
-    private AtomicReference<ConnectPoint> dhcpServerConnectPoint = new AtomicReference<>();
+    protected AtomicReference<ConnectPoint> dhcpServerConnectPoint = new AtomicReference<>();
     private MacAddress dhcpConnectMac = MacAddress.BROADCAST;
     private ApplicationId appId;
 
     static Map<String, DhcpAllocationInfo> allocationMap = Maps.newConcurrentMap();
-    private boolean modifyClientPktsSrcDstMac = false;
+    protected boolean modifyClientPktsSrcDstMac = false;
     //Whether to use the uplink port of the OLTs to send/receive messages to the DHCP server
-    private boolean useOltUplink = false;
+    protected boolean useOltUplink = false;
 
     private BaseInformationService<SubscriberAndDeviceInformation> subsService;
 
@@ -207,12 +230,21 @@
             modified(context);
         }
 
+        publishCountersToKafka = new PublishCountersToKafka();
+        subsService = sadisService.getSubscriberInfoService();
+        restartPublishCountersTask();
 
         log.info("DHCP-L2-RELAY Started");
     }
 
     @Deactivate
     protected void deactivate() {
+        if (refreshTask != null) {
+            refreshTask.cancel(true);
+        }
+        if (refreshService != null) {
+            refreshService.shutdownNow();
+        }
         cfgService.removeListener(cfgListener);
         factories.forEach(cfgService::unregisterConfigFactory);
         packetService.removeProcessor(dhcpRelayPacketProcessor);
@@ -239,6 +271,62 @@
         if (o != null) {
             enableDhcpBroadcastReplies = o;
         }
+
+        Integer newPublishCountersRate = getIntegerProperty(properties, "publishCountersRate");
+        if (newPublishCountersRate != null) {
+            if (newPublishCountersRate != publishCountersRate && newPublishCountersRate >= 0) {
+                log.info("publishCountersRate modified from {} to {}", publishCountersRate, newPublishCountersRate);
+                publishCountersRate = newPublishCountersRate;
+            } else if (newPublishCountersRate < 0) {
+                log.error("Invalid newPublishCountersRate : {}, defaulting to 0", newPublishCountersRate);
+                publishCountersRate = 0;
+            }
+            restartPublishCountersTask();
+        }
+
+        String newDhcpCountersTopic = get(properties, "dhcpCountersTopic");
+        if (newDhcpCountersTopic != null && !newDhcpCountersTopic.equals(dhcpCountersTopic)) {
+            log.info("Property dhcpCountersTopic modified from {} to {}", dhcpCountersTopic, newDhcpCountersTopic);
+            dhcpCountersTopic = newDhcpCountersTopic;
+        }
+    }
+
+    /**
+     * Starts a thread to publish the counters to kafka at a certain rate time.
+     */
+    private void restartPublishCountersTask() {
+        if (refreshTask != null) {
+            refreshTask.cancel(true);
+        }
+        if (publishCountersRate > 0) {
+            log.info("Refresh Rate set to {}, publishCountersToKafka will be called every {} seconds",
+                    publishCountersRate, publishCountersRate);
+            refreshTask = refreshService.scheduleWithFixedDelay(SafeRecurringTask.wrap(publishCountersToKafka),
+                    publishCountersRate, publishCountersRate, TimeUnit.SECONDS);
+        } else {
+            log.info("Refresh Rate set to 0, disabling calls to publishCountersToKafka");
+        }
+    }
+
+    /**
+     * Publish the counters to kafka.
+     */
+    private class PublishCountersToKafka implements Runnable {
+        public void run() {
+            dhcpL2RelayCounters.getCountersMap().forEach((counterKey, counterValue) -> {
+                // Publish the global counters
+                if (counterKey.counterClassKey.equals(DhcpL2RelayEvent.GLOBAL_COUNTER)) {
+                    post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, null, null,
+                            new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
+                                    counterValue), dhcpCountersTopic, null));
+                } else { // Publish the counters per subscriber
+                    DhcpAllocationInfo info = allocationMap.get(counterKey.counterClassKey);
+                    post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, info, null,
+                            new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
+                                    counterValue), dhcpCountersTopic, counterKey.counterClassKey));
+                }
+            });
+        }
     }
 
     /**
@@ -246,7 +334,7 @@
      *
      * @return true if all information we need have been initialized
      */
-    private boolean configured() {
+    protected boolean configured() {
         if (!useOltUplink) {
             return dhcpServerConnectPoint.get() != null;
         }
@@ -606,6 +694,9 @@
                               toSendTo, packet);
                 }
                 packetService.emit(o);
+
+                SubscriberAndDeviceInformation entry = getSubscriberInfoFromClient(context);
+                updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("PACKETS_TO_SERVER"));
             } else {
                 log.error("No connect point to send msg to DHCP Server");
             }
@@ -623,6 +714,44 @@
             return null;
         }
 
+        private void  updateDhcpRelayCountersStore(SubscriberAndDeviceInformation entry,
+                                                   DhcpL2RelayCounters counterType) {
+            // Update global counter stats
+            dhcpL2RelayCounters.incrementCounter(DhcpL2RelayEvent.GLOBAL_COUNTER, counterType);
+            if (entry == null) {
+                log.warn("Counter not updated as subscriber info not found.");
+            } else {
+                // Update subscriber counter stats
+                dhcpL2RelayCounters.incrementCounter(entry.id(), counterType);
+            }
+        }
+
+        /*
+         * Get subscriber information based on it's context packet.
+         */
+        private SubscriberAndDeviceInformation getSubscriberInfoFromClient(PacketContext context) {
+            if (context != null) {
+                return getSubscriber(context);
+            }
+            return null;
+        }
+
+        /*
+         * Get subscriber information based on it's DHCP payload.
+         */
+        private SubscriberAndDeviceInformation getSubscriberInfoFromServer(DHCP dhcpPayload) {
+            if (dhcpPayload != null) {
+                MacAddress descMac = valueOf(dhcpPayload.getClientHardwareAddress());
+                ConnectPoint subsCp = getConnectPointOfClient(descMac);
+
+                if (subsCp != null) {
+                    String portId = nasPortId(subsCp);
+                    return subsService.get(portId);
+                }
+            }
+            return null;
+        }
+
         //process the dhcp packet before sending to server
         private void processDhcpPacket(PacketContext context, Ethernet packet,
                                        DHCP dhcpPayload) {
@@ -632,6 +761,18 @@
             }
 
             DHCP.MsgType incomingPacketType = getDhcpPacketType(dhcpPayload);
+            if (incomingPacketType == null) {
+                log.warn("DHCP packet type not found. Dump of ethernet pkt in hex format for troubleshooting.");
+                byte[] array = packet.serialize();
+                ByteArrayOutputStream buf = new ByteArrayOutputStream();
+                try {
+                    HexDump.dump(array, 0, buf, 0);
+                    log.trace(buf.toString());
+                } catch (Exception e) { }
+                return;
+            }
+
+            SubscriberAndDeviceInformation entry = null;
 
             log.info("Received DHCP Packet of type {} from {}",
                      incomingPacketType, context.inPacket().receivedFrom());
@@ -643,6 +784,8 @@
                     if (ethernetPacketDiscover != null) {
                         forwardPacket(ethernetPacketDiscover, context);
                     }
+                    entry = getSubscriberInfoFromClient(context);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"));
                     break;
                 case DHCPOFFER:
                     //reply to dhcp client.
@@ -651,6 +794,8 @@
                     if (ethernetPacketOffer != null) {
                         sendReply(ethernetPacketOffer, dhcpPayload);
                     }
+                    entry = getSubscriberInfoFromServer(dhcpPayload);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPOFFER"));
                     break;
                 case DHCPREQUEST:
                     Ethernet ethernetPacketRequest =
@@ -658,6 +803,8 @@
                     if (ethernetPacketRequest != null) {
                         forwardPacket(ethernetPacketRequest, context);
                     }
+                    entry = getSubscriberInfoFromClient(context);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPREQUEST"));
                     break;
                 case DHCPACK:
                     //reply to dhcp client.
@@ -666,6 +813,20 @@
                     if (ethernetPacketAck != null) {
                         sendReply(ethernetPacketAck, dhcpPayload);
                     }
+                    entry = getSubscriberInfoFromServer(dhcpPayload);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPACK"));
+                    break;
+                case DHCPDECLINE:
+                    entry = getSubscriberInfoFromClient(context);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPDECLINE"));
+                    break;
+                case DHCPNAK:
+                    entry = getSubscriberInfoFromServer(dhcpPayload);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPNACK"));
+                    break;
+                case DHCPRELEASE:
+                    entry = getSubscriberInfoFromClient(context);
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPRELEASE"));
                     break;
                 default:
                     break;
@@ -709,7 +870,7 @@
                     context.inPacket().receivedFrom(), dhcpPacket.getPacketType(),
                     entry.nasPortId(), clientMac, clientIp);
 
-            allocationMap.put(entry.nasPortId(), info);
+            allocationMap.put(entry.id(), info);
 
             post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.UPDATED, info,
                                       context.inPacket().receivedFrom()));
@@ -781,12 +942,15 @@
                     DhcpAllocationInfo info = new DhcpAllocationInfo(subsCp,
                             dhcpPayload.getPacketType(), circuitId, dstMac, ip);
 
-                    allocationMap.put(sub.nasPortId(), info);
+                    allocationMap.put(sub.id(), info);
 
                     post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.UPDATED, info, subsCp));
                 }
             } // end storing of info
 
+            SubscriberAndDeviceInformation entry = getSubscriberInfoFromServer(dhcpPayload);
+            updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("PACKETS_FROM_SERVER"));
+
             // we leave the srcMac from the original packet
             etherReply.setDestinationMACAddress(dstMac);
             etherReply.setQinQVID(sTag(subsCp).toShort());
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfig.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfig.java
index 9a47ab3..7dafaf4 100755
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfig.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfig.java
@@ -37,8 +37,8 @@
     private static final String MODIFY_SRC_DST_MAC  = "modifyUlPacketsSrcDstMacAddresses";
     private static final String USE_OLT_ULPORT_FOR_PKT_INOUT = "useOltUplinkForServerPktInOut";
 
-    private static final Boolean DEFAULT_MODIFY_SRC_DST_MAC = false;
-    private static final Boolean DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT = false;
+    protected static final Boolean DEFAULT_MODIFY_SRC_DST_MAC = false;
+    protected static final Boolean DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT = false;
 
     @Override
     public boolean isValid() {
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounters.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounters.java
new file mode 100644
index 0000000..71f7adc
--- /dev/null
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounters.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.opencord.dhcpl2relay.impl;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+/**
+ * Represents DHCP relay counters type.
+ */
+public enum DhcpL2RelayCounters {
+    /**
+     *  DHCP relay counter of type Discover.
+     */
+    DHCPDISCOVER,
+    /**
+     *  DHCP relay counter of type Release.
+     */
+    DHCPRELEASE,
+    /**
+     *  DHCP relay counter of type Decline.
+     */
+    DHCPDECLINE,
+    /**
+     *  DHCP relay counter of type Request.
+     */
+    DHCPREQUEST,
+    /**
+     *  DHCP relay counter of type Offer.
+     */
+    DHCPOFFER,
+    /**
+     *  DHCP relay counter of type ACK.
+     */
+    DHCPACK,
+    /**
+     *  DHCP relay counter of type NACK.
+     */
+    DHCPNACK,
+    /**
+     *  DHCP relay counter of type Packets_to_server.
+     */
+    PACKETS_TO_SERVER,
+    /**
+     *  DHCP relay counter of type Packets_from_server.
+     */
+    PACKETS_FROM_SERVER;
+
+    /**
+     * Supported types of DHCP relay counters.
+     */
+    static final Set<DhcpL2RelayCounters> SUPPORTED_COUNTERS = ImmutableSet.of(DhcpL2RelayCounters.DHCPDISCOVER,
+            DhcpL2RelayCounters.DHCPRELEASE, DhcpL2RelayCounters.DHCPDECLINE,
+            DhcpL2RelayCounters.DHCPREQUEST, DhcpL2RelayCounters.DHCPOFFER,
+            DhcpL2RelayCounters.DHCPACK, DhcpL2RelayCounters.DHCPNACK,
+            DhcpL2RelayCounters.PACKETS_TO_SERVER,
+            DhcpL2RelayCounters.PACKETS_FROM_SERVER);
+    }
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersIdentifier.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersIdentifier.java
new file mode 100644
index 0000000..63587e1
--- /dev/null
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersIdentifier.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.dhcpl2relay.impl;
+
+import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+
+import java.util.Objects;
+
+/**
+ * Represents DHCP relay counters identifier.
+ */
+public final class DhcpL2RelayCountersIdentifier {
+    final String counterClassKey;
+    final Enum<DhcpL2RelayCounters> counterTypeKey;
+
+    /**
+     * Creates a default global counter identifier for a given counterType.
+     *
+     * @param counterTypeKey Identifies the supported type of DHCP relay counters
+     */
+    public DhcpL2RelayCountersIdentifier(DhcpL2RelayCounters counterTypeKey) {
+        this.counterClassKey = DhcpL2RelayEvent.GLOBAL_COUNTER;
+        this.counterTypeKey = counterTypeKey;
+    }
+
+    /**
+     * Creates a counter identifier. A counter is defined by the key pair &lt;counterClass, counterType&gt;,
+     * where counterClass can be maybe global or the subscriber ID and counterType is the supported DHCP message type.
+     *
+     * @param counterClassKey Identifies which class the counter is assigned (global or per subscriber)
+     * @param counterTypeKey Identifies the supported type of DHCP relay counters
+     */
+    public DhcpL2RelayCountersIdentifier(String counterClassKey, DhcpL2RelayCounters counterTypeKey) {
+        this.counterClassKey = counterClassKey;
+        this.counterTypeKey = counterTypeKey;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj instanceof DhcpL2RelayCountersIdentifier) {
+            final DhcpL2RelayCountersIdentifier other = (DhcpL2RelayCountersIdentifier) obj;
+            return Objects.equals(this.counterClassKey, other.counterClassKey)
+                    && Objects.equals(this.counterTypeKey, other.counterTypeKey);
+        }
+
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(counterClassKey, counterTypeKey);
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStore.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStore.java
new file mode 100644
index 0000000..b79006b
--- /dev/null
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStore.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.dhcpl2relay.impl;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Represents a stored DHCP Relay Counters. A counter entry is defined by the pair &lt;counterClass, counterType&gt;,
+ * where counterClass can be maybe global or subscriber ID and counterType is the DHCP message type.
+ */
+public interface DhcpL2RelayCountersStore {
+
+    String NAME = "DHCP_L2_Relay_stats";
+
+    /**
+     * Init counter values for a given counter class.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     */
+    void initCounters(String counterClass);
+
+    /**
+     * Creates or updates DHCP L2 Relay counter.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     */
+    void incrementCounter(String counterClass, DhcpL2RelayCounters counterType);
+
+    /**
+     * Sets the value of a DHCP L2 Relay counter.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     * @param value The value of the counter
+     */
+    void setCounter(String counterClass, DhcpL2RelayCounters counterType, Long value);
+
+    /**
+     * Gets the DHCP L2 Relay counters map.
+     *
+     * @return the DHCP counter map
+     */
+    public Map<DhcpL2RelayCountersIdentifier, AtomicLong> getCountersMap();
+
+    /**
+     * Resets counter values for a given counter class.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     */
+    void resetCounters(String counterClass);
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java
index 2612fdf..af89aa3 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java
@@ -29,4 +29,10 @@
 
     public static final String ENABLE_DHCP_BROADCAST_REPLIES = "enableDhcpBroadcastReplies";
     public static final boolean ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT = false;
+
+    public static final String PUBLISH_COUNTERS_RATE = "publishCountersRate";
+    public static final int PUBLISH_COUNTERS_RATE_DEFAULT = 10;
+
+    public static final String DHCP_COUNTERS_TOPIC = "dhcpCountersTopic";
+    public static final String DHCP_COUNTERS_TOPIC_DEFAULT = "onos_traffic.stats";
 }
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java
new file mode 100644
index 0000000..eedbace
--- /dev/null
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.dhcpl2relay.impl;
+
+import com.google.common.collect.ImmutableMap;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.slf4j.LoggerFactory.getLogger;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * DHCP Relay Agent Counters Manager Component.
+ */
+@Component(immediate = true)
+public class SimpleDhcpL2RelayCountersStore implements DhcpL2RelayCountersStore {
+    private ApplicationId appId;
+    private final Logger log = getLogger(getClass());
+    private Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Activate
+    public void activate() {
+        log.info("Activate Dhcp L2 Counters Manager");
+        //appId = coreService.getAppId(DhcpL2Relay.DHCP_L2RELAY_APP);
+        countersMap = new ConcurrentHashMap();
+        // Initialize counter values for the global counters
+        initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+    }
+
+    public ImmutableMap<DhcpL2RelayCountersIdentifier, AtomicLong> getCountersMap() {
+        return ImmutableMap.copyOf(countersMap);
+    }
+
+    /**
+     * Initialize the supported counters map for the given counter class.
+     * @param counterClass class of counters (global, per subscriber)
+     */
+    public void initCounters(String counterClass) {
+        checkNotNull(counterClass, "counter class can't be null");
+        for (DhcpL2RelayCounters counterType : DhcpL2RelayCounters.SUPPORTED_COUNTERS) {
+            countersMap.put(new DhcpL2RelayCountersIdentifier(counterClass, counterType), new AtomicLong(0));
+        }
+    }
+
+    /**
+     * Inserts the counter entry if it is not already in the set otherwise increment the existing counter entry.
+     * @param counterClass class of counters (global, per subscriber)
+     * @param counterType name of counter
+     */
+    public void incrementCounter(String counterClass, DhcpL2RelayCounters counterType) {
+        checkNotNull(counterClass, "counter class can't be null");
+        if (DhcpL2RelayCounters.SUPPORTED_COUNTERS.contains(counterType)) {
+            DhcpL2RelayCountersIdentifier counterIdentifier =
+                    new DhcpL2RelayCountersIdentifier(counterClass, counterType);
+            countersMap.compute(counterIdentifier, (key, counterValue) ->
+                (counterValue != null) ? new AtomicLong(counterValue.incrementAndGet()) : new AtomicLong(1)
+            );
+        } else {
+            log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
+        }
+    }
+
+    /**
+     * Reset the counters map for the given counter class.
+     * @param counterClass class of counters (global, per subscriber)
+     */
+    public void resetCounters(String counterClass) {
+        checkNotNull(counterClass, "counter class can't be null");
+        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator(); it.hasNext();) {
+            DhcpL2RelayCounters counterType = it.next();
+            DhcpL2RelayCountersIdentifier counterIdentifier =
+                    new DhcpL2RelayCountersIdentifier(counterClass, counterType);
+            countersMap.computeIfPresent(counterIdentifier, (key, counterValue) ->
+                    new AtomicLong(0)
+            );
+        }
+    }
+
+    /**
+     * Inserts the counter entry if it is not already in the set otherwise update the existing counter entry.
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     * @param value conter value
+     */
+    public void setCounter(String counterClass, DhcpL2RelayCounters counterType, Long value) {
+        checkNotNull(counterClass, "counter class can't be null");
+        if (DhcpL2RelayCounters.SUPPORTED_COUNTERS.contains(counterType)) {
+            DhcpL2RelayCountersIdentifier counterIdentifier =
+                    new DhcpL2RelayCountersIdentifier(counterClass, counterType);
+            countersMap.put(counterIdentifier, new AtomicLong(value));
+        } else {
+            log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
+        }
+    }
+}
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
new file mode 100644
index 0000000..94a90c8
--- /dev/null
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.dhcpl2relay.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.*;
+
+import org.onlab.junit.TestUtils;
+import org.onlab.osgi.ComponentContextAdapter;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.common.event.impl.TestEventDispatcher;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.config.Config;
+import org.onosproject.net.config.NetworkConfigRegistryAdapter;
+import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+
+import java.util.Set;
+
+/**
+ * Tests for DHCP relay app configuration.
+ */
+public class DhcpL2RelayConfigTest extends DhcpL2RelayTestBase {
+
+    static final boolean USE_OLT_ULPORT_FOR_PKT_INOUT = true;
+    static final boolean MODIFY_SRC_DST_MAC = true;
+
+    private DhcpL2Relay dhcpL2Relay;
+
+    ComponentConfigService mockConfigService =
+            EasyMock.createMock(ComponentConfigService.class);
+
+    /**
+     * Sets up the services required by the dhcpl2relay app.
+     */
+    @Before
+    public void setUp() {
+        dhcpL2Relay = new DhcpL2Relay();
+        dhcpL2Relay.cfgService = new TestNetworkConfigRegistry();
+        dhcpL2Relay.coreService = new MockCoreServiceAdapter();
+        dhcpL2Relay.flowObjectiveService = new FlowObjectiveServiceAdapter();
+        dhcpL2Relay.packetService = new MockPacketService();
+        dhcpL2Relay.componentConfigService = mockConfigService;
+        dhcpL2Relay.deviceService = new MockDeviceService();
+        dhcpL2Relay.sadisService = new MockSadisService();
+        dhcpL2Relay.hostService = new MockHostService();
+        dhcpL2Relay.mastershipService = new MockMastershipService();
+        TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
+        dhcpL2Relay.activate(new ComponentContextAdapter());
+    }
+
+    /**
+     * Mocks the network config registry.
+     */
+    static class MockDhcpL2RelayConfig extends DhcpL2RelayConfig {
+        @Override
+        public Set<ConnectPoint> getDhcpServerConnectPoint() {
+            return ImmutableSet.of(SERVER_CONNECT_POINT);
+        }
+
+        @Override
+        public boolean getModifySrcDstMacAddresses() {
+            return true;
+        }
+
+        @Override
+        public boolean getUseOltUplinkForServerPktInOut() {
+            return true;
+        }
+    }
+
+    /**
+     * Tests the default configuration.
+     */
+    @Test
+    public void testConfig() {
+        assertThat(dhcpL2Relay.useOltUplink, is(USE_OLT_ULPORT_FOR_PKT_INOUT));
+        assertThat(dhcpL2Relay.modifyClientPktsSrcDstMac, is(MODIFY_SRC_DST_MAC));
+        assertNull(dhcpL2Relay.dhcpServerConnectPoint.get());
+    }
+
+    /**
+     * Tests if dhcpl2relay app has been configured.
+     */
+    @Test
+    public void testDhcpL2RelayConfigured() {
+        assertTrue(dhcpL2Relay.configured());
+    }
+
+    /**
+     * Mocks the network config registry.
+     */
+    @SuppressWarnings("unchecked")
+    static final class TestNetworkConfigRegistry
+            extends NetworkConfigRegistryAdapter {
+        @Override
+        public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
+            DhcpL2RelayConfig dhcpConfig = new MockDhcpL2RelayConfig();
+            return (C) dhcpConfig;
+        }
+    }
+}
\ No newline at end of file
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
new file mode 100644
index 0000000..32bb0d3
--- /dev/null
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.dhcpl2relay.impl;
+
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.junit.TestUtils;
+import org.onlab.osgi.ComponentContextAdapter;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.common.event.impl.TestEventDispatcher;
+import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class DhcpL2RelayCountersStoreTest extends DhcpL2RelayTestBase {
+
+    private DhcpL2Relay dhcpL2Relay;
+    private SimpleDhcpL2RelayCountersStore store;
+
+    ComponentConfigService mockConfigService =
+            EasyMock.createMock(ComponentConfigService.class);
+
+    /**
+     * Sets up the services required by the dhcpl2relay app.
+     */
+    @Before
+    public void setUp() {
+        dhcpL2Relay = new DhcpL2Relay();
+        dhcpL2Relay.cfgService = new DhcpL2RelayConfigTest.TestNetworkConfigRegistry();
+        dhcpL2Relay.coreService = new MockCoreServiceAdapter();
+        dhcpL2Relay.flowObjectiveService = new FlowObjectiveServiceAdapter();
+        dhcpL2Relay.packetService = new MockPacketService();
+        dhcpL2Relay.componentConfigService = mockConfigService;
+        dhcpL2Relay.deviceService = new MockDeviceService();
+        dhcpL2Relay.sadisService = new MockSadisService();
+        dhcpL2Relay.mastershipService = new MockMastershipService();
+        TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
+        dhcpL2Relay.activate(new ComponentContextAdapter());
+        store = new SimpleDhcpL2RelayCountersStore();
+        TestUtils.setField(store, "eventDispatcher", new TestEventDispatcher());
+        store.activate();
+        dhcpL2Relay.dhcpL2RelayCounters = this.store;
+    }
+
+    /**
+     * Tears down the dhcpL2Relay application.
+     */
+    @After
+    public void tearDown() {
+        dhcpL2Relay.deactivate();
+    }
+
+    /**
+     * Tests the initialization of the counter.
+     */
+    @Test
+    public void testInitCounter() {
+        // Init the supported global counter
+        dhcpL2Relay.dhcpL2RelayCounters.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+        // Init the supported counter for a specific subscriber
+        dhcpL2Relay.dhcpL2RelayCounters.initCounters(CLIENT_ID_1);
+
+        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+             it.hasNext();) {
+            DhcpL2RelayCounters counterType = it.next();
+            long globalCounterValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
+                    DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)).longValue();
+            long perSubscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
+                    counterType)).longValue();
+            assertEquals((long) 0, globalCounterValue);
+            assertEquals((long) 0, perSubscriberValue);
+        }
+    }
+
+    /**
+     * Tests the insertion of the counter entry if it is not already in the set
+     * otherwise increment the existing counter entry.
+     */
+    @Test
+    public void testIncrementCounter() {
+        // Init the supported global counter
+        dhcpL2Relay.dhcpL2RelayCounters.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+
+        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+             it.hasNext();) {
+            DhcpL2RelayCounters counterType = it.next();
+            // Increment of existing supported global counter
+            dhcpL2Relay.dhcpL2RelayCounters.incrementCounter(DhcpL2RelayEvent.GLOBAL_COUNTER, counterType);
+            // Add of a Subscriber entry that is not already in the set
+            dhcpL2Relay.dhcpL2RelayCounters.incrementCounter(CLIENT_ID_1, counterType);
+        }
+
+        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+             it.hasNext();) {
+            DhcpL2RelayCounters counterType = it.next();
+            long globalCounterValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
+                    DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)).longValue();
+            long perSubscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
+                    counterType)).longValue();
+            assertEquals((long) 1, globalCounterValue);
+            assertEquals((long) 1, perSubscriberValue);
+        }
+    }
+
+    /**
+     * Tests the increment and reset functions of the counters map for the given counter class.
+     */
+    @Test
+    public void testIncrementAndResetCounter() {
+        DhcpL2RelayCounters counterType;
+        long subscriberValue;
+        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap;
+
+        // First start incrementing the counter of a specific subscriber
+        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+             it.hasNext();) {
+            counterType = it.next();
+            // Insert of a Subscriber entry that is not already in the set
+            dhcpL2Relay.dhcpL2RelayCounters.incrementCounter(CLIENT_ID_1, counterType);
+        }
+
+        // Make sure that the counter is incremented
+        countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+             it.hasNext();) {
+            counterType = it.next();
+            subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
+                    counterType)).longValue();
+            assertEquals((long) 1, subscriberValue);
+        }
+
+        // Reset the counter
+        dhcpL2Relay.dhcpL2RelayCounters.resetCounters(CLIENT_ID_1);
+        countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+             it.hasNext();) {
+            counterType = it.next();
+            subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
+                    counterType)).longValue();
+            assertEquals((long) 0, subscriberValue);
+        }
+    }
+
+    /**
+     * Tests the insert of the counter value for a subscriber entry if it is not already in the set
+     * otherwise update the existing counter entry.
+     */
+    @Test
+    public void testInsertOrUpdateCounter() {
+        dhcpL2Relay.dhcpL2RelayCounters.setCounter(CLIENT_ID_1, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"), (long) 50);
+
+        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        long subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
+                CLIENT_ID_1, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+
+        assertEquals((long) 50, subscriberValue);
+    }
+
+}
\ No newline at end of file
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
index 17fb417..9841953 100755
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
@@ -19,80 +19,33 @@
 
 import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
 
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.junit.TestUtils;
-import org.onlab.osgi.ComponentContextAdapter;
-import org.onlab.packet.ChassisId;
 import org.onlab.packet.DHCP;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IPv4;
-import org.onlab.packet.Ip4Address;
-import org.onlab.packet.IpAddress;
-import org.onlab.packet.MacAddress;
 import org.onlab.packet.UDP;
-import org.onlab.packet.VlanId;
 import org.onlab.packet.dhcp.DhcpOption;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.common.event.impl.TestEventDispatcher;
-import org.onosproject.mastership.MastershipServiceAdapter;
-import org.onosproject.net.AnnotationKeys;
-import org.onosproject.net.Annotations;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.DefaultAnnotations;
-import org.onosproject.net.DefaultDevice;
-import org.onosproject.net.DefaultHost;
-import org.onosproject.net.Device;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.Element;
-import org.onosproject.net.Host;
-import org.onosproject.net.HostId;
-import org.onosproject.net.HostLocation;
-import org.onosproject.net.Port;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.config.Config;
-import org.onosproject.net.config.NetworkConfigRegistryAdapter;
-import org.onosproject.net.device.DeviceServiceAdapter;
-import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
-import org.onosproject.net.host.HostServiceAdapter;
-import org.onosproject.net.provider.ProviderId;
-import org.opencord.dhcpl2relay.impl.packet.DhcpOption82;
-import org.opencord.sadis.SubscriberAndDeviceInformation;
-import org.opencord.sadis.BandwidthProfileInformation;
-import org.opencord.sadis.BaseInformationService;
-import org.opencord.sadis.SadisService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableSet;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import org.opencord.dhcpl2relay.impl.packet.DhcpOption82;
+
 import com.google.common.collect.Lists;
 
 public class DhcpL2RelayTest extends DhcpL2RelayTestBase {
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
     private DhcpL2Relay dhcpL2Relay;
-
-    private static final MacAddress CLIENT_MAC = MacAddress.valueOf("00:00:00:00:00:01");
-    private static final VlanId CLIENT_C_TAG = VlanId.vlanId((short) 999);
-    private static final VlanId CLIENT_S_TAG = VlanId.vlanId((short) 111);
-    private static final String CLIENT_NAS_PORT_ID = "PON 1/1";
-    private static final String CLIENT_CIRCUIT_ID = "CIR-PON 1/1";
-
-    private static final String OLT_DEV_ID = "of:00000000000000aa";
-    private static final MacAddress OLT_MAC_ADDRESS = MacAddress.valueOf("01:02:03:04:05:06");
-    private static final DeviceId DEVICE_ID_1 = DeviceId.deviceId(OLT_DEV_ID);
-
-    private static final ConnectPoint SERVER_CONNECT_POINT =
-            ConnectPoint.deviceConnectPoint("of:0000000000000001/5");
-
-    private static final String SCHEME_NAME = "dhcpl2relay";
-    private static final DefaultAnnotations DEVICE_ANNOTATIONS = DefaultAnnotations.builder()
-            .set(AnnotationKeys.PROTOCOL, SCHEME_NAME.toUpperCase()).build();
+    private SimpleDhcpL2RelayCountersStore store;
 
     ComponentConfigService mockConfigService =
             EasyMock.createMock(ComponentConfigService.class);
@@ -103,7 +56,7 @@
     @Before
     public void setUp() {
         dhcpL2Relay = new DhcpL2Relay();
-        dhcpL2Relay.cfgService = new TestNetworkConfigRegistry();
+        dhcpL2Relay.cfgService = new DhcpL2RelayConfigTest.TestNetworkConfigRegistry();
         dhcpL2Relay.coreService = new MockCoreServiceAdapter();
         dhcpL2Relay.flowObjectiveService = new FlowObjectiveServiceAdapter();
         dhcpL2Relay.packetService = new MockPacketService();
@@ -113,7 +66,12 @@
         dhcpL2Relay.hostService = new MockHostService();
         dhcpL2Relay.mastershipService = new MockMastershipService();
         TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
-        dhcpL2Relay.activate(new ComponentContextAdapter());
+        dhcpL2Relay.refreshService = new MockExecutor(dhcpL2Relay.refreshService);
+        dhcpL2Relay.activate(new DhcpL2RelayTestBase.MockComponentContext());
+        store = new SimpleDhcpL2RelayCountersStore();
+        TestUtils.setField(store, "eventDispatcher", new TestEventDispatcher());
+        store.activate();
+        dhcpL2Relay.dhcpL2RelayCounters = this.store;
     }
 
     /**
@@ -132,7 +90,6 @@
     @Test
     public void testDhcpDiscover()  throws Exception {
         //  (1) Sending DHCP discover packet
-        System.out.println("Sending pakcet");
         Ethernet discoverPacket = constructDhcpDiscoverPacket(CLIENT_MAC);
 
         sendPacket(discoverPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
@@ -149,7 +106,6 @@
     @Test
     public void testDhcpRequest()  throws Exception {
         //  (1) Sending DHCP discover packet
-        System.out.println("Sending pakcet");
         Ethernet requestPacket = constructDhcpRequestPacket(CLIENT_MAC);
 
         sendPacket(requestPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
@@ -166,9 +122,8 @@
     @Test
     public void testDhcpOffer() {
         //  (1) Sending DHCP discover packet
-        System.out.println("Sending pakcet");
-        Ethernet offerPacket = constructDhcpOfferPacket(MacAddress.valueOf("bb:bb:bb:bb:bb:bb"),
-                CLIENT_MAC, "1.1.1.1", "2.2.2.2");
+        Ethernet offerPacket = constructDhcpOfferPacket(SERVER_MAC,
+                CLIENT_MAC, DESTINATION_ADDRESS_IP, DHCP_CLIENT_IP_ADDRESS);
 
         sendPacket(offerPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
 
@@ -184,8 +139,8 @@
     @Test
     public void testDhcpAck() {
 
-        Ethernet ackPacket = constructDhcpAckPacket(MacAddress.valueOf("bb:bb:bb:bb:bb:bb"),
-                CLIENT_MAC, "1.1.1.1", "2.2.2.2");
+        Ethernet ackPacket = constructDhcpAckPacket(SERVER_MAC,
+                CLIENT_MAC, DESTINATION_ADDRESS_IP, DHCP_CLIENT_IP_ADDRESS);
 
         sendPacket(ackPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
 
@@ -193,6 +148,95 @@
         compareServerPackets(ackPacket, ackRelayed);
     }
 
+    /**
+     * Tests the DHCP global counters.
+     */
+    @Test
+    public void testDhcpGlobalCounters() {
+        long discoveryValue = 0;
+        long offerValue = 0;
+        long requestValue = 0;
+        long ackValue = 0;
+
+        Ethernet discoverPacket = constructDhcpDiscoverPacket(CLIENT_MAC);
+        Ethernet offerPacket = constructDhcpOfferPacket(SERVER_MAC,
+                CLIENT_MAC, DESTINATION_ADDRESS_IP, DHCP_CLIENT_IP_ADDRESS);
+        Ethernet requestPacket = constructDhcpRequestPacket(CLIENT_MAC);
+        Ethernet ackPacket = constructDhcpAckPacket(SERVER_MAC,
+                CLIENT_MAC, DESTINATION_ADDRESS_IP, DHCP_CLIENT_IP_ADDRESS);
+
+        sendPacket(discoverPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
+        sendPacket(offerPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
+        sendPacket(requestPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
+        sendPacket(ackPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
+
+        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        discoveryValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
+                DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+        offerValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
+                DhcpL2RelayCounters.valueOf("DHCPOFFER"))).longValue();
+        requestValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
+                DhcpL2RelayCounters.valueOf("DHCPREQUEST"))).longValue();
+        ackValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
+                DhcpL2RelayCounters.valueOf("DHCPACK"))).longValue();
+
+        assertEquals((long) 1, discoveryValue);
+        assertEquals((long) 1, offerValue);
+        assertEquals((long) 1, requestValue);
+        assertEquals((long) 1, ackValue);
+    }
+
+    /**
+     * Tests the DHCP per subscriber counters.
+     *
+     */
+    @Test
+    public void testDhcpPerSubscriberCounters() {
+        long discoveryValue;
+        long offerValue;
+        long requestValue;
+        long ackValue;
+
+        Ethernet discoverPacket = constructDhcpDiscoverPacket(CLIENT_MAC);
+        Ethernet offerPacket = constructDhcpOfferPacket(SERVER_MAC,
+                CLIENT_MAC, DESTINATION_ADDRESS_IP, DHCP_CLIENT_IP_ADDRESS);
+        Ethernet requestPacket = constructDhcpRequestPacket(CLIENT_MAC);
+        Ethernet ackPacket = constructDhcpAckPacket(SERVER_MAC,
+                CLIENT_MAC, DESTINATION_ADDRESS_IP, DHCP_CLIENT_IP_ADDRESS);
+
+        sendPacket(discoverPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
+        sendPacket(offerPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
+        sendPacket(requestPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
+        sendPacket(ackPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
+
+        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        discoveryValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
+                DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+        offerValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
+                DhcpL2RelayCounters.valueOf("DHCPOFFER"))).longValue();
+        requestValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
+                DhcpL2RelayCounters.valueOf("DHCPREQUEST"))).longValue();
+        ackValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
+                DhcpL2RelayCounters.valueOf("DHCPACK"))).longValue();
+
+        assertEquals((long) 1, discoveryValue);
+        assertEquals((long) 1, offerValue);
+        assertEquals((long) 1, requestValue);
+        assertEquals((long) 1, ackValue);
+    }
+
+    /**
+     * Tests the schedule function to publish the counters to kafka.
+     *
+     */
+    @Test
+    public void testSchedulePublishCountersToKafka() {
+        MockExecutor executor = new MockExecutor(dhcpL2Relay.refreshService);
+        dhcpL2Relay.refreshTask = executor.scheduleWithFixedDelay(
+                dhcpL2Relay.publishCountersToKafka, 0, 10, TimeUnit.SECONDS);
+        executor.assertLastMethodCalled("scheduleWithFixedDelay", 0, 10, TimeUnit.SECONDS);
+    }
+
     public void compareClientPackets(Ethernet sent, Ethernet relayed) {
         sent.setSourceMACAddress(OLT_MAC_ADDRESS);
         sent.setQinQVID(CLIENT_S_TAG.toShort());
@@ -232,202 +276,4 @@
         assertEquals(expectedPacket, relayed);
 
     }
-
-    private class MockDevice extends DefaultDevice {
-
-        public MockDevice(ProviderId providerId, DeviceId id, Type type,
-                          String manufacturer, String hwVersion, String swVersion,
-                          String serialNumber, ChassisId chassisId, Annotations... annotations) {
-            super(providerId, id, type, manufacturer, hwVersion, swVersion, serialNumber,
-                    chassisId, annotations);
-        }
-    }
-
-    private class MockDeviceService extends DeviceServiceAdapter {
-
-        private ProviderId providerId = new ProviderId("of", "foo");
-        private final Device device1 = new MockDevice(providerId, DEVICE_ID_1, Device.Type.SWITCH,
-                "foo.inc", "0", "0", OLT_DEV_ID, new ChassisId(),
-                DEVICE_ANNOTATIONS);
-
-        @Override
-        public Device getDevice(DeviceId devId) {
-            return device1;
-
-        }
-
-        @Override
-        public Port getPort(ConnectPoint cp) {
-            return new MockPort();
-        }
-
-        @Override
-        public boolean isAvailable(DeviceId d) {
-            return true;
-        }
-    }
-
-    private class  MockPort implements Port {
-
-        @Override
-        public boolean isEnabled() {
-            return true;
-        }
-        @Override
-        public long portSpeed() {
-            return 1000;
-        }
-        @Override
-        public Element element() {
-            return null;
-        }
-        @Override
-        public PortNumber number() {
-            return null;
-        }
-        @Override
-        public Annotations annotations() {
-            return new MockAnnotations();
-        }
-        @Override
-        public Type type() {
-            return Port.Type.FIBER;
-        }
-
-        private class MockAnnotations implements Annotations {
-
-            @Override
-            public String value(String val) {
-                return "PON 1/1";
-            }
-            @Override
-            public Set<String> keys() {
-                return null;
-            }
-        }
-    }
-
-    private class MockSadisService implements SadisService {
-        @Override
-        public BaseInformationService<SubscriberAndDeviceInformation> getSubscriberInfoService() {
-            return new MockSubService();
-        }
-
-        @Override
-        public BaseInformationService<BandwidthProfileInformation> getBandwidthProfileService() {
-            return null;
-        }
-    }
-
-    private class MockSubService implements BaseInformationService<SubscriberAndDeviceInformation> {
-        MockSubscriberAndDeviceInformation device =
-                new MockSubscriberAndDeviceInformation(OLT_DEV_ID, VlanId.NONE, VlanId.NONE, null, null,
-                        OLT_MAC_ADDRESS, Ip4Address.valueOf("10.10.10.10"));
-        MockSubscriberAndDeviceInformation sub =
-                new MockSubscriberAndDeviceInformation(CLIENT_NAS_PORT_ID, CLIENT_C_TAG,
-                        CLIENT_S_TAG, CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null);
-        @Override
-        public SubscriberAndDeviceInformation get(String id) {
-            if (id.equals(OLT_DEV_ID)) {
-                return device;
-            } else {
-                return  sub;
-            }
-        }
-
-        @Override
-        public void invalidateAll() {}
-        @Override
-        public void invalidateId(String id) {}
-        @Override
-        public SubscriberAndDeviceInformation getfromCache(String id) {
-            return null;
-        }
-    }
-
-    private class MockMastershipService extends MastershipServiceAdapter {
-        @Override
-        public boolean isLocalMaster(DeviceId d) {
-            return true;
-        }
-    }
-
-    private class MockSubscriberAndDeviceInformation extends SubscriberAndDeviceInformation {
-
-        MockSubscriberAndDeviceInformation(String id, VlanId ctag,
-                                           VlanId stag, String nasPortId,
-                                           String circuitId, MacAddress hardId,
-                                           Ip4Address ipAddress) {
-            this.setCTag(ctag);
-            this.setHardwareIdentifier(hardId);
-            this.setId(id);
-            this.setIPAddress(ipAddress);
-            this.setSTag(stag);
-            this.setNasPortId(nasPortId);
-            this.setCircuitId(circuitId);
-        }
-    }
-
-    private class MockHostService extends HostServiceAdapter {
-
-        @Override
-        public Set<Host> getHostsByMac(MacAddress mac) {
-
-            HostLocation loc = new HostLocation(DEVICE_ID_1, PortNumber.portNumber(22), 0);
-
-            IpAddress ip = IpAddress.valueOf("10.100.200.10");
-
-            Host h = new DefaultHost(ProviderId.NONE, HostId.hostId(mac, VlanId.NONE),
-                    mac, VlanId.NONE, loc, ImmutableSet.of(ip));
-
-            return ImmutableSet.of(h);
-        }
-    }
-
-
-    /**
-     * Mocks the AAAConfig class to force usage of an unroutable address for the
-     * RADIUS server.
-     */
-    static class MockDhcpL2RealyConfig extends DhcpL2RelayConfig {
-        @Override
-        public Set<ConnectPoint> getDhcpServerConnectPoint() {
-            return ImmutableSet.of(SERVER_CONNECT_POINT);
-        }
-
-        @Override
-        public boolean getModifySrcDstMacAddresses() {
-            return true;
-        }
-    }
-
-    /**
-     * Mocks the network config registry.
-     */
-    @SuppressWarnings("unchecked")
-    private static final class TestNetworkConfigRegistry
-            extends NetworkConfigRegistryAdapter {
-        @Override
-        public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
-            DhcpL2RelayConfig dhcpConfig = new MockDhcpL2RealyConfig();
-            return (C) dhcpConfig;
-        }
-    }
-}
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+}
\ No newline at end of file
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTestBase.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTestBase.java
index a389ba4..bae3575 100755
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTestBase.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTestBase.java
@@ -16,18 +16,63 @@
 
 package org.opencord.dhcpl2relay.impl;
 
+import static com.google.common.base.Preconditions.checkState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableSet;
 import org.onlab.packet.BasePacket;
+import org.onlab.packet.ChassisId;
 import org.onlab.packet.DHCP;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IPv4;
+import org.onlab.packet.IpAddress;
 import org.onlab.packet.Ip4Address;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.UDP;
+import org.onlab.packet.VlanId;
 import org.onlab.packet.dhcp.DhcpOption;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.event.DefaultEventSinkRegistry;
+import org.onosproject.event.Event;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.EventSink;
+import org.onosproject.mastership.MastershipServiceAdapter;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.Annotations;
 import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.DefaultHost;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Element;
+import org.onosproject.net.Host;
+import org.onosproject.net.HostId;
+import org.onosproject.net.HostLocation;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceServiceAdapter;
+import org.onosproject.net.host.HostServiceAdapter;
 import org.onosproject.net.packet.DefaultInboundPacket;
 import org.onosproject.net.packet.DefaultPacketContext;
 import org.onosproject.net.packet.InboundPacket;
@@ -35,17 +80,19 @@
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketServiceAdapter;
+import org.onosproject.net.provider.ProviderId;
+import org.opencord.sadis.BandwidthProfileInformation;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.ComponentInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.junit.Assert.fail;
-
 
 /**
  * Common methods for AAA app testing.
@@ -53,9 +100,30 @@
 public class DhcpL2RelayTestBase {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private static final int TRANSACTION_ID = 1000;
+    static final VlanId CLIENT_C_TAG = VlanId.vlanId((short) 999);
+    static final VlanId CLIENT_S_TAG = VlanId.vlanId((short) 111);
+    static final String CLIENT_ID_1 = "SUBSCRIBER_ID_1";
+    static final String CLIENT_NAS_PORT_ID = "PON 1/1";
+    static final String CLIENT_CIRCUIT_ID = "CIR-PON 1/1";
 
-    private static final String EXPECTED_IP = "10.2.0.2";
+    static final MacAddress CLIENT_MAC = MacAddress.valueOf("00:00:00:00:00:01");
+    static final MacAddress SERVER_MAC = MacAddress.valueOf("bb:bb:bb:bb:bb:bb");
+    static final String DESTINATION_ADDRESS_IP = "1.1.1.1";
+    static final String DHCP_CLIENT_IP_ADDRESS = "2.2.2.2";
+    static final int UPLINK_PORT = 5;
+
+    static final String EXPECTED_IP = "10.2.0.2";
+    static final String OLT_DEV_ID = "of:00000000000000aa";
+    static final DeviceId DEVICE_ID_1 = DeviceId.deviceId(OLT_DEV_ID);
+    static final int TRANSACTION_ID = 1000;
+    static final String SCHEME_NAME = "dhcpl2relay";
+    static final MacAddress OLT_MAC_ADDRESS = MacAddress.valueOf("01:02:03:04:05:06");
+
+    static final ConnectPoint SERVER_CONNECT_POINT =
+            ConnectPoint.deviceConnectPoint("of:00000000000000aa/5");
+
+    static final DefaultAnnotations DEVICE_ANNOTATIONS = DefaultAnnotations.builder()
+            .set(AnnotationKeys.PROTOCOL, SCHEME_NAME.toUpperCase()).build();
 
     List<BasePacket> savedPackets = new LinkedList<>();
     PacketProcessor packetProcessor;
@@ -85,6 +153,108 @@
         }
     }
 
+    class MockDeviceService extends DeviceServiceAdapter {
+
+        private ProviderId providerId = new ProviderId("of", "foo");
+        private final Device device1 = new DhcpL2RelayTestBase.MockDevice(providerId, DEVICE_ID_1, Device.Type.SWITCH,
+                "foo.inc", "0", "0", OLT_DEV_ID, new ChassisId(),
+                DEVICE_ANNOTATIONS);
+
+        @Override
+        public Device getDevice(DeviceId devId) {
+            return device1;
+
+        }
+
+        @Override
+        public Port getPort(ConnectPoint cp) {
+            return new DhcpL2RelayTestBase.MockPort();
+        }
+
+        @Override
+        public Port getPort(DeviceId deviceId, PortNumber portNumber) {
+            return new DhcpL2RelayTestBase.MockPort();
+        }
+
+        @Override
+        public boolean isAvailable(DeviceId d) {
+            return true;
+        }
+    }
+
+    class MockDevice extends DefaultDevice {
+
+        public MockDevice(ProviderId providerId, DeviceId id, Type type,
+                          String manufacturer, String hwVersion, String swVersion,
+                          String serialNumber, ChassisId chassisId, Annotations... annotations) {
+            super(providerId, id, type, manufacturer, hwVersion, swVersion, serialNumber,
+                    chassisId, annotations);
+        }
+    }
+
+    class MockHostService extends HostServiceAdapter {
+
+        @Override
+        public Set<Host> getHostsByMac(MacAddress mac) {
+
+            HostLocation loc = new HostLocation(DEVICE_ID_1, PortNumber.portNumber(22), 0);
+
+            IpAddress ip = IpAddress.valueOf("10.100.200.10");
+
+            Host h = new DefaultHost(ProviderId.NONE, HostId.hostId(mac, VlanId.NONE),
+                    mac, VlanId.NONE, loc, ImmutableSet.of(ip));
+
+            return ImmutableSet.of(h);
+        }
+    }
+
+    class MockMastershipService extends MastershipServiceAdapter {
+        @Override
+        public boolean isLocalMaster(DeviceId d) {
+            return true;
+        }
+    }
+
+    class  MockPort implements Port {
+
+        @Override
+        public boolean isEnabled() {
+            return true;
+        }
+        @Override
+        public long portSpeed() {
+            return 1000;
+        }
+        @Override
+        public Element element() {
+            return null;
+        }
+        @Override
+        public PortNumber number() {
+            return null;
+        }
+        @Override
+        public Annotations annotations() {
+            return new MockAnnotations();
+        }
+        @Override
+        public Type type() {
+            return Port.Type.FIBER;
+        }
+
+        private class MockAnnotations implements Annotations {
+
+            @Override
+            public String value(String val) {
+                return "PON 1/1";
+            }
+            @Override
+            public Set<String> keys() {
+                return null;
+            }
+        }
+    }
+
     /**
      * Keeps a reference to the PacketProcessor and saves the OutboundPackets.
      */
@@ -107,6 +277,124 @@
         }
     }
 
+    class MockSadisService implements SadisService {
+        @Override
+        public BaseInformationService<SubscriberAndDeviceInformation> getSubscriberInfoService() {
+            return new DhcpL2RelayTestBase.MockSubService();
+        }
+
+        @Override
+        public BaseInformationService<BandwidthProfileInformation> getBandwidthProfileService() {
+            return null;
+        }
+    }
+
+    class MockSubService implements BaseInformationService<SubscriberAndDeviceInformation> {
+        DhcpL2RelayTestBase.MockSubscriberAndDeviceInformation device =
+                new DhcpL2RelayTestBase.MockSubscriberAndDeviceInformation(OLT_DEV_ID, VlanId.NONE, VlanId.NONE, null,
+                        null, OLT_MAC_ADDRESS, Ip4Address.valueOf("10.10.10.10"), UPLINK_PORT);
+        DhcpL2RelayTestBase.MockSubscriberAndDeviceInformation sub =
+                new DhcpL2RelayTestBase.MockSubscriberAndDeviceInformation(CLIENT_ID_1, CLIENT_C_TAG,
+                        CLIENT_S_TAG, CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null, -1);
+        @Override
+        public SubscriberAndDeviceInformation get(String id) {
+            if (id.equals(OLT_DEV_ID)) {
+                return device;
+            } else {
+                return  sub;
+            }
+        }
+
+        @Override
+        public void invalidateAll() {}
+        @Override
+        public void invalidateId(String id) {}
+        @Override
+        public SubscriberAndDeviceInformation getfromCache(String id) {
+            return null;
+        }
+    }
+
+    class MockSubscriberAndDeviceInformation extends SubscriberAndDeviceInformation {
+
+        MockSubscriberAndDeviceInformation(String id, VlanId ctag,
+                                           VlanId stag, String nasPortId,
+                                           String circuitId, MacAddress hardId,
+                                           Ip4Address ipAddress, int uplinkPort) {
+            this.setCTag(ctag);
+            this.setHardwareIdentifier(hardId);
+            this.setId(id);
+            this.setIPAddress(ipAddress);
+            this.setSTag(stag);
+            this.setNasPortId(nasPortId);
+            this.setCircuitId(circuitId);
+            this.setUplinkPort(uplinkPort);
+        }
+    }
+
+    class MockComponentContext implements ComponentContext {
+
+        @Override
+        public Dictionary<String, Object> getProperties() {
+            Dictionary<String, Object> cfgDict = new Hashtable<String, Object>();
+            cfgDict.put("publishCountersRate", 10);
+            return cfgDict;
+        }
+
+        @Override
+        public Object locateService(String name) {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Object locateService(String name, ServiceReference reference) {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Object[] locateServices(String name) {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public BundleContext getBundleContext() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Bundle getUsingBundle() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public ComponentInstance getComponentInstance() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public void enableComponent(String name) {
+            // TODO Auto-generated method stub
+        }
+
+        @Override
+        public void disableComponent(String name) {
+            // TODO Auto-generated method stub
+        }
+
+        @Override
+        public ServiceReference getServiceReference() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+    }
+
+
     /**
      * Mocks the DefaultPacketContext.
      */
@@ -123,6 +411,157 @@
         }
     }
 
+    public static class TestEventDispatcher extends DefaultEventSinkRegistry
+            implements EventDeliveryService {
+        @Override
+        @SuppressWarnings("unchecked")
+        public synchronized void post(Event event) {
+            EventSink sink = getSink(event.getClass());
+            checkState(sink != null, "No sink for event %s", event);
+            sink.process(event);
+        }
+
+        @Override
+        public void setDispatchTimeLimit(long millis) {
+        }
+
+        @Override
+        public long getDispatchTimeLimit() {
+            return 0;
+        }
+    }
+
+    /**
+     * Creates a mock object for a scheduled executor service.
+     *
+     */
+    public static final class MockExecutor implements ScheduledExecutorService {
+        private ScheduledExecutorService executor;
+
+        MockExecutor(ScheduledExecutorService executor) {
+            this.executor = executor;
+        }
+
+        String lastMethodCalled = "";
+        long lastInitialDelay;
+        long lastDelay;
+        TimeUnit lastUnit;
+
+        public void assertLastMethodCalled(String method, long initialDelay, long delay, TimeUnit unit) {
+            assertEquals(method, lastMethodCalled);
+            assertEquals(initialDelay, lastInitialDelay);
+            assertEquals(delay, lastDelay);
+            assertEquals(unit, lastUnit);
+        }
+
+        @Override
+        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+            lastMethodCalled = "scheduleRunnable";
+            lastDelay = delay;
+            lastUnit = unit;
+            return null;
+        }
+
+        @Override
+        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+            lastMethodCalled = "scheduleCallable";
+            lastDelay = delay;
+            lastUnit = unit;
+            return null;
+        }
+
+        @Override
+        public ScheduledFuture<?> scheduleAtFixedRate(
+                Runnable command, long initialDelay, long period, TimeUnit unit) {
+            lastMethodCalled = "scheduleAtFixedRate";
+            lastInitialDelay = initialDelay;
+            lastDelay = period;
+            lastUnit = unit;
+            return null;
+        }
+
+        @Override
+        public ScheduledFuture<?> scheduleWithFixedDelay(
+                Runnable command, long initialDelay, long delay, TimeUnit unit) {
+            lastMethodCalled = "scheduleWithFixedDelay";
+            lastInitialDelay = initialDelay;
+            lastDelay = delay;
+            lastUnit = unit;
+            command.run();
+            return null;
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+                throws InterruptedException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(
+                Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                throws InterruptedException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+                throws ExecutionException, InterruptedException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+                throws ExecutionException, InterruptedException, TimeoutException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isTerminated() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void shutdown() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return null;
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
     /**
      * Sends an Ethernet packet to the process method of the Packet Processor.
      *
@@ -307,4 +746,4 @@
 
         return optionList;
     }
-}
+}
\ No newline at end of file