[VOL-3088] Removing Groups upon OLT disconnection

Change-Id: I54122280e93247c547f46dd138597df7f8f08c0d
diff --git a/app/src/main/java/org/opencord/cordmcast/impl/CordMcast.java b/app/src/main/java/org/opencord/cordmcast/impl/CordMcast.java
index c47f9d1..7f1a4d1 100644
--- a/app/src/main/java/org/opencord/cordmcast/impl/CordMcast.java
+++ b/app/src/main/java/org/opencord/cordmcast/impl/CordMcast.java
@@ -44,6 +44,8 @@
 import org.onosproject.net.config.NetworkConfigRegistry;
 import org.onosproject.net.config.basics.McastConfig;
 import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
@@ -57,6 +59,7 @@
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.GroupService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
@@ -148,6 +151,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CordMcastStatisticsService cordMcastStatisticsService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected GroupService groupService;
+
     protected McastListener listener = new InternalMulticastListener();
 
     private InternalNetworkConfigListener configListener =
@@ -192,6 +198,9 @@
     }
     private ExecutorService eventExecutor;
 
+    //Device listener to purge groups upon device disconnection.
+    private DeviceListener deviceListener = new InternalDeviceListener();
+
     @Activate
     public void activate(ComponentContext context) {
         componentConfigService.registerProperties(getClass());
@@ -225,6 +234,7 @@
 
         McastConfig config = networkConfig.getConfig(coreAppId, CORD_MCAST_CONFIG_CLASS);
         updateConfig(config);
+        deviceService.addListener(deviceListener);
         log.info("Started");
     }
 
@@ -247,6 +257,7 @@
 
     @Deactivate
     public void deactivate() {
+        deviceService.removeListener(deviceListener);
         componentConfigService.unregisterProperties(getClass(), false);
         mcastService.removeListener(listener);
         networkConfig.removeListener(configListener);
@@ -573,6 +584,7 @@
                                 if (vlanEnabled && (mcastVlan != config.egressVlan().toShort() ||
                                                     !mcastInnerVlan.equals(config.egressInnerVlan()))) {
                                     clearGroups();
+                                    groups.clear();
                                 }
                                 updateConfig(config);
                             }
@@ -771,6 +783,43 @@
         return true;
     }
 
+    private class InternalDeviceListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent event) {
+            eventExecutor.execute(() -> {
+                DeviceId devId = event.subject().id();
+                if (!deviceService.isAvailable(devId)) {
+                    if (deviceService.getPorts(devId).isEmpty()) {
+                        log.info("Handling controlled device disconnection .. "
+                                         + "flushing all state for dev:{}", devId);
+                        groupService.purgeGroupEntries(devId);
+                        groups.keySet().iterator().forEachRemaining(groupInfo -> {
+                            if (groupInfo.device.equals(devId)) {
+                                log.debug("Removing next key {} from distributed mcast map", groupInfo.group);
+                                groups.remove(groupInfo);
+                            }
+                        });
+                    } else {
+                        log.info("Disconnected device has available ports .. "
+                                         + "assuming temporary disconnection, "
+                                         + "retaining state for device {}", devId);
+                    }
+                } else {
+                    log.debug("Device is connected {}, " +
+                                      "currently not handling", devId);
+                }
+            });
+
+        }
+
+        @Override
+        public boolean isRelevant(DeviceEvent event) {
+            return isLocalLeader(event.subject().id())
+                    && event.type().equals(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED);
+        }
+    }
+
 }