[SEBA-119] Sending UNI_REMOVED event to kafka

Change-Id: Iab9e137b6507ae1462ef85c9d8c9714960ad5b4b
diff --git a/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
index 9a5ddb1..7367660 100644
--- a/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
@@ -32,7 +32,6 @@
 import org.opencord.olt.AccessDeviceService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.time.Instant;
 
 /**
@@ -53,13 +52,17 @@
 
     private static final String TOPIC = "onu.events";
 
+    // event fields
     private static final String STATUS = "status";
     private static final String SERIAL_NUMBER = "serial_number";
     private static final String UNI_PORT_ID = "uni_port_id";
     private static final String OF_DPID = "of_dpid";
-    private static final String ACTIVATED = "activated";
     private static final String TIMESTAMP = "timestamp";
 
+    // statuses
+    private static final String ACTIVATED = "activated";
+    private static final String DISABLED = "disabled";
+
     @Activate
     public void activate() {
         accessDeviceService.addListener(listener);
@@ -72,18 +75,18 @@
         log.info("Stopped");
     }
 
-    private void handle(AccessDeviceEvent event) {
-        eventBusService.send(TOPIC, serialize(event));
+    private void handle(AccessDeviceEvent event, String status) {
+        eventBusService.send(TOPIC, serialize(event, status));
     }
 
-    private JsonNode serialize(AccessDeviceEvent event) {
+    private JsonNode serialize(AccessDeviceEvent event, String status) {
         Port port = event.port().get();
         String serialNumber = port.annotations().value(AnnotationKeys.PORT_NAME);
 
         ObjectMapper mapper = new ObjectMapper();
         ObjectNode onuNode = mapper.createObjectNode();
         onuNode.put(TIMESTAMP, Instant.now().toString());
-        onuNode.put(STATUS, ACTIVATED);
+        onuNode.put(STATUS, status);
         onuNode.put(SERIAL_NUMBER, serialNumber);
         onuNode.put(UNI_PORT_ID, port.number().toLong());
         onuNode.put(OF_DPID, port.element().id().toString());
@@ -96,10 +99,13 @@
 
         @Override
         public void event(AccessDeviceEvent accessDeviceEvent) {
+            log.info("Got AccessDeviceEvent: " + accessDeviceEvent.type());
             switch (accessDeviceEvent.type()) {
             case UNI_ADDED:
-                handle(accessDeviceEvent);
+                handle(accessDeviceEvent, ACTIVATED);
                 break;
+            case UNI_REMOVED:
+                handle(accessDeviceEvent, DISABLED);
             default:
                 break;
             }