[VOL-4513] Always push events on the queue, even if they are duplicates
In the case of an Add -> Delete -> Add that are received very quickly,
we might try to add the second Add to the queue before the first one is removed.
If that's the case the second Add won't be processed and we'll be left in an inconsistent state.
The event handling is idempotent anyway so there's no issue in duplicating them.
Change-Id: Iebc28a147eff9a061148bc72a9878b52a62e113d
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index 8f035bf..c5f7806 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -649,16 +649,11 @@
} finally {
queueReadLock.unlock();
}
- if (!q.contains(sub)) {
- log.info("Adding subscriber to queue: {} with status {} and subscriber {}",
- portWithName(sub.port), sub.status, sub.hasSubscriber);
- q.add(sub);
- } else {
- log.debug("Not adding subscriber to queue as already present: {} with status {}",
- portWithName(sub.port), sub.status);
- // no need to update the queue in the map if nothing has changed
- return;
- }
+
+ log.info("Adding subscriber to queue: {} with status {} and subscriber {}",
+ portWithName(sub.port), sub.status, sub.hasSubscriber);
+ q.add(sub);
+
try {
queueWriteLock.lock();
eventsQueues.put(cp, q);
@@ -776,8 +771,11 @@
clearQueueForDevice(deviceId);
} else {
log.info("Device {} availability changed to false, but ports are still available, " +
- "assuming temporary disconnection. Ports: {}",
- deviceId, deviceService.getPorts(deviceId));
+ "assuming temporary disconnection.",
+ deviceId);
+ if (log.isTraceEnabled()) {
+ log.trace("Available ports: {}", deviceService.getPorts(deviceId));
+ }
}
return;
case DEVICE_REMOVED:
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 376056d..a4f2b5f 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -422,20 +422,20 @@
public void handleNniFlows(Device device, Port port, FlowOperation action) {
// always handle the LLDP flow
- log.debug("{} LLDP trap flow on NNI {} for device {}", portWithName(port), device.id(), flowOpToString(action));
+ log.debug("{} LLDP trap flow on NNI {} for device {}", flowOpToString(action), portWithName(port), device.id());
processLldpFilteringObjective(device.id(), port, action);
if (enableDhcpOnNni) {
if (enableDhcpV4) {
- log.debug("{} DHCPv4 trap flow on NNI {} for device {}", portWithName(port), device.id(),
- flowOpToString(action));
+ log.debug("{} DHCPv4 trap flow on NNI {} for device {}", flowOpToString(action),
+ portWithName(port), device.id());
processDhcpFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
67, 68, EthType.EtherType.IPV4.ethType(), IPv4.PROTOCOL_UDP,
null, null, nniUniTag);
}
if (enableDhcpV6) {
- log.debug("{} DHCPv6 trap flow on NNI {} for device {}", portWithName(port), device.id(),
- flowOpToString(action));
+ log.debug("{} DHCPv6 trap flow on NNI {} for device {}", flowOpToString(action),
+ portWithName(port), device.id());
processDhcpFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
546, 547, EthType.EtherType.IPV6.ethType(), IPv6.PROTOCOL_UDP,
null, null, nniUniTag);
@@ -445,13 +445,13 @@
}
if (enableIgmpOnNni) {
- log.debug("{} IGMP flow on NNI {} for device {}", portWithName(port), device.id(), flowOpToString(action));
+ log.debug("{} IGMP flow on NNI {} for device {}", flowOpToString(action), portWithName(port), device.id());
processIgmpFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
null, null, NONE_TP_ID, VlanId.NONE, VlanId.ANY, -1);
}
if (enablePppoe) {
- log.debug("{} PPPoE flow on NNI {} for device {}", port.number(), device.id(), flowOpToString(action));
+ log.debug("{} PPPoE flow on NNI {} for device {}", flowOpToString(action), port.number(), device.id());
processPPPoEDFilteringObjectives(device.id(), port, action, FlowDirection.DOWNSTREAM,
null, null, NONE_TP_ID, VlanId.NONE, VlanId.ANY, null);
}
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltTest.java b/impl/src/test/java/org/opencord/olt/impl/OltTest.java
index e8a76d0..0448f9d 100644
--- a/impl/src/test/java/org/opencord/olt/impl/OltTest.java
+++ b/impl/src/test/java/org/opencord/olt/impl/OltTest.java
@@ -194,12 +194,11 @@
LinkedBlockingQueue<DiscoveredSubscriber> spiedQueue = spy(q);
component.eventsQueues.put(cp, spiedQueue);
- // trying to add the same event twice should result in a single item in the queue
component.addSubscriberToQueue(sub);
component.addSubscriberToQueue(sub);
- verify(spiedQueue, times(1)).add(sub);
- Assert.assertEquals(1, spiedQueue.size());
+ verify(spiedQueue, times(2)).add(sub);
+ Assert.assertEquals(2, spiedQueue.size());