SEBA-987 IgmpProxy should restore/remove sources on port up/down events.
Change-Id: I3b71c851776e9d3a515fa94bd6704d1640966f77
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
index edd23f6..ed3dad8 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
@@ -822,65 +822,70 @@
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
- DeviceId devId = event.subject().id();
- Port p = event.port();
- if (getSubscriberAndDeviceInformation(devId).isEmpty() &&
- !(p != null && isConnectPoint(devId, p.number()))) {
- return;
- }
- PortNumber port;
+ eventExecutor.execute(() -> {
+ DeviceId devId = event.subject().id();
+ Port p = event.port();
- switch (event.type()) {
+ if (!igmpLeadershipService.isLocalLeader(devId)) {
+ return;
+ }
+ if (getSubscriberAndDeviceInformation(devId).isEmpty() &&
+ !(p != null && isConnectPoint(devId, p.number()))) {
+ return;
+ }
+ PortNumber port;
- case DEVICE_ADDED:
- case DEVICE_UPDATED:
- case DEVICE_REMOVED:
- case DEVICE_SUSPENDED:
- case DEVICE_AVAILABILITY_CHANGED:
- case PORT_STATS_UPDATED:
- break;
- case PORT_ADDED:
- port = p.number();
- if (getSubscriberAndDeviceInformation(devId).isPresent() &&
- !isUplink(devId, port) && !isConnectPoint(devId, port)) {
- processFilterObjective(devId, port, false);
- } else if (isUplink(devId, port)) {
- provisionUplinkFlows();
- } else if (isConnectPoint(devId, port)) {
- provisionConnectPointFlows();
- }
- break;
- case PORT_UPDATED:
- port = p.number();
- if (getSubscriberAndDeviceInformation(devId).isPresent() &&
- !isUplink(devId, port) && !isConnectPoint(devId, port)) {
- if (event.port().isEnabled()) {
+ switch (event.type()) {
+
+ case DEVICE_ADDED:
+ case DEVICE_UPDATED:
+ case DEVICE_REMOVED:
+ case DEVICE_SUSPENDED:
+ case DEVICE_AVAILABILITY_CHANGED:
+ case PORT_STATS_UPDATED:
+ break;
+ case PORT_ADDED:
+ port = p.number();
+ if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+ !isUplink(devId, port) && !isConnectPoint(devId, port)) {
processFilterObjective(devId, port, false);
- } else {
- processFilterObjective(devId, port, true);
- }
- } else if (isUplink(devId, port)) {
- if (event.port().isEnabled()) {
- provisionUplinkFlows(devId);
- } else {
- processFilterObjective(devId, port, true);
- }
- } else if (isConnectPoint(devId, port)) {
- if (event.port().isEnabled()) {
+ } else if (isUplink(devId, port)) {
+ provisionUplinkFlows();
+ } else if (isConnectPoint(devId, port)) {
provisionConnectPointFlows();
- } else {
- unprovisionConnectPointFlows();
}
- }
- break;
- case PORT_REMOVED:
- port = p.number();
- processFilterObjective(devId, port, true);
- break;
- default:
- log.info("Unknown device event {}", event.type());
- break;
- }
+ onSourceStateChanged(devId, port, true);
+ break;
+ case PORT_UPDATED:
+ port = p.number();
+ if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+ !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+ processFilterObjective(devId, port, !event.port().isEnabled());
+ } else if (isUplink(devId, port)) {
+ if (event.port().isEnabled()) {
+ provisionUplinkFlows(devId);
+ } else {
+ processFilterObjective(devId, port, true);
+ }
+ } else if (isConnectPoint(devId, port)) {
+ if (event.port().isEnabled()) {
+ provisionConnectPointFlows();
+ } else {
+ unprovisionConnectPointFlows();
+ }
+ }
+ onSourceStateChanged(devId, port, event.port().isEnabled());
+ break;
+ case PORT_REMOVED:
+ port = p.number();
+ processFilterObjective(devId, port, true);
+ onSourceStateChanged(devId, port, false);
+ break;
+ default:
+ log.info("Unknown device event {}", event.type());
+ break;
+ }
+ });
}
@Override
@@ -889,6 +894,44 @@
}
}
+ private Set<McastRoute> multicastRoutesOfIgmpProxy() {
+ Set<McastRoute> routes = new HashSet<>(); //using set to eliminate multiple entities
+ groupMemberStore.getAllGroupMemberIds().forEach(groupMemberId -> {
+ GroupMember groupMember = groupMemberStore.getGroupMember(groupMemberId);
+ if (groupMember != null) {
+ groupMember.getSourceList().forEach(source -> {
+ //regenerate the routes created by this application
+ routes.add(new McastRoute(source, groupMemberId.getGroupIp(), McastRoute.Type.IGMP));
+ });
+ }
+ });
+ return routes;
+ }
+
+ private void onSourceStateChanged(DeviceId deviceId, PortNumber portNumber, boolean enabled) {
+ if (!(getSource().isPresent() &&
+ getSource().get().deviceId().equals(deviceId) &&
+ getSource().get().port().equals(portNumber))) {
+ //connect point is not configured as the source
+ log.debug("{}/{} is not the source cp. Stopped processing it further", deviceId, portNumber);
+ return;
+ }
+ log.info("source device:port is {}. DeviceId={}, portNumber={}",
+ (enabled ? "enabled. Restoring the source" :
+ "disabled. Deleting it from multicast routes"), deviceId, portNumber);
+
+ Set<McastRoute> routes = multicastRoutesOfIgmpProxy();
+ routes.forEach(route -> {
+ if (enabled) {
+ //add source to the route
+ multicastService.addSources(route, Sets.newHashSet(new ConnectPoint(deviceId, portNumber)));
+ } else {
+ //remove the source from the route
+ multicastService.removeSources(route, Sets.newHashSet(new ConnectPoint(deviceId, portNumber)));
+ }
+ });
+ }
+
private class InternalNetworkConfigListener implements NetworkConfigListener {
private void reconfigureNetwork(IgmpproxyConfig cfg) {
@@ -943,10 +986,19 @@
}
void getSourceConnectPoint(IgmpproxyConfig cfg) {
+ ConnectPoint oldSourceDevPort = sourceDeviceAndPort;
sourceDeviceAndPort = cfg.getSourceDeviceAndPort();
if (sourceDeviceAndPort != null) {
log.debug("source parameter configured to {}", sourceDeviceAndPort);
}
+ if (oldSourceDevPort != null && !oldSourceDevPort.equals(sourceDeviceAndPort)) {
+ //source config has changed, remove the old source from multicast routes
+ onSourceStateChanged(oldSourceDevPort.deviceId(), oldSourceDevPort.port(), false);
+ }
+ if (sourceDeviceAndPort != null && !sourceDeviceAndPort.equals(oldSourceDevPort)) {
+ //add new source to the existing routes
+ onSourceStateChanged(sourceDeviceAndPort.deviceId(), sourceDeviceAndPort.port(), true);
+ }
}
public void reconfigureSsmTable(IgmpproxySsmTranslateConfig cfg) {
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
index a33ab15..2595911 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
@@ -359,6 +359,16 @@
}
@Override
+ public void removeSources(McastRoute route, Set<ConnectPoint> sources) {
+
+ }
+
+ @Override
+ public void removeSources(McastRoute route, HostId hostId, Set<ConnectPoint> connectPoints) {
+
+ }
+
+ @Override
public void addSink(McastRoute route, HostId hostId) {
}
diff --git a/pom.xml b/pom.xml
index 9a3ca63..05c033d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.onosproject</groupId>
<artifactId>onos-dependencies</artifactId>
- <version>2.2.2</version>
+ <version>2.2.3-b1</version>
</parent>
<groupId>org.opencord</groupId>