[SEBA-144] Operational Status DHCP L2 Relay.

Change-Id: I8aaeda97feb842f1dd0268d452b3572507923980
Signed-off-by: Marcos Aurelio Carrero <mcarrero@furukawalatam.com>
diff --git a/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
index 58b61b8..494635f 100644
--- a/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
@@ -19,6 +19,7 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onosproject.cluster.ClusterService;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.device.DeviceService;
 import org.opencord.dhcpl2relay.DhcpAllocationInfo;
@@ -46,6 +47,9 @@
     protected EventBusService eventBusService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.OPTIONAL,
@@ -57,7 +61,9 @@
 
     private final DhcpL2RelayListener listener = new InternalDhcpL2RelayListener();
 
+    // topics
     private static final String TOPIC = "dhcp.events";
+    private static final String DHCP_STATS_TOPIC = "onos.dhcp.stats.kpis";
 
     private static final String TIMESTAMP = "timestamp";
     private static final String DEVICE_ID = "deviceId";
@@ -68,6 +74,18 @@
     private static final String MAC_ADDRESS = "macAddress";
     private static final String IP_ADDRESS = "ipAddress";
 
+    // dhcp stats event params
+    static final String CONNECT_POINT = "connectPoint";
+    static final String INSTANCE_ID = "instance_id";
+    static final String METRICS = "metrics";
+    static final String SUBSCRIBER_ID = "subscriberId";
+    static final String SUBSCRIBER_INFO = "subscriberInfo";
+    static final String TS = "ts";
+    static final String TITLE = "title";
+
+    static final String GLOBAL_STATS_TITLE = "DHCP_L2_Relay_stats";
+    static final String PER_SUBSCRIBER_STATS_TITLE = "DHCP_L2_Relay_stats_Per_Subscriber";
+
     protected void bindDhcpL2RelayService(DhcpL2RelayService incomingService) {
         bindAndAddListener(incomingService, dhcpL2RelayServiceRef, listener);
     }
@@ -88,7 +106,22 @@
     }
 
     private void handle(DhcpL2RelayEvent event) {
-        eventBusService.send(TOPIC, serialize(event));
+        switch (event.type()) {
+            case STATS_UPDATE:
+                // pushes the stats based on the received event (per subscriber or global) on a Kafka bus
+                if (event.getSubscriberId() != null && event.subject() != null) {
+                    eventBusService.send(DHCP_STATS_TOPIC, serializeStat(event, PER_SUBSCRIBER_STATS_TITLE));
+                } else {
+                    eventBusService.send(DHCP_STATS_TOPIC, serializeStat(event, GLOBAL_STATS_TITLE));
+                }
+                log.trace("Writing to kafka topic:{}, type:{}", DHCP_STATS_TOPIC,
+                        DhcpL2RelayEvent.Type.STATS_UPDATE.toString());
+                break;
+            default:
+                eventBusService.send(TOPIC, serialize(event));
+                log.trace("Writing to kafka topic:{}, type:{}", TOPIC, event.type().toString());
+                break;
+        }
     }
 
     private JsonNode serialize(DhcpL2RelayEvent event) {
@@ -109,6 +142,42 @@
         return dhcpEvent;
     }
 
+    /**
+     * Returns a Json object that represents the DHCP L2 Relay stats.
+     *
+     * @param event DHCP L2 Relay event used for stats.
+     * @param title Describes the type of the received stats event (per subscriber or global).
+     */
+    private JsonNode serializeStat(DhcpL2RelayEvent event, String title) {
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode statsEvent = mapper.createObjectNode();
+        Long ts = Instant.now().getEpochSecond();
+
+        // metrics for global and per subscriber stats
+        ObjectNode metrics = mapper.createObjectNode();
+        metrics.put(event.getCountersEntry().getKey(), event.getCountersEntry().getValue().longValue());
+
+        statsEvent.put(INSTANCE_ID, clusterService.getLocalNode().id().toString());
+        statsEvent.put(TITLE, title);
+        statsEvent.put(TS, ts);
+        statsEvent.put(METRICS, metrics);
+
+        // specific metrics for per subscriber stats
+        if (event.getSubscriberId() != null && event.subject() != null) {
+            String sn = deviceService.getDevice(event.subject().location().deviceId()).serialNumber();
+            ObjectNode subscriberInfo = mapper.createObjectNode();
+
+            statsEvent.put(SERIAL_NUMBER, sn);
+            subscriberInfo.put(SUBSCRIBER_ID, event.getSubscriberId());
+            subscriberInfo.put(CONNECT_POINT, event.subject().location().toString());
+            subscriberInfo.put(MAC_ADDRESS, event.subject().macAddress().toString());
+
+            statsEvent.put(SUBSCRIBER_INFO, subscriberInfo);
+        }
+
+        return statsEvent;
+    }
+
     private class InternalDhcpL2RelayListener implements
             DhcpL2RelayListener {