Merge "[SEBA-549] Moving to aaa, dhcp and olt SNAPSHOT versions"
diff --git a/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
index 488a338..b96bd5d 100644
--- a/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
@@ -30,6 +30,7 @@
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.Port;
 import org.opencord.kafka.EventBusService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@
     private final DeviceListener listener = new InternalDeviceListener();
 
     private static final String TOPIC = "onos.kpis";
+    private static final String PORT_EVENT_TOPIC = "onos.events.port";
 
     // event fields
     private static final String TIMESTAMP = "timestamp";
@@ -68,7 +70,9 @@
     private static final String BYTES_TX = "bytesTx";
     private static final String PKT_RX_DROP = "pktRxDrp";
     private static final String PKT_TX_DROP = "pktTxDrp";
-
+    private static final String ENABLED = "enabled";
+    private static final String SPEED = "speed";
+    private static final String TYPE = "type";
 
     @Activate
     public void activate() {
@@ -83,10 +87,14 @@
     }
 
     private void handle(List<PortStatistics> stats, DeviceId deviceId) {
-        eventBusService.send(TOPIC, serialize(stats, deviceId));
+        eventBusService.send(TOPIC, serializeStats(stats, deviceId)); // stats go to the KPI topic
     }
 
-    private JsonNode serialize(List<PortStatistics> stats, DeviceId deviceId) {
+    private void handlePortUpdate(Port port, DeviceId deviceId) {
+        eventBusService.send(PORT_EVENT_TOPIC, serializePort(port, deviceId)); // not the KPI topic
+    }
+
+    private JsonNode serializeStats(List<PortStatistics> stats, DeviceId deviceId) {
 
         ObjectMapper mapper = new ObjectMapper();
         ObjectNode kpis = mapper.createObjectNode();
@@ -114,11 +122,27 @@
         return kpis;
     }
 
+    private JsonNode serializePort(Port port, DeviceId deviceId) {
+        // Flat JSON object describing the given port at the moment this method runs.
+        final ObjectNode portJson = new ObjectMapper().createObjectNode();
+
+        // Where and when: current instant, owning device, port number.
+        portJson.put(TIMESTAMP, Instant.now().toString());
+        portJson.put(DEVICE_ID, deviceId.toString());
+        portJson.put(PORT_ID, port.number().toString());
+        // What: the attributes read directly from the Port object.
+        portJson.put(ENABLED, port.isEnabled());
+        portJson.put(SPEED, port.portSpeed());
+        portJson.put(TYPE, port.type().toString());
+        return portJson;
+    }
+
     private class InternalDeviceListener implements
             DeviceListener {
 
         @Override
         public void event(DeviceEvent deviceEvent) {
+            final DeviceId deviceId;
 
             if (deviceEvent.subject().manufacturer().contains("VOLTHA")) {
                 // TODO check the NNI port instead
@@ -128,10 +152,15 @@
             log.trace("Got DeviceEvent: " + deviceEvent.type());
             switch (deviceEvent.type()) {
             case PORT_STATS_UPDATED:
-                final DeviceId deviceId = deviceEvent.subject().id();
+                deviceId = deviceEvent.subject().id();
                 final List<PortStatistics> stats = deviceService.getPortStatistics(deviceId);
                 handle(stats, deviceId);
                 break;
+            case PORT_UPDATED:
+                deviceId = deviceEvent.subject().id();
+                final Port port = deviceEvent.port();
+                handlePortUpdate(port, deviceId);
+                break;
             default:
                 break;
             }