[SEBA-119] Sending UNI_REMOVED event to kafka
Change-Id: Iab9e137b6507ae1462ef85c9d8c9714960ad5b4b
diff --git a/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
index 9a5ddb1..7367660 100644
--- a/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
@@ -32,7 +32,6 @@
import org.opencord.olt.AccessDeviceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.time.Instant;
/**
@@ -53,13 +52,17 @@
private static final String TOPIC = "onu.events";
+ // event fields
private static final String STATUS = "status";
private static final String SERIAL_NUMBER = "serial_number";
private static final String UNI_PORT_ID = "uni_port_id";
private static final String OF_DPID = "of_dpid";
- private static final String ACTIVATED = "activated";
private static final String TIMESTAMP = "timestamp";
+ // statuses
+ private static final String ACTIVATED = "activated";
+ private static final String DISABLED = "disabled";
+
@Activate
public void activate() {
accessDeviceService.addListener(listener);
@@ -72,18 +75,18 @@
log.info("Stopped");
}
- private void handle(AccessDeviceEvent event) {
- eventBusService.send(TOPIC, serialize(event));
+ private void handle(AccessDeviceEvent event, String status) {
+ eventBusService.send(TOPIC, serialize(event, status));
}
- private JsonNode serialize(AccessDeviceEvent event) {
+ private JsonNode serialize(AccessDeviceEvent event, String status) {
Port port = event.port().get();
String serialNumber = port.annotations().value(AnnotationKeys.PORT_NAME);
ObjectMapper mapper = new ObjectMapper();
ObjectNode onuNode = mapper.createObjectNode();
onuNode.put(TIMESTAMP, Instant.now().toString());
- onuNode.put(STATUS, ACTIVATED);
+ onuNode.put(STATUS, status);
onuNode.put(SERIAL_NUMBER, serialNumber);
onuNode.put(UNI_PORT_ID, port.number().toLong());
onuNode.put(OF_DPID, port.element().id().toString());
@@ -96,10 +99,13 @@
@Override
public void event(AccessDeviceEvent accessDeviceEvent) {
+ log.info("Got AccessDeviceEvent: " + accessDeviceEvent.type());
switch (accessDeviceEvent.type()) {
case UNI_ADDED:
- handle(accessDeviceEvent);
+ handle(accessDeviceEvent, ACTIVATED);
break;
+ case UNI_REMOVED:
+ handle(accessDeviceEvent, DISABLED);
default:
break;
}