Moving InternalDeviceListener logic in separated thread

Change-Id: Icca4b3c6ffa1cb81deb3a80059c2c7fc7335f59d
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index 48c3aa0..755c178 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -77,6 +77,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -133,8 +134,11 @@
                                                                          groupedThreads("onos/olt-service",
                                                                                         "olt-installer-%d"));
 
+    protected ExecutorService eventExecutor;
+
     @Activate
     public void activate(ComponentContext context) {
+        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/olt", "events-%d", log));
         modified(context);
         appId = coreService.registerApplication(APP_NAME);
         componentConfigService.registerProperties(getClass());
@@ -697,101 +701,102 @@
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
-            DeviceId devId = event.subject().id();
-            Device dev = event.subject();
+            eventExecutor.execute(() -> {
+                DeviceId devId = event.subject().id();
+                Device dev = event.subject();
 
+                if (event.type() == DeviceEvent.Type.PORT_STATS_UPDATED) {
+                    return;
+                }
 
-            if (event.type() == DeviceEvent.Type.PORT_STATS_UPDATED) {
-                return;
-            }
+                if (getOltInfo(dev) == null) {
+                    log.debug("No device info found, this is not an OLT");
+                    return;
+                }
 
-            if (getOltInfo(dev) == null) {
-                log.debug("No device info found, this is not an OLT");
-                return;
-            }
+                log.debug("OLT got {} event for {}", event.type(), event.subject());
 
-            log.debug("OLT got {} event for {}", event.type(), event.subject());
+                switch (event.type()) {
+                    //TODO: Port handling and bookkeeping should be improved once
+                    // olt firmware handles correct behaviour.
+                    case PORT_ADDED:
+                        if (isUniPort(dev, event.port())) {
+                            post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, event.port()));
 
-            switch (event.type()) {
-                //TODO: Port handling and bookkeeping should be improved once
-                // olt firmware handles correct behaviour.
-                case PORT_ADDED:
-                    if (isUniPort(dev, event.port())) {
-                        post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, event.port()));
+                            if (event.port().isEnabled()) {
+                                processFilteringObjectives(devId, event.port().number(), true);
+                            }
+                        } else {
+                            checkAndCreateDeviceFlows(dev);
+                        }
+                        break;
+                    case PORT_REMOVED:
+                        if (isUniPort(dev, event.port())) {
+                            if (event.port().isEnabled()) {
+                                processFilteringObjectives(devId, event.port().number(), false);
+                                removeSubscriber(new ConnectPoint(devId, event.port().number()));
+                            }
+
+                            post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, event.port()));
+                        }
+
+                        break;
+                    case PORT_UPDATED:
+                        if (!isUniPort(dev, event.port())) {
+                            break;
+                        }
 
                         if (event.port().isEnabled()) {
                             processFilteringObjectives(devId, event.port().number(), true);
-                        }
-                    } else {
-                        checkAndCreateDeviceFlows(dev);
-                    }
-                    break;
-                case PORT_REMOVED:
-                    if (isUniPort(dev, event.port())) {
-                        if (event.port().isEnabled()) {
+                            post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, event.port()));
+                        } else {
                             processFilteringObjectives(devId, event.port().number(), false);
-                            removeSubscriber(new ConnectPoint(devId, event.port().number()));
+                            post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, event.port()));
                         }
-
-                        post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, event.port()));
-                    }
-
-                    break;
-                case PORT_UPDATED:
-                    if (!isUniPort(dev, event.port())) {
                         break;
-                    }
-
-                    if (event.port().isEnabled()) {
-                        processFilteringObjectives(devId, event.port().number(), true);
-                        post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_ADDED, devId, event.port()));
-                    } else {
-                        processFilteringObjectives(devId, event.port().number(), false);
-                        post(new AccessDeviceEvent(AccessDeviceEvent.Type.UNI_REMOVED, devId, event.port()));
-                    }
-                    break;
-                case DEVICE_ADDED:
-                    post(new AccessDeviceEvent(
-                            AccessDeviceEvent.Type.DEVICE_CONNECTED, devId,
-                            null, null));
-
-                    // Send UNI_ADDED events for all existing ports
-                    deviceService.getPorts(devId).stream()
-                            .filter(p -> isUniPort(dev, p))
-                            .filter(Port::isEnabled)
-                            .forEach(p -> post(new AccessDeviceEvent(
-                                    AccessDeviceEvent.Type.UNI_ADDED, devId, p)));
-
-                    checkAndCreateDeviceFlows(dev);
-                    break;
-                case DEVICE_REMOVED:
-                    deviceService.getPorts(devId).stream()
-                            .filter(p -> isUniPort(dev, p))
-                            .forEach(p -> post(new AccessDeviceEvent(
-                                    AccessDeviceEvent.Type.UNI_REMOVED, devId, p)));
-
-                    post(new AccessDeviceEvent(
-                            AccessDeviceEvent.Type.DEVICE_DISCONNECTED, devId,
-                            null, null));
-                    break;
-                case DEVICE_AVAILABILITY_CHANGED:
-                    if (deviceService.isAvailable(devId)) {
+                    case DEVICE_ADDED:
                         post(new AccessDeviceEvent(
                                 AccessDeviceEvent.Type.DEVICE_CONNECTED, devId,
                                 null, null));
+
+                        // Send UNI_ADDED events for all existing ports
+                        deviceService.getPorts(devId).stream()
+                                .filter(p -> isUniPort(dev, p))
+                                .filter(Port::isEnabled)
+                                .forEach(p -> post(new AccessDeviceEvent(
+                                        AccessDeviceEvent.Type.UNI_ADDED, devId, p)));
+
                         checkAndCreateDeviceFlows(dev);
-                    } else {
+                        break;
+                    case DEVICE_REMOVED:
+                        deviceService.getPorts(devId).stream()
+                                .filter(p -> isUniPort(dev, p))
+                                .forEach(p -> post(new AccessDeviceEvent(
+                                        AccessDeviceEvent.Type.UNI_REMOVED, devId, p)));
+
                         post(new AccessDeviceEvent(
                                 AccessDeviceEvent.Type.DEVICE_DISCONNECTED, devId,
                                 null, null));
-                    }
-                    break;
-                case DEVICE_UPDATED:
-                case DEVICE_SUSPENDED:
-                case PORT_STATS_UPDATED:
-                default:
-                    return;
-            }
+                        break;
+                    case DEVICE_AVAILABILITY_CHANGED:
+                        if (deviceService.isAvailable(devId)) {
+                            post(new AccessDeviceEvent(
+                                    AccessDeviceEvent.Type.DEVICE_CONNECTED, devId,
+                                    null, null));
+                            checkAndCreateDeviceFlows(dev);
+                        } else {
+                            post(new AccessDeviceEvent(
+                                    AccessDeviceEvent.Type.DEVICE_DISCONNECTED, devId,
+                                    null, null));
+                        }
+                        break;
+                    case DEVICE_UPDATED:
+                    case DEVICE_SUSPENDED:
+                    case PORT_STATS_UPDATED:
+                    default:
+                        return;
+                }
+            });
         }
     }
 }