Moving InternalDeviceListener logic in separated thread
Change-Id: Icca4b3c6ffa1cb81deb3a80059c2c7fc7335f59d
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index 48c3aa0..755c178 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -77,6 +77,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -133,8 +134,11 @@
groupedThreads("onos/olt-service",
"olt-installer-%d"));
+ protected ExecutorService eventExecutor;
+
@Activate
public void activate(ComponentContext context) {
+ eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/olt", "events-%d", log));
modified(context);
appId = coreService.registerApplication(APP_NAME);
componentConfigService.registerProperties(getClass());
@@ -697,101 +701,102 @@
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
- DeviceId devId = event.subject().id();
- Device dev = event.subject();
+ eventExecutor.execute(() -> {
+ DeviceId devId = event.subject().id();
+ Device dev = event.subject();
+ if (event.type() == DeviceEvent.Type.PORT_STATS_UPDATED) {
+ return;
+ }
- if (event.type() == DeviceEvent.Type.PORT_STATS_UPDATED) {
- return;
- }
+ if (getOltInfo(dev) == null) {
+ log.debug("No device info found, this is not an OLT");
+ return;
+ }
- if (getOltInfo(dev) == null) {
- log.debug("No device info found, this is not an OLT");
- return;
- }
+ log.debug("OLT got {} event for {}", event.type(), event.subject());
- log.debug("OLT got {} event for {}", event.type(), event.subject());
+ switch (event.type()) {
+ //TODO: Port handling and bookkeeping should be improved once
+ // olt firmware handles correct behaviour.
+ case PORT_ADDED:
+ if (isUniPort(dev, event.port())) {
+ post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, event.port()));
- switch (event.type()) {
- //TODO: Port handling and bookkeeping should be improved once
- // olt firmware handles correct behaviour.
- case PORT_ADDED:
- if (isUniPort(dev, event.port())) {
- post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, event.port()));
+ if (event.port().isEnabled()) {
+ processFilteringObjectives(devId, event.port().number(), true);
+ }
+ } else {
+ checkAndCreateDeviceFlows(dev);
+ }
+ break;
+ case PORT_REMOVED:
+ if (isUniPort(dev, event.port())) {
+ if (event.port().isEnabled()) {
+ processFilteringObjectives(devId, event.port().number(), false);
+ removeSubscriber(new ConnectPoint(devId, event.port().number()));
+ }
+
+ post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, event.port()));
+ }
+
+ break;
+ case PORT_UPDATED:
+ if (!isUniPort(dev, event.port())) {
+ break;
+ }
if (event.port().isEnabled()) {
processFilteringObjectives(devId, event.port().number(), true);
- }
- } else {
- checkAndCreateDeviceFlows(dev);
- }
- break;
- case PORT_REMOVED:
- if (isUniPort(dev, event.port())) {
- if (event.port().isEnabled()) {
+ post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, event.port()));
+ } else {
processFilteringObjectives(devId, event.port().number(), false);
- removeSubscriber(new ConnectPoint(devId, event.port().number()));
+ post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, event.port()));
}
-
- post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, event.port()));
- }
-
- break;
- case PORT_UPDATED:
- if (!isUniPort(dev, event.port())) {
break;
- }
-
- if (event.port().isEnabled()) {
- processFilteringObjectives(devId, event.port().number(), true);
- post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, event.port()));
- } else {
- processFilteringObjectives(devId, event.port().number(), false);
- post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, event.port()));
- }
- break;
- case DEVICE_ADDED:
- post(new AccessDeviceEvent(
- AccessDeviceEvent.Type.DEVICE_CONNECTED, devId,
- null, null));
-
- // Send UNI_ADDED events for all existing ports
- deviceService.getPorts(devId).stream()
- .filter(p -> isUniPort(dev, p))
- .filter(Port::isEnabled)
- .forEach(p -> post(new AccessDeviceEvent(
- AccessDeviceEvent.Type.UNI_ADDED, devId, p)));
-
- checkAndCreateDeviceFlows(dev);
- break;
- case DEVICE_REMOVED:
- deviceService.getPorts(devId).stream()
- .filter(p -> isUniPort(dev, p))
- .forEach(p -> post(new AccessDeviceEvent(
- AccessDeviceEvent.Type.UNI_REMOVED, devId, p)));
-
- post(new AccessDeviceEvent(
- AccessDeviceEvent.Type.DEVICE_DISCONNECTED, devId,
- null, null));
- break;
- case DEVICE_AVAILABILITY_CHANGED:
- if (deviceService.isAvailable(devId)) {
+ case DEVICE_ADDED:
post(new AccessDeviceEvent(
AccessDeviceEvent.Type.DEVICE_CONNECTED, devId,
null, null));
+
+ // Send UNI_ADDED events for all existing ports
+ deviceService.getPorts(devId).stream()
+ .filter(p -> isUniPort(dev, p))
+ .filter(Port::isEnabled)
+ .forEach(p -> post(new AccessDeviceEvent(
+ AccessDeviceEvent.Type.UNI_ADDED, devId, p)));
+
checkAndCreateDeviceFlows(dev);
- } else {
+ break;
+ case DEVICE_REMOVED:
+ deviceService.getPorts(devId).stream()
+ .filter(p -> isUniPort(dev, p))
+ .forEach(p -> post(new AccessDeviceEvent(
+ AccessDeviceEvent.Type.UNI_REMOVED, devId, p)));
+
post(new AccessDeviceEvent(
AccessDeviceEvent.Type.DEVICE_DISCONNECTED, devId,
null, null));
- }
- break;
- case DEVICE_UPDATED:
- case DEVICE_SUSPENDED:
- case PORT_STATS_UPDATED:
- default:
- return;
- }
+ break;
+ case DEVICE_AVAILABILITY_CHANGED:
+ if (deviceService.isAvailable(devId)) {
+ post(new AccessDeviceEvent(
+ AccessDeviceEvent.Type.DEVICE_CONNECTED, devId,
+ null, null));
+ checkAndCreateDeviceFlows(dev);
+ } else {
+ post(new AccessDeviceEvent(
+ AccessDeviceEvent.Type.DEVICE_DISCONNECTED, devId,
+ null, null));
+ }
+ break;
+ case DEVICE_UPDATED:
+ case DEVICE_SUSPENDED:
+ case PORT_STATS_UPDATED:
+ default:
+ return;
+ }
+ });
}
}
}