Send UNI_ADDED events on port enable.
Add timestamp in events.
Change-Id: I39fc961e196944b6049cb786ac4a685e8bf6acbe
(cherry picked from commit 6960f820c1d943ffb24ca5127b0488c71b428e94)
diff --git a/kafka/src/main/java/org/opencord/olt/kafka/KafkaIntegration.java b/kafka/src/main/java/org/opencord/olt/kafka/KafkaIntegration.java
index 60be6e6..7ecbaf6 100644
--- a/kafka/src/main/java/org/opencord/olt/kafka/KafkaIntegration.java
+++ b/kafka/src/main/java/org/opencord/olt/kafka/KafkaIntegration.java
@@ -39,6 +39,7 @@
import org.opencord.olt.AccessDeviceService;
import org.slf4j.Logger;
+import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
@@ -109,6 +110,7 @@
private static final String UNI_PORT_ID = "uni_port_id";
private static final String OF_DPID = "of_dpid";
private static final String ACTIVATED = "activated";
+ private static final String TIMESTAMP = "timestamp";
@Activate
public void activate() {
@@ -194,8 +196,8 @@
String serialNumber = port.annotations().value(AnnotationKeys.PORT_NAME);
ObjectMapper mapper = new ObjectMapper();
-
ObjectNode onuNode = mapper.createObjectNode();
+ onuNode.put(TIMESTAMP, Instant.now().toString());
onuNode.put(STATUS, ACTIVATED);
onuNode.put(SERIAL_NUMBER, serialNumber);
onuNode.put(UNI_PORT_ID, port.number().toLong());
@@ -244,6 +246,8 @@
@Override
public void event(AccessDeviceEvent accessDeviceEvent) {
+ log.debug("KafkaIntegration got {} event for {}/{}",
+ accessDeviceEvent.type(), accessDeviceEvent.subject(), accessDeviceEvent.port());
switch (accessDeviceEvent.type()) {
case UNI_ADDED:
executor.execute(() ->