SEBA-642 igmpproxy should use new mcast store
- switched to onos version 1.13.9-rc4
- configuration of source through IgmpProxyConfig
- new igmpproxy config property with name "enableIgmpProvisioning" added
Change-Id: I78127d2ac1bd86f88bed0fe49d89e1e5bb6d07fa
diff --git a/.gitignore b/.gitignore
index 8bc2e12..321a2aa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,4 @@
.settings
*.swp
*.iml
+.idea/
\ No newline at end of file
diff --git a/app.xml b/app.xml
index d4abe21..a7b8e52 100644
--- a/app.xml
+++ b/app.xml
@@ -16,7 +16,7 @@
-->
<app name="org.opencord.igmpproxy" origin="Nokia" version="${project.version}"
featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
- features="${project.artifactId}" apps="org.opencord.mcast">
+ features="${project.artifactId}">
<description>IGMP PROXY APP</description>
<artifact>mvn:${project.groupId}/${project.artifactId}/${project.version}</artifact>
</app>
diff --git a/pom.xml b/pom.xml
index 5fbedac..1d115a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.onosproject</groupId>
<artifactId>onos-dependencies</artifactId>
- <version>1.13.1</version>
+ <version>1.13.9-rc4</version>
<relativePath></relativePath>
</parent>
@@ -35,14 +35,14 @@
<properties>
<onos.app.name>org.opencord.igmpproxy</onos.app.name>
- <onos.version>1.13.1</onos.version>
+ <onos.version>1.13.9-rc4</onos.version>
<onos.app.category>Traffic Steering</onos.app.category>
<onos.app.title>IGMP proxy app</onos.app.title>
<onos.app.url>http://opencord.org</onos.app.url>
<onos.app.readme>IGMP implementation.</onos.app.readme>
<onos.app.requires>
org.opencord.config,
- org.opencord.mcast
+ org.onosproject.mcast
</onos.app.requires>
<cord.config.version>1.4.0</cord.config.version>
</properties>
@@ -95,7 +95,13 @@
<dependency>
<groupId>org.opencord</groupId>
<artifactId>cord-config</artifactId>
- <version>${cord.config.version}</version>
+ <version>${cord.config.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-apps-mcast-api</artifactId>
+ <version>${onos.version}</version>
</dependency>
</dependencies>
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index 0507ec1..40d390e 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -16,6 +16,7 @@
package org.opencord.igmpproxy;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -57,8 +58,8 @@
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.net.mcast.McastRoute;
-import org.onosproject.net.mcast.MulticastRouteService;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.mcast.api.MulticastRouteService;
import org.onosproject.net.packet.InboundPacket;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
@@ -70,9 +71,11 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
@@ -95,6 +98,7 @@
IgmpproxySsmTranslateConfig.class;
private static final Class<McastConfig> MCAST_CONFIG_CLASS =
McastConfig.class;
+
public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
private static ApplicationId appId;
private static Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
@@ -110,6 +114,14 @@
private static byte igmpCos = 7;
public static boolean connectPointMode = true;
public static ConnectPoint connectPoint = null;
+ private static ConnectPoint sourceDeviceAndPort = null;
+ private static boolean enableIgmpProvisioning = false;
+
+ private static final Integer MAX_PRIORITY = 10000;
+ private static final String INSTALLED = "installed";
+ private static final String REMOVED = "removed";
+ private static final String INSTALLATION = "installation";
+ private static final String REMOVAL = "removal";
private static boolean pimSSmInterworking = false;
private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
@@ -135,6 +147,7 @@
private Logger log = LoggerFactory.getLogger(getClass());
private ApplicationId coreAppId;
private Map<Ip4Address, Ip4Address> ssmTranslateTable = new ConcurrentHashMap<>();
+
private InternalNetworkConfigListener configListener =
new InternalNetworkConfigListener();
private DeviceListener deviceListener = new InternalDeviceListener();
@@ -155,6 +168,7 @@
return new IgmpproxySsmTranslateConfig();
}
};
+
private int maxResp = 10; //unit is 1 sec
private int keepAliveInterval = 120; //unit is 1 sec
@@ -349,11 +363,28 @@
} else {
groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
}
+
+ Optional<ConnectPoint> sourceConfigured = getSource();
+ if (!sourceConfigured.isPresent()) {
+ log.warn("Unable to process IGMP Join from {} since no source " +
+ "configuration is found.", deviceId);
+ return;
+ }
+ HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
+
StateMachine.join(deviceId, groupIp, srcIp);
groupMemberMap.put(groupMemberKey, groupMember);
groupMember.updateList(recordType, sourceList);
- groupMember.getSourceList().forEach(source -> multicastService.addSink(
- new McastRoute(source, groupIp, McastRoute.Type.IGMP), cp));
+ groupMember.getSourceList().forEach(source -> {
+ McastRoute route = new McastRoute(source, groupIp, McastRoute.Type.IGMP);
+ //add route
+ multicastService.add(route);
+ //add source to the route
+ multicastService.addSources(route, Sets.newHashSet(sourceConnectPoints));
+ //add sink to the route
+ multicastService.addSinks(route, Sets.newHashSet(cp));
+ });
+
}
groupMember.resetAllTimers();
groupMember.updateList(recordType, sourceList);
@@ -377,9 +408,9 @@
private void leaveAction(GroupMember groupMember) {
ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
- groupMember.getSourceList().forEach(source -> multicastService.removeSink(
+ groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
new McastRoute(source, groupMember.getGroupIp(),
- McastRoute.Type.IGMP), cp));
+ McastRoute.Type.IGMP), Sets.newHashSet(cp)));
groupMemberMap.remove(groupMember.getId());
}
@@ -394,35 +425,12 @@
IgmpSender.getInstance().sendIgmpPacket(ethpkt, groupMember.getDeviceId(), groupMember.getPortNumber());
}
- private void processFilterObjective(DeviceId devId, Port port, boolean remove) {
-
- //TODO migrate to packet requests when packet service uses filtering objectives
- DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
-
- builder = remove ? builder.deny() : builder.permit();
-
- FilteringObjective igmp = builder
- .withKey(Criteria.matchInPort(port.number()))
- .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
- .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
- .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
- .fromApp(appId)
- .withPriority(10000)
- .add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- log.info("IgmpProxy filter for {} on {} installed.",
- devId, port);
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- log.info("IgmpProxy filter for {} on {} failed because {}.",
- devId, port, error);
- }
- });
-
- flowObjectiveService.filter(devId, igmp);
+ /**
+ * @return connect point of the source if configured; and empty Optional otherwise.
+ */
+ public static Optional<ConnectPoint> getSource() {
+ return sourceDeviceAndPort == null ? Optional.empty() :
+ Optional.of(sourceDeviceAndPort);
}
/**
@@ -584,6 +592,10 @@
}
private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
+ if (!enableIgmpProvisioning) {
+ log.debug("IGMP trap rules won't be installed since enableIgmpProvisioning flag is not set");
+ return;
+ }
//TODO migrate to packet requests when packet service uses filtering objectives
DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
@@ -595,22 +607,24 @@
.addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
.withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
.fromApp(appId)
- .withPriority(10000)
+ .withPriority(MAX_PRIORITY)
.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
- log.info("IgmpProxy filter for {} on {} installed.",
- devId, port);
+ log.info("Igmp filter for {} on {} {}.",
+ devId, port, (remove) ? REMOVED : INSTALLED);
}
@Override
public void onError(Objective objective, ObjectiveError error) {
- log.info("IgmpProxy filter for {} on {} failed because {}.",
- devId, port, error);
+ log.info("Igmp filter {} for device {} on port {} failed because of {}",
+ (remove) ? INSTALLATION : REMOVAL, devId, port,
+ error);
}
});
flowObjectiveService.filter(devId, igmp);
+
}
private boolean isConnectPoint(DeviceId device, PortNumber port) {
@@ -699,12 +713,7 @@
private class InternalNetworkConfigListener implements NetworkConfigListener {
private void reconfigureNetwork(IgmpproxyConfig cfg) {
- IgmpproxyConfig newCfg;
- if (cfg == null) {
- newCfg = new IgmpproxyConfig();
- } else {
- newCfg = cfg;
- }
+ IgmpproxyConfig newCfg = cfg == null ? new IgmpproxyConfig() : cfg;
unSolicitedTimeout = newCfg.unsolicitedTimeOut();
maxResp = newCfg.maxResp();
@@ -718,6 +727,7 @@
periodicQuery = newCfg.periodicQuery();
fastLeave = newCfg.fastLeave();
pimSSmInterworking = newCfg.pimSsmInterworking();
+ enableIgmpProvisioning = newCfg.enableIgmpProvisioning();
if (connectPointMode != newCfg.connectPointMode() ||
connectPoint != newCfg.connectPoint()) {
@@ -732,16 +742,24 @@
}
}
if (connectPoint != null) {
- log.info("connect point :" + connectPoint.toString());
+ log.info("connect point : {}", connectPoint);
}
- log.info(" mode: " + connectPointMode);
+ log.info("mode: {}", connectPointMode);
+
+ getSourceConnectPoint(newCfg);
IgmpSender.getInstance().setIgmpCos(igmpCos);
IgmpSender.getInstance().setMaxResp(maxResp);
IgmpSender.getInstance().setMvlan(mvlan);
IgmpSender.getInstance().setWithRADownlink(withRADownlink);
IgmpSender.getInstance().setWithRAUplink(withRAUplink);
+ }
+ void getSourceConnectPoint(IgmpproxyConfig cfg) {
+ sourceDeviceAndPort = cfg.getSourceDeviceAndPort();
+ if (sourceDeviceAndPort != null) {
+ log.debug("source parameter configured to {}", sourceDeviceAndPort);
+ }
}
public void reconfigureSsmTable(IgmpproxySsmTranslateConfig cfg) {
@@ -750,7 +768,7 @@
}
Collection<McastRoute> translations = cfg.getSsmTranslations();
for (McastRoute route : translations) {
- ssmTranslateTable.put(route.group().getIp4Address(), route.source().getIp4Address());
+ ssmTranslateTable.put(route.group().getIp4Address(), route.source().get().getIp4Address());
}
}
@@ -772,6 +790,7 @@
if (event.configClass().equals(IGMPPROXY_CONFIG_CLASS)) {
IgmpproxyConfig config = networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS);
if (config != null) {
+ log.info("igmpproxy config received. {}", config);
reconfigureNetwork(config);
}
}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java b/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java
index efd39ba..2e36a50 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java
@@ -37,6 +37,7 @@
protected static final String DEFAULT_WITH_RA_DOWNLINK = "true";
private static final Boolean DEFAULT_CONNECT_POINT_MODE = true;
private static final Boolean DEFAULT_PIMSSM_INTERWORKING = false;
+ private static final Boolean DEFAULT_IGMP_PROVISIONING_SUPPORT = Boolean.FALSE;
protected static final String CONNECT_POINT_MODE = "globalConnectPointMode";
protected static final String CONNECT_POINT = "globalConnectPoint";
@@ -52,6 +53,9 @@
private static final String WITH_RA_UPLINK = "withRAUpLink";
private static final String WITH_RA_DOWN_LINK = "withRADownLink";
private static final String PIMSSM_INTERWORKING = "pimSSmInterworking";
+ private static final String SOURCE_DEV_PORT = "sourceDeviceAndPort";
+ private static final String ENABLE_IGMP_PROVISIONING = "enableIgmpProvisioning";
+
/**
* Gets the value of a string property, protecting for an empty
@@ -181,4 +185,24 @@
}
return Boolean.parseBoolean(getStringProperty(PIMSSM_INTERWORKING, DEFAULT_PIMSSM_INTERWORKING.toString()));
}
+
+ public ConnectPoint getSourceDeviceAndPort() {
+ if (object == null || object.path(SOURCE_DEV_PORT) == null) {
+ return null;
+ }
+
+ try {
+ return ConnectPoint.deviceConnectPoint(getStringProperty(SOURCE_DEV_PORT, ""));
+ } catch (Exception ex) {
+ return null;
+ }
+ }
+
+ public boolean enableIgmpProvisioning() {
+ if (object == null || object.path(ENABLE_IGMP_PROVISIONING) == null) {
+ return DEFAULT_IGMP_PROVISIONING_SUPPORT;
+ }
+ return Boolean.parseBoolean(getStringProperty(ENABLE_IGMP_PROVISIONING,
+ DEFAULT_IGMP_PROVISIONING_SUPPORT.toString()));
+ }
}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpproxySsmTranslateConfig.java b/src/main/java/org/opencord/igmpproxy/IgmpproxySsmTranslateConfig.java
index 9c6da25..8565a1b 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpproxySsmTranslateConfig.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpproxySsmTranslateConfig.java
@@ -19,8 +19,8 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.onlab.packet.IpAddress;
import org.onosproject.core.ApplicationId;
+import org.onosproject.mcast.api.McastRoute;
import org.onosproject.net.config.Config;
-import org.onosproject.net.mcast.McastRoute;
import java.util.ArrayList;
import java.util.List;