Send UNI_ADDED events on port enable.
Add timestamp in events.
Change-Id: I39fc961e196944b6049cb786ac4a685e8bf6acbe
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index c3a6f73..9fe561b 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -578,6 +578,9 @@
if (!oltData.containsKey(devId)) {
return;
}
+ if (event.type() != DeviceEvent.Type.PORT_STATS_UPDATED) {
+ log.debug("Olt got {} event for {}", event.type(), event.subject());
+ }
switch (event.type()) {
//TODO: Port handling and bookkeeping should be improved once
// olt firmware handles correct behaviour.
@@ -614,6 +617,7 @@
}
if (event.port().isEnabled()) {
processFilteringObjectives(devId, event.port().number(), true);
+ post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, event.port()));
} else {
processFilteringObjectives(devId, event.port().number(), false);
}
diff --git a/kafka/src/main/java/org/opencord/olt/kafka/KafkaIntegration.java b/kafka/src/main/java/org/opencord/olt/kafka/KafkaIntegration.java
index 60be6e6..7ecbaf6 100644
--- a/kafka/src/main/java/org/opencord/olt/kafka/KafkaIntegration.java
+++ b/kafka/src/main/java/org/opencord/olt/kafka/KafkaIntegration.java
@@ -39,6 +39,7 @@
import org.opencord.olt.AccessDeviceService;
import org.slf4j.Logger;
+import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
@@ -109,6 +110,7 @@
private static final String UNI_PORT_ID = "uni_port_id";
private static final String OF_DPID = "of_dpid";
private static final String ACTIVATED = "activated";
+ private static final String TIMESTAMP = "timestamp";
@Activate
public void activate() {
@@ -194,8 +196,8 @@
String serialNumber = port.annotations().value(AnnotationKeys.PORT_NAME);
ObjectMapper mapper = new ObjectMapper();
-
ObjectNode onuNode = mapper.createObjectNode();
+ onuNode.put(TIMESTAMP, Instant.now().toString());
onuNode.put(STATUS, ACTIVATED);
onuNode.put(SERIAL_NUMBER, serialNumber);
onuNode.put(UNI_PORT_ID, port.number().toLong());
@@ -244,6 +246,8 @@
@Override
public void event(AccessDeviceEvent accessDeviceEvent) {
+ log.debug("KafkaIntegration got {} event for {}/{}",
+ accessDeviceEvent.type(), accessDeviceEvent.subject(), accessDeviceEvent.port());
switch (accessDeviceEvent.type()) {
case UNI_ADDED:
executor.execute(() ->