SEBA-644 igmpproxy should forward IGMP to BNG on a per pod basis or
a per OLT basis.
- "igmpOnPodBasis" config property added. If true ongoing IGMP
messages are evaluated on PON basis; they are evaluated
OLT basis otherwise.
Change-Id: Ic931598deb99f3d09a5bc5d8117de0c56e92a039
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index 40d390e..691e58f 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -79,10 +79,14 @@
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+
/**
* Igmp process application, use proxy mode, support first join/ last leave , fast leave
* period query and keep alive, packet out igmp message to uplink port features.
@@ -116,6 +120,7 @@
public static ConnectPoint connectPoint = null;
private static ConnectPoint sourceDeviceAndPort = null;
private static boolean enableIgmpProvisioning = false;
+ private static boolean igmpOnPodBasis = false;
private static final Integer MAX_PRIORITY = 10000;
private static final String INSTALLED = "installed";
@@ -172,6 +177,8 @@
private int maxResp = 10; //unit is 1 sec
private int keepAliveInterval = 120; //unit is 1 sec
+ private ExecutorService eventExecutor;
+
public static int getUnsolicitedTimeout() {
return unSolicitedTimeout;
}
@@ -226,6 +233,8 @@
}
deviceService.addListener(deviceListener);
scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 0, 1000, TimeUnit.MILLISECONDS);
+ eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
+ "events-igmp-%d", log));
log.info("Started");
}
@@ -233,6 +242,7 @@
@Deactivate
protected void deactivate() {
scheduledExecutorService.shutdown();
+ eventExecutor.shutdown();
// de-register and null our handler
networkConfig.removeListener(configListener);
@@ -439,8 +449,7 @@
private class IgmpPacketProcessor implements PacketProcessor {
@Override
public void process(PacketContext context) {
-
- try {
+ eventExecutor.execute(() -> {
InboundPacket pkt = context.inPacket();
Ethernet ethPkt = pkt.parsed();
if (ethPkt == null) {
@@ -474,8 +483,9 @@
if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
log.info("IGMP Picked up query from connectPoint");
//OK to process packet
- processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
- 0xff & igmp.getMaxRespField());
+ processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0),
+ pkt.receivedFrom(),
+ 0xff & igmp.getMaxRespField());
break;
} else {
//Not OK to process packet
@@ -485,7 +495,7 @@
}
processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
- 0xff & igmp.getMaxRespField());
+ 0xff & igmp.getMaxRespField());
break;
case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
log.debug("IGMP version 1 message types are not currently supported.");
@@ -505,14 +515,15 @@
IGMPGroup group = itr.next();
if (group instanceof IGMPMembership) {
processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
- pkt.receivedFrom(), igmp.getIgmpType());
+ pkt.receivedFrom(), igmp.getIgmpType());
} else if (group instanceof IGMPQuery) {
IGMPMembership mgroup;
mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
- IGMPMembership.MODE_IS_EXCLUDE : IGMPMembership.MODE_IS_INCLUDE);
+ IGMPMembership.MODE_IS_EXCLUDE :
+ IGMPMembership.MODE_IS_INCLUDE);
processIgmpReport(mgroup, VlanId.vlanId(vlan),
- pkt.receivedFrom(), igmp.getIgmpType());
+ pkt.receivedFrom(), igmp.getIgmpType());
}
}
break;
@@ -521,11 +532,7 @@
log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
break;
}
-
- } catch (Exception ex) {
- log.error("igmp process error : {} ", ex);
- ex.printStackTrace();
- }
+ });
}
}
@@ -591,6 +598,10 @@
}
}
+ public static boolean isIgmpOnPodBasis() {
+ return igmpOnPodBasis;
+ }
+
private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
if (!enableIgmpProvisioning) {
log.debug("IGMP trap rules won't be installed since enableIgmpProvisioning flag is not set");
@@ -728,6 +739,7 @@
fastLeave = newCfg.fastLeave();
pimSSmInterworking = newCfg.pimSsmInterworking();
enableIgmpProvisioning = newCfg.enableIgmpProvisioning();
+ igmpOnPodBasis = newCfg.igmpOnPodBasis();
if (connectPointMode != newCfg.connectPointMode() ||
connectPoint != newCfg.connectPoint()) {