[SEBA-144] Operational Status DHCP L2 Relay.
Change-Id: I8aaeda97feb842f1dd0268d452b3572507923980
Signed-off-by: Marcos Aurelio Carrero <mcarrero@furukawalatam.com>
diff --git a/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
index 58b61b8..494635f 100644
--- a/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
@@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onosproject.cluster.ClusterService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.device.DeviceService;
import org.opencord.dhcpl2relay.DhcpAllocationInfo;
@@ -46,6 +47,9 @@
protected EventBusService eventBusService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL,
@@ -57,7 +61,9 @@
private final DhcpL2RelayListener listener = new InternalDhcpL2RelayListener();
+ // topics
private static final String TOPIC = "dhcp.events";
+ private static final String DHCP_STATS_TOPIC = "onos.dhcp.stats.kpis";
private static final String TIMESTAMP = "timestamp";
private static final String DEVICE_ID = "deviceId";
@@ -68,6 +74,18 @@
private static final String MAC_ADDRESS = "macAddress";
private static final String IP_ADDRESS = "ipAddress";
+ // dhcp stats event params
+ static final String CONNECT_POINT = "connectPoint";
+ static final String INSTANCE_ID = "instance_id";
+ static final String METRICS = "metrics";
+ static final String SUBSCRIBER_ID = "subscriberId";
+ static final String SUBSCRIBER_INFO = "subscriberInfo";
+ static final String TS = "ts";
+ static final String TITLE = "title";
+
+ static final String GLOBAL_STATS_TITLE = "DHCP_L2_Relay_stats";
+ static final String PER_SUBSCRIBER_STATS_TITLE = "DHCP_L2_Relay_stats_Per_Subscriber";
+
protected void bindDhcpL2RelayService(DhcpL2RelayService incomingService) {
bindAndAddListener(incomingService, dhcpL2RelayServiceRef, listener);
}
@@ -88,7 +106,22 @@
}
private void handle(DhcpL2RelayEvent event) {
- eventBusService.send(TOPIC, serialize(event));
+ switch (event.type()) {
+ case STATS_UPDATE:
+ // pushes the stats based on the received event (per subscriber or global) on a Kafka bus
+ if (event.getSubscriberId() != null && event.subject() != null) {
+ eventBusService.send(DHCP_STATS_TOPIC, serializeStat(event, PER_SUBSCRIBER_STATS_TITLE));
+ } else {
+ eventBusService.send(DHCP_STATS_TOPIC, serializeStat(event, GLOBAL_STATS_TITLE));
+ }
+ log.trace("Writing to kafka topic:{}, type:{}", DHCP_STATS_TOPIC,
+ DhcpL2RelayEvent.Type.STATS_UPDATE.toString());
+ break;
+ default:
+ eventBusService.send(TOPIC, serialize(event));
+ log.trace("Writing to kafka topic:{}, type:{}", TOPIC, event.type().toString());
+ break;
+ }
}
private JsonNode serialize(DhcpL2RelayEvent event) {
@@ -109,6 +142,42 @@
return dhcpEvent;
}
+ /**
+ * Returns a Json object that represents the DHCP L2 Relay stats.
+ *
+ * @param event DHCP L2 Relay event used for stats.
+ * @param title Describes the type of the received stats event (per subscriber or global).
+ */
+ private JsonNode serializeStat(DhcpL2RelayEvent event, String title) {
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode statsEvent = mapper.createObjectNode();
+ Long ts = Instant.now().getEpochSecond();
+
+ // metrics for global and per subscriber stats
+ ObjectNode metrics = mapper.createObjectNode();
+ metrics.put(event.getCountersEntry().getKey(), event.getCountersEntry().getValue().longValue());
+
+ statsEvent.put(INSTANCE_ID, clusterService.getLocalNode().id().toString());
+ statsEvent.put(TITLE, title);
+ statsEvent.put(TS, ts);
+ statsEvent.put(METRICS, metrics);
+
+ // specific metrics for per subscriber stats
+ if (event.getSubscriberId() != null && event.subject() != null) {
+ String sn = deviceService.getDevice(event.subject().location().deviceId()).serialNumber();
+ ObjectNode subscriberInfo = mapper.createObjectNode();
+
+ statsEvent.put(SERIAL_NUMBER, sn);
+ subscriberInfo.put(SUBSCRIBER_ID, event.getSubscriberId());
+ subscriberInfo.put(CONNECT_POINT, event.subject().location().toString());
+ subscriberInfo.put(MAC_ADDRESS, event.subject().macAddress().toString());
+
+ statsEvent.put(SUBSCRIBER_INFO, subscriberInfo);
+ }
+
+ return statsEvent;
+ }
+
private class InternalDhcpL2RelayListener implements
DhcpL2RelayListener {