SEBA-644 igmpproxy should forward IGMP to BNG on a per pod basis or
a per OLT basis.
- "igmpOnPodBasis" config property added. If true ongoing IGMP
messages are evaluated on PON basis; they are evaluated
OLT basis otherwise.
Change-Id: Ic931598deb99f3d09a5bc5d8117de0c56e92a039
diff --git a/pom.xml b/pom.xml
index 1d115a0..a167628 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,35 +110,16 @@
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
- <version>3.0.1</version>
+ <version>4.1.0</version>
<extensions>true</extensions>
+ <inherited>true</inherited>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
- <version>2.5.1</version>
+ <version>3.8.0</version>
<configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-scr-plugin</artifactId>
- <version>1.21.0</version>
- <executions>
- <execution>
- <id>generate-scr-srcdescriptor</id>
- <goals>
- <goal>scr</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <supportedProjectTypes>
- <supportedProjectType>bundle</supportedProjectType>
- <supportedProjectType>war</supportedProjectType>
- </supportedProjectTypes>
+ <release>11</release>
</configuration>
</plugin>
<plugin>
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index 40d390e..826f260 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -79,10 +79,14 @@
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+
/**
* Igmp process application, use proxy mode, support first join/ last leave , fast leave
* period query and keep alive, packet out igmp message to uplink port features.
@@ -116,6 +120,7 @@
public static ConnectPoint connectPoint = null;
private static ConnectPoint sourceDeviceAndPort = null;
private static boolean enableIgmpProvisioning = false;
+ private static boolean igmpOnPodBasis = false;
private static final Integer MAX_PRIORITY = 10000;
private static final String INSTALLED = "installed";
@@ -172,6 +177,8 @@
private int maxResp = 10; //unit is 1 sec
private int keepAliveInterval = 120; //unit is 1 sec
+ private ExecutorService eventExecutor;
+
public static int getUnsolicitedTimeout() {
return unSolicitedTimeout;
}
@@ -226,6 +233,8 @@
}
deviceService.addListener(deviceListener);
scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 0, 1000, TimeUnit.MILLISECONDS);
+ eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
+ "events-igmp-%d", log));
log.info("Started");
}
@@ -233,6 +242,7 @@
@Deactivate
protected void deactivate() {
scheduledExecutorService.shutdown();
+ eventExecutor.shutdown();
// de-register and null our handler
networkConfig.removeListener(configListener);
@@ -439,93 +449,96 @@
private class IgmpPacketProcessor implements PacketProcessor {
@Override
public void process(PacketContext context) {
+ eventExecutor.execute(() -> {
+ try {
+ InboundPacket pkt = context.inPacket();
+ Ethernet ethPkt = pkt.parsed();
+ if (ethPkt == null) {
+ return;
+ }
- try {
- InboundPacket pkt = context.inPacket();
- Ethernet ethPkt = pkt.parsed();
- if (ethPkt == null) {
- return;
- }
+ if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
+ return;
+ }
- if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
- return;
- }
+ IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
- IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
+ if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
+ return;
+ }
- if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
- return;
- }
+ short vlan = ethPkt.getVlanID();
+ DeviceId deviceId = pkt.receivedFrom().deviceId();
- short vlan = ethPkt.getVlanID();
- DeviceId deviceId = pkt.receivedFrom().deviceId();
+ if (oltData.get(deviceId) == null &&
+ !isConnectPoint(deviceId, pkt.receivedFrom().port())) {
+ log.error("Device not registered in netcfg :" + deviceId.toString());
+ return;
+ }
- if (oltData.get(deviceId) == null &&
- !isConnectPoint(deviceId, pkt.receivedFrom().port())) {
- log.error("Device not registered in netcfg :" + deviceId.toString());
- return;
- }
+ IGMP igmp = (IGMP) ipv4Pkt.getPayload();
+ switch (igmp.getIgmpType()) {
+ case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
+ //Discard Query from OLT’s non-uplink port’s
+ if (!pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
+ if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
+ log.info("IGMP Picked up query from connectPoint");
+ //OK to process packet
+ processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0),
+ pkt.receivedFrom(),
+ 0xff & igmp.getMaxRespField());
+ break;
+ } else {
+ //Not OK to process packet
+ log.warn("IGMP Picked up query from non-uplink port");
+ return;
+ }
+ }
- IGMP igmp = (IGMP) ipv4Pkt.getPayload();
- switch (igmp.getIgmpType()) {
- case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
- //Discard Query from OLT’s non-uplink port’s
- if (!pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
- if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
- log.info("IGMP Picked up query from connectPoint");
- //OK to process packet
- processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
- 0xff & igmp.getMaxRespField());
- break;
- } else {
- //Not OK to process packet
- log.warn("IGMP Picked up query from non-uplink port");
+ processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
+ 0xff & igmp.getMaxRespField());
+ break;
+ case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
+ log.debug("IGMP version 1 message types are not currently supported.");
+ break;
+ case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
+ case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
+ case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
+ //Discard join/leave from OLT’s uplink port’s
+ if (pkt.receivedFrom().port().equals(getDeviceUplink(deviceId)) ||
+ isConnectPoint(deviceId, pkt.receivedFrom().port())) {
+ log.info("IGMP Picked up join/leave from uplink/connectPoint port");
return;
}
- }
- processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
- 0xff & igmp.getMaxRespField());
- break;
- case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
- log.debug("IGMP version 1 message types are not currently supported.");
- break;
- case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
- case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
- case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
- //Discard join/leave from OLT’s uplink port’s
- if (pkt.receivedFrom().port().equals(getDeviceUplink(deviceId)) ||
- isConnectPoint(deviceId, pkt.receivedFrom().port())) {
- log.info("IGMP Picked up join/leave from uplink/connectPoint port");
- return;
- }
-
- Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
- while (itr.hasNext()) {
- IGMPGroup group = itr.next();
- if (group instanceof IGMPMembership) {
- processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
- pkt.receivedFrom(), igmp.getIgmpType());
- } else if (group instanceof IGMPQuery) {
- IGMPMembership mgroup;
- mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
- mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
- IGMPMembership.MODE_IS_EXCLUDE : IGMPMembership.MODE_IS_INCLUDE);
- processIgmpReport(mgroup, VlanId.vlanId(vlan),
- pkt.receivedFrom(), igmp.getIgmpType());
+ Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
+ while (itr.hasNext()) {
+ IGMPGroup group = itr.next();
+ if (group instanceof IGMPMembership) {
+ processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
+ pkt.receivedFrom(), igmp.getIgmpType());
+ } else if (group instanceof IGMPQuery) {
+ IGMPMembership mgroup;
+ mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
+ mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
+ IGMPMembership.MODE_IS_EXCLUDE :
+ IGMPMembership.MODE_IS_INCLUDE);
+ processIgmpReport(mgroup, VlanId.vlanId(vlan),
+ pkt.receivedFrom(), igmp.getIgmpType());
+ }
}
- }
- break;
+ break;
- default:
- log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
- break;
+ default:
+ log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
+ break;
+ }
+
+ } catch (Exception ex) {
+ log.error("igmp process error : {} ", ex);
+ ex.printStackTrace();
}
-
- } catch (Exception ex) {
- log.error("igmp process error : {} ", ex);
- ex.printStackTrace();
- }
+ });
}
}
@@ -591,6 +604,10 @@
}
}
+ public static boolean isIgmpOnPodBasis() {
+ return igmpOnPodBasis;
+ }
+
private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
if (!enableIgmpProvisioning) {
log.debug("IGMP trap rules won't be installed since enableIgmpProvisioning flag is not set");
@@ -728,6 +745,7 @@
fastLeave = newCfg.fastLeave();
pimSSmInterworking = newCfg.pimSsmInterworking();
enableIgmpProvisioning = newCfg.enableIgmpProvisioning();
+ igmpOnPodBasis = newCfg.igmpOnPodBasis();
if (connectPointMode != newCfg.connectPointMode() ||
connectPoint != newCfg.connectPoint()) {
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java b/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java
index 2e36a50..b59c2c1 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java
@@ -38,6 +38,7 @@
private static final Boolean DEFAULT_CONNECT_POINT_MODE = true;
private static final Boolean DEFAULT_PIMSSM_INTERWORKING = false;
private static final Boolean DEFAULT_IGMP_PROVISIONING_SUPPORT = Boolean.FALSE;
+ private static final Boolean DEFAULT_IGMP_ON_POD_BASIS = Boolean.FALSE;
protected static final String CONNECT_POINT_MODE = "globalConnectPointMode";
protected static final String CONNECT_POINT = "globalConnectPoint";
@@ -55,7 +56,7 @@
private static final String PIMSSM_INTERWORKING = "pimSSmInterworking";
private static final String SOURCE_DEV_PORT = "sourceDeviceAndPort";
private static final String ENABLE_IGMP_PROVISIONING = "enableIgmpProvisioning";
-
+ private static final String IGMP_ON_POD_BASIS = "igmpOnPodBasis";
/**
* Gets the value of a string property, protecting for an empty
@@ -205,4 +206,12 @@
return Boolean.parseBoolean(getStringProperty(ENABLE_IGMP_PROVISIONING,
DEFAULT_IGMP_PROVISIONING_SUPPORT.toString()));
}
+
+ public boolean igmpOnPodBasis() {
+ if (object == null || object.path(IGMP_ON_POD_BASIS) == null) {
+ return DEFAULT_IGMP_ON_POD_BASIS;
+ }
+ return Boolean.parseBoolean(getStringProperty(IGMP_ON_POD_BASIS,
+ DEFAULT_IGMP_ON_POD_BASIS.toString()));
+ }
}
diff --git a/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
index 1675069..657b683 100644
--- a/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
+++ b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
@@ -20,6 +20,7 @@
import org.onosproject.net.DeviceId;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* State machine for single IGMP group member. The state machine is implemented on
@@ -39,7 +40,7 @@
private Ip4Address groupIp;
private Ip4Address srcIp;
- private int count = DEFAULT_COUNT;
+ private AtomicInteger count = new AtomicInteger(DEFAULT_COUNT);
private int timerId = IgmpTimer.INVALID_TIMER_ID;
private int timeOut = DEFAULT_MAX_RESP;
private State[] states =
@@ -63,18 +64,21 @@
this.srcIp = src;
}
+ public Ip4Address getGroupIp() {
+ return groupIp;
+ }
public DeviceId getDeviceId() {
return devId;
}
public boolean increaseCounter() {
- count++;
+ count.incrementAndGet();
return true;
}
public boolean decreaseCounter() {
- if (count > 0) {
- count--;
+ if (count.get() > 0) {
+ count.decrementAndGet();
return true;
} else {
return false;
@@ -82,7 +86,7 @@
}
public int getCounter() {
- return count;
+ return count.get();
}
public int currentState() {
return currentState;
@@ -92,13 +96,13 @@
currentState = transition[currentState][msg];
}
- public void join() {
- states[currentState].join();
+ public void join(boolean messageOutAllowed) {
+ states[currentState].join(messageOutAllowed);
next(TRANSITION_JOIN);
}
- public void leave() {
- states[currentState].leave();
+ public void leave(boolean messageOutAllowed) {
+ states[currentState].leave(messageOutAllowed);
next(TRANSITION_LEAVE);
}
@@ -124,12 +128,14 @@
}
class State {
- public void join() {
+ public void join(boolean messageOutAllowed) {
}
- public void leave() {
- Ethernet eth = IgmpSender.getInstance().buildIgmpV3Leave(groupIp, srcIp);
- IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+ public void leave(boolean messageOutAllowed) {
+ if (messageOutAllowed) {
+ Ethernet eth = IgmpSender.getInstance().buildIgmpV3Leave(groupIp, srcIp);
+ IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+ }
}
public void query(int maxResp) {
@@ -141,11 +147,13 @@
}
class NonMember extends State {
- public void join() {
- Ethernet eth = IgmpSender.getInstance().buildIgmpV3Join(groupIp, srcIp);
- IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
- timeOut = getTimeOut(IgmpManager.getUnsolicitedTimeout());
- timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
+ public void join(boolean messageOutAllowed) {
+ if (messageOutAllowed) {
+ Ethernet eth = IgmpSender.getInstance().buildIgmpV3Join(groupIp, srcIp);
+ IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+ timeOut = getTimeOut(IgmpManager.getUnsolicitedTimeout());
+ timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
+ }
}
}
diff --git a/src/main/java/org/opencord/igmpproxy/StateMachine.java b/src/main/java/org/opencord/igmpproxy/StateMachine.java
index 81ad1d4..391bdee 100644
--- a/src/main/java/org/opencord/igmpproxy/StateMachine.java
+++ b/src/main/java/org/opencord/igmpproxy/StateMachine.java
@@ -26,13 +26,16 @@
* RFC 2236 "6. Host State Diagram".
*/
public final class StateMachine {
+
+ private static final String GROUP = "Group";
+
private StateMachine() {
}
private static Map<String, SingleStateMachine> map = Maps.newConcurrentMap();
private static String getId(DeviceId devId, Ip4Address groupIp) {
- return devId.toString() + "Group" + groupIp.toString();
+ return devId.toString() + GROUP + groupIp.toString();
}
private static SingleStateMachine get(DeviceId devId, Ip4Address groupIp) {
@@ -40,7 +43,7 @@
return map.get(id);
}
- public static void destorySingle(DeviceId devId, Ip4Address groupIp) {
+ public static void destroySingle(DeviceId devId, Ip4Address groupIp) {
SingleStateMachine machine = get(devId, groupIp);
if (null == machine) {
return;
@@ -51,10 +54,20 @@
public static boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP) {
SingleStateMachine machine = get(devId, groupIp);
+
if (null == machine) {
machine = new SingleStateMachine(devId, groupIp, srcIP);
map.put(getId(devId, groupIp), machine);
- machine.join();
+
+ boolean shouldSendJoin = true;
+ if (IgmpManager.isIgmpOnPodBasis() &&
+ groupListenedByOtherDevices(devId, groupIp)) {
+ // unset the flag if igmp messages are evaluated on POD basis
+ // and there are already active members of this group
+ // across the entire POD
+ shouldSendJoin = false;
+ }
+ machine.join(shouldSendJoin);
return true;
}
machine.increaseCounter();
@@ -66,11 +79,21 @@
if (null == machine) {
return false;
}
- machine.decreaseCounter();
+ machine.decreaseCounter();
+ // make sure machine instance still exists.
+ // it may be removed by the preceding thread
if (machine.getCounter() == 0) {
- machine.leave();
- destorySingle(devId, groupIp);
+ boolean shouldSendLeave = true;
+ if (IgmpManager.isIgmpOnPodBasis() &&
+ groupListenedByOtherDevices(devId, groupIp)) {
+ // unset the flag if igmp messages are evaluated on POD basis
+ // and there are still active members of this group
+ // across the entire POD
+ shouldSendLeave = false;
+ }
+ machine.leave(shouldSendLeave);
+ destroySingle(devId, groupIp);
return true;
}
return false;
@@ -116,4 +139,23 @@
map.clear();
}
+ /**
+ * @param devId id of the device being excluded
+ * @param groupIp group IP address
+ * @return true if this group has at least one listener connected to
+ * any device in the map except for the device specified; false otherwise.
+ */
+ private static boolean groupListenedByOtherDevices(DeviceId devId, Ip4Address groupIp) {
+ for (SingleStateMachine machine : map.values()) {
+ if (machine.getDeviceId().equals(devId)) {
+ continue;
+ }
+ if (machine.getGroupIp().equals(groupIp)) {
+ //means group is being listened by other peers in the domain
+ return true;
+ }
+ }
+ return false;
+ }
+
}