SEBA-642 igmpproxy should use new mcast store
- switched to onos version 1.13.9-rc4
- configuration of source through IgmpProxyConfig
- new igmpproxy config property with name "enableIgmpProvisioning" added
Change-Id: I78127d2ac1bd86f88bed0fe49d89e1e5bb6d07fa
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index 0507ec1..40d390e 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -16,6 +16,7 @@
package org.opencord.igmpproxy;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -57,8 +58,8 @@
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.net.mcast.McastRoute;
-import org.onosproject.net.mcast.MulticastRouteService;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.mcast.api.MulticastRouteService;
import org.onosproject.net.packet.InboundPacket;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
@@ -70,9 +71,11 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
@@ -95,6 +98,7 @@
IgmpproxySsmTranslateConfig.class;
private static final Class<McastConfig> MCAST_CONFIG_CLASS =
McastConfig.class;
+
public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
private static ApplicationId appId;
private static Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
@@ -110,6 +114,14 @@
private static byte igmpCos = 7;
public static boolean connectPointMode = true;
public static ConnectPoint connectPoint = null;
+ private static ConnectPoint sourceDeviceAndPort = null;
+ private static boolean enableIgmpProvisioning = false;
+
+ private static final Integer MAX_PRIORITY = 10000;
+ private static final String INSTALLED = "installed";
+ private static final String REMOVED = "removed";
+ private static final String INSTALLATION = "installation";
+ private static final String REMOVAL = "removal";
private static boolean pimSSmInterworking = false;
private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
@@ -135,6 +147,7 @@
private Logger log = LoggerFactory.getLogger(getClass());
private ApplicationId coreAppId;
private Map<Ip4Address, Ip4Address> ssmTranslateTable = new ConcurrentHashMap<>();
+
private InternalNetworkConfigListener configListener =
new InternalNetworkConfigListener();
private DeviceListener deviceListener = new InternalDeviceListener();
@@ -155,6 +168,7 @@
return new IgmpproxySsmTranslateConfig();
}
};
+
private int maxResp = 10; //unit is 1 sec
private int keepAliveInterval = 120; //unit is 1 sec
@@ -349,11 +363,28 @@
} else {
groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
}
+
+ Optional<ConnectPoint> sourceConfigured = getSource();
+ if (!sourceConfigured.isPresent()) {
+ log.warn("Unable to process IGMP Join from {} since no source " +
+ "configuration is found.", deviceId);
+ return;
+ }
+ HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
+
StateMachine.join(deviceId, groupIp, srcIp);
groupMemberMap.put(groupMemberKey, groupMember);
groupMember.updateList(recordType, sourceList);
- groupMember.getSourceList().forEach(source -> multicastService.addSink(
- new McastRoute(source, groupIp, McastRoute.Type.IGMP), cp));
+ groupMember.getSourceList().forEach(source -> {
+ McastRoute route = new McastRoute(source, groupIp, McastRoute.Type.IGMP);
+ //add route
+ multicastService.add(route);
+ //add source to the route
+ multicastService.addSources(route, Sets.newHashSet(sourceConnectPoints));
+ //add sink to the route
+ multicastService.addSinks(route, Sets.newHashSet(cp));
+ });
+
}
groupMember.resetAllTimers();
groupMember.updateList(recordType, sourceList);
@@ -377,9 +408,9 @@
private void leaveAction(GroupMember groupMember) {
ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
- groupMember.getSourceList().forEach(source -> multicastService.removeSink(
+ groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
new McastRoute(source, groupMember.getGroupIp(),
- McastRoute.Type.IGMP), cp));
+ McastRoute.Type.IGMP), Sets.newHashSet(cp)));
groupMemberMap.remove(groupMember.getId());
}
@@ -394,35 +425,12 @@
IgmpSender.getInstance().sendIgmpPacket(ethpkt, groupMember.getDeviceId(), groupMember.getPortNumber());
}
- private void processFilterObjective(DeviceId devId, Port port, boolean remove) {
-
- //TODO migrate to packet requests when packet service uses filtering objectives
- DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
-
- builder = remove ? builder.deny() : builder.permit();
-
- FilteringObjective igmp = builder
- .withKey(Criteria.matchInPort(port.number()))
- .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
- .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
- .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
- .fromApp(appId)
- .withPriority(10000)
- .add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- log.info("IgmpProxy filter for {} on {} installed.",
- devId, port);
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- log.info("IgmpProxy filter for {} on {} failed because {}.",
- devId, port, error);
- }
- });
-
- flowObjectiveService.filter(devId, igmp);
+ /**
+ * @return connect point of the source if configured; and empty Optional otherwise.
+ */
+ public static Optional<ConnectPoint> getSource() {
+ return sourceDeviceAndPort == null ? Optional.empty() :
+ Optional.of(sourceDeviceAndPort);
}
/**
@@ -584,6 +592,10 @@
}
private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
+ if (!enableIgmpProvisioning) {
+ log.debug("IGMP trap rules won't be installed since enableIgmpProvisioning flag is not set");
+ return;
+ }
//TODO migrate to packet requests when packet service uses filtering objectives
DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
@@ -595,22 +607,24 @@
.addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
.withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
.fromApp(appId)
- .withPriority(10000)
+ .withPriority(MAX_PRIORITY)
.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
- log.info("IgmpProxy filter for {} on {} installed.",
- devId, port);
+ log.info("Igmp filter for {} on {} {}.",
+ devId, port, (remove) ? REMOVED : INSTALLED);
}
@Override
public void onError(Objective objective, ObjectiveError error) {
- log.info("IgmpProxy filter for {} on {} failed because {}.",
- devId, port, error);
+ log.info("Igmp filter {} for device {} on port {} failed because of {}",
+ (remove) ? INSTALLATION : REMOVAL, devId, port,
+ error);
}
});
flowObjectiveService.filter(devId, igmp);
+
}
private boolean isConnectPoint(DeviceId device, PortNumber port) {
@@ -699,12 +713,7 @@
private class InternalNetworkConfigListener implements NetworkConfigListener {
private void reconfigureNetwork(IgmpproxyConfig cfg) {
- IgmpproxyConfig newCfg;
- if (cfg == null) {
- newCfg = new IgmpproxyConfig();
- } else {
- newCfg = cfg;
- }
+ IgmpproxyConfig newCfg = cfg == null ? new IgmpproxyConfig() : cfg;
unSolicitedTimeout = newCfg.unsolicitedTimeOut();
maxResp = newCfg.maxResp();
@@ -718,6 +727,7 @@
periodicQuery = newCfg.periodicQuery();
fastLeave = newCfg.fastLeave();
pimSSmInterworking = newCfg.pimSsmInterworking();
+ enableIgmpProvisioning = newCfg.enableIgmpProvisioning();
if (connectPointMode != newCfg.connectPointMode() ||
connectPoint != newCfg.connectPoint()) {
@@ -732,16 +742,24 @@
}
}
if (connectPoint != null) {
- log.info("connect point :" + connectPoint.toString());
+ log.info("connect point : {}", connectPoint);
}
- log.info(" mode: " + connectPointMode);
+ log.info("mode: {}", connectPointMode);
+
+ getSourceConnectPoint(newCfg);
IgmpSender.getInstance().setIgmpCos(igmpCos);
IgmpSender.getInstance().setMaxResp(maxResp);
IgmpSender.getInstance().setMvlan(mvlan);
IgmpSender.getInstance().setWithRADownlink(withRADownlink);
IgmpSender.getInstance().setWithRAUplink(withRAUplink);
+ }
+ void getSourceConnectPoint(IgmpproxyConfig cfg) {
+ sourceDeviceAndPort = cfg.getSourceDeviceAndPort();
+ if (sourceDeviceAndPort != null) {
+ log.debug("source parameter configured to {}", sourceDeviceAndPort);
+ }
}
public void reconfigureSsmTable(IgmpproxySsmTranslateConfig cfg) {
@@ -750,7 +768,7 @@
}
Collection<McastRoute> translations = cfg.getSsmTranslations();
for (McastRoute route : translations) {
- ssmTranslateTable.put(route.group().getIp4Address(), route.source().getIp4Address());
+ ssmTranslateTable.put(route.group().getIp4Address(), route.source().get().getIp4Address());
}
}
@@ -772,6 +790,7 @@
if (event.configClass().equals(IGMPPROXY_CONFIG_CLASS)) {
IgmpproxyConfig config = networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS);
if (config != null) {
+ log.info("igmpproxy config received. {}", config);
reconfigureNetwork(config);
}
}