SEBA-987 IgmpProxy should restore/remove sources on port up/down events.

Change-Id: I3b71c851776e9d3a515fa94bd6704d1640966f77
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
index edd23f6..ed3dad8 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
@@ -822,65 +822,70 @@
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
-            DeviceId devId = event.subject().id();
-            Port p = event.port();
-            if (getSubscriberAndDeviceInformation(devId).isEmpty() &&
-                    !(p != null && isConnectPoint(devId, p.number()))) {
-                return;
-            }
-            PortNumber port;
+            eventExecutor.execute(() -> {
+                DeviceId devId = event.subject().id();
+                Port p = event.port();
 
-            switch (event.type()) {
+                if (!igmpLeadershipService.isLocalLeader(devId)) {
+                    return;
+                }
+                if (getSubscriberAndDeviceInformation(devId).isEmpty() &&
+                        !(p != null && isConnectPoint(devId, p.number()))) {
+                    return;
+                }
+                PortNumber port;
 
-                case DEVICE_ADDED:
-                case DEVICE_UPDATED:
-                case DEVICE_REMOVED:
-                case DEVICE_SUSPENDED:
-                case DEVICE_AVAILABILITY_CHANGED:
-                case PORT_STATS_UPDATED:
-                    break;
-                case PORT_ADDED:
-                    port = p.number();
-                    if (getSubscriberAndDeviceInformation(devId).isPresent() &&
-                            !isUplink(devId, port) && !isConnectPoint(devId, port)) {
-                        processFilterObjective(devId, port, false);
-                    } else if (isUplink(devId, port)) {
-                        provisionUplinkFlows();
-                    } else if (isConnectPoint(devId, port)) {
-                        provisionConnectPointFlows();
-                    }
-                    break;
-                case PORT_UPDATED:
-                    port = p.number();
-                    if (getSubscriberAndDeviceInformation(devId).isPresent() &&
-                            !isUplink(devId, port) && !isConnectPoint(devId, port)) {
-                        if (event.port().isEnabled()) {
+                switch (event.type()) {
+
+                    case DEVICE_ADDED:
+                    case DEVICE_UPDATED:
+                    case DEVICE_REMOVED:
+                    case DEVICE_SUSPENDED:
+                    case DEVICE_AVAILABILITY_CHANGED:
+                    case PORT_STATS_UPDATED:
+                        break;
+                    case PORT_ADDED:
+                        port = p.number();
+                        if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+                                !isUplink(devId, port) && !isConnectPoint(devId, port)) {
                             processFilterObjective(devId, port, false);
-                        } else {
-                            processFilterObjective(devId, port, true);
-                        }
-                    } else if (isUplink(devId, port)) {
-                        if (event.port().isEnabled()) {
-                            provisionUplinkFlows(devId);
-                        } else {
-                            processFilterObjective(devId, port, true);
-                        }
-                    } else if (isConnectPoint(devId, port)) {
-                        if (event.port().isEnabled()) {
+                        } else if (isUplink(devId, port)) {
+                            provisionUplinkFlows();
+                        } else if (isConnectPoint(devId, port)) {
                             provisionConnectPointFlows();
-                        } else {
-                            unprovisionConnectPointFlows();
                         }
-                    }
-                    break;
-                case PORT_REMOVED:
-                    port = p.number();
-                    processFilterObjective(devId, port, true);
-                    break;
-                default:
-                    log.info("Unknown device event {}", event.type());
-                    break;
-            }
+                        onSourceStateChanged(devId, port, true);
+                        break;
+                    case PORT_UPDATED:
+                        port = p.number();
+                        if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+                                !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+                            processFilterObjective(devId, port, !event.port().isEnabled());
+                        } else if (isUplink(devId, port)) {
+                            if (event.port().isEnabled()) {
+                                provisionUplinkFlows(devId);
+                            } else {
+                                processFilterObjective(devId, port, true);
+                            }
+                        } else if (isConnectPoint(devId, port)) {
+                            if (event.port().isEnabled()) {
+                                provisionConnectPointFlows();
+                            } else {
+                                unprovisionConnectPointFlows();
+                            }
+                        }
+                        onSourceStateChanged(devId, port, event.port().isEnabled());
+                        break;
+                    case PORT_REMOVED:
+                        port = p.number();
+                        processFilterObjective(devId, port, true);
+                        onSourceStateChanged(devId, port, false);
+                        break;
+                    default:
+                        log.info("Unknown device event {}", event.type());
+                        break;
+                }
+            });
         }
 
         @Override
@@ -889,6 +894,44 @@
         }
     }
 
+    private Set<McastRoute> multicastRoutesOfIgmpProxy() {
+        Set<McastRoute> routes = new HashSet<>(); //using set to eliminate multiple entities
+        groupMemberStore.getAllGroupMemberIds().forEach(groupMemberId -> {
+            GroupMember groupMember = groupMemberStore.getGroupMember(groupMemberId);
+            if (groupMember != null) {
+                groupMember.getSourceList().forEach(source -> {
+                    //regenerate the routes created by this application
+                    routes.add(new McastRoute(source, groupMemberId.getGroupIp(), McastRoute.Type.IGMP));
+                });
+            }
+        });
+        return routes;
+    }
+
+    private void onSourceStateChanged(DeviceId deviceId, PortNumber portNumber, boolean enabled) {
+        if (!(getSource().isPresent() &&
+                getSource().get().deviceId().equals(deviceId) &&
+                getSource().get().port().equals(portNumber))) {
+            //connect point is not configured as the source
+            log.debug("{}/{} is not the source cp. Stopped processing it further", deviceId, portNumber);
+            return;
+        }
+        log.info("source device:port is {}. DeviceId={}, portNumber={}",
+                (enabled ? "enabled. Restoring the source" :
+                        "disabled. Deleting it from multicast routes"), deviceId, portNumber);
+
+        Set<McastRoute> routes = multicastRoutesOfIgmpProxy();
+        routes.forEach(route -> {
+            if (enabled) {
+                //add source to the route
+                multicastService.addSources(route, Sets.newHashSet(new ConnectPoint(deviceId, portNumber)));
+            } else {
+                //remove the source from the route
+                multicastService.removeSources(route, Sets.newHashSet(new ConnectPoint(deviceId, portNumber)));
+            }
+        });
+    }
+
     private class InternalNetworkConfigListener implements NetworkConfigListener {
 
         private void reconfigureNetwork(IgmpproxyConfig cfg) {
@@ -943,10 +986,19 @@
         }
 
         void getSourceConnectPoint(IgmpproxyConfig cfg) {
+            ConnectPoint oldSourceDevPort = sourceDeviceAndPort;
             sourceDeviceAndPort = cfg.getSourceDeviceAndPort();
             if (sourceDeviceAndPort != null) {
                 log.debug("source parameter configured to {}", sourceDeviceAndPort);
             }
+            if (oldSourceDevPort != null && !oldSourceDevPort.equals(sourceDeviceAndPort)) {
+                //source config has changed, remove the old source from multicast routes
+                onSourceStateChanged(oldSourceDevPort.deviceId(), oldSourceDevPort.port(), false);
+            }
+            if (sourceDeviceAndPort != null && !sourceDeviceAndPort.equals(oldSourceDevPort)) {
+                //add new source to the existing routes
+                onSourceStateChanged(sourceDeviceAndPort.deviceId(), sourceDeviceAndPort.port(), true);
+            }
         }
 
         public void reconfigureSsmTable(IgmpproxySsmTranslateConfig cfg) {
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
index a33ab15..2595911 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
@@ -359,6 +359,16 @@
         }
 
         @Override
+        public void removeSources(McastRoute route, Set<ConnectPoint> sources) {
+
+        }
+
+        @Override
+        public void removeSources(McastRoute route, HostId hostId, Set<ConnectPoint> connectPoints) {
+
+        }
+
+        @Override
         public void addSink(McastRoute route, HostId hostId) {
 
         }
diff --git a/pom.xml b/pom.xml
index 9a3ca63..05c033d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <groupId>org.onosproject</groupId>
         <artifactId>onos-dependencies</artifactId>
-        <version>2.2.2</version>
+        <version>2.2.3-b1</version>
     </parent>
 
     <groupId>org.opencord</groupId>