Merge "[SEBA-549] Moving to aaa, dhcp and olt SNAPSHOT versions"
diff --git a/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
index 488a338..b96bd5d 100644
--- a/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
@@ -30,6 +30,7 @@
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.Port;
import org.opencord.kafka.EventBusService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@
private final DeviceListener listener = new InternalDeviceListener();
private static final String TOPIC = "onos.kpis";
+ private static final String PORT_EVENT_TOPIC = "onos.events.port"; // topic name passed to eventBusService.send for port updates
// event fields
private static final String TIMESTAMP = "timestamp";
@@ -68,7 +70,9 @@
private static final String BYTES_TX = "bytesTx";
private static final String PKT_RX_DROP = "pktRxDrp";
private static final String PKT_TX_DROP = "pktTxDrp";
-
+ private static final String ENABLED = "enabled"; // JSON key for port.isEnabled()
+ private static final String SPEED = "speed"; // JSON key for port.portSpeed()
+ private static final String TYPE = "type"; // JSON key for port.type(), written as a string
@Activate
public void activate() {
@@ -83,10 +87,14 @@
}
private void handle(List<PortStatistics> stats, DeviceId deviceId) {
- eventBusService.send(TOPIC, serialize(stats, deviceId));
+ eventBusService.send(TOPIC, serializeStats(stats, deviceId)); // publish the serialized port statistics on TOPIC
}
- private JsonNode serialize(List<PortStatistics> stats, DeviceId deviceId) {
+ private void handlePortUpdate(Port port, DeviceId deviceId) { // publishes the state of one port of the given device
+ eventBusService.send(PORT_EVENT_TOPIC, serializePort(port, deviceId)); // payload is the JSON built by serializePort
+ }
+
+ private JsonNode serializeStats(List<PortStatistics> stats, DeviceId deviceId) {
ObjectMapper mapper = new ObjectMapper();
ObjectNode kpis = mapper.createObjectNode();
@@ -114,11 +122,27 @@
return kpis;
}
+ private JsonNode serializePort(Port port, DeviceId deviceId) {
+ // Builds a flat JSON object with the device id, port number, enabled flag, speed and type of the port.
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode update = mapper.createObjectNode();
+ // The timestamp is the moment this method runs (Instant.now()); the event itself is not passed in.
+ update.put(TIMESTAMP, Instant.now().toString());
+ update.put(DEVICE_ID, deviceId.toString());
+ update.put(PORT_ID, port.number().toString());
+ update.put(ENABLED, port.isEnabled());
+ update.put(SPEED, port.portSpeed()); // units are not visible here; presumably Mbps per the ONOS Port API, TODO confirm
+ update.put(TYPE, port.type().toString());
+
+ return update;
+ }
+
private class InternalDeviceListener implements
DeviceListener {
@Override
public void event(DeviceEvent deviceEvent) {
+ final DeviceId deviceId;
if (deviceEvent.subject().manufacturer().contains("VOLTHA")) {
// TODO check the NNI port instead
@@ -128,10 +152,15 @@
log.trace("Got DeviceEvent: " + deviceEvent.type());
switch (deviceEvent.type()) {
case PORT_STATS_UPDATED:
- final DeviceId deviceId = deviceEvent.subject().id();
+ deviceId = deviceEvent.subject().id();
final List<PortStatistics> stats = deviceService.getPortStatistics(deviceId);
handle(stats, deviceId);
break;
+ case PORT_UPDATED:
+ deviceId = deviceEvent.subject().id();
+ final Port port = deviceEvent.port();
+ handlePortUpdate(port, deviceId);
+ break;
default:
break;
}