VOL-2620 Optimize igmpProxy-1.2 app by directly using SADIS
Change-Id: I2c0c7480866b65e6c2bd96ef636e16643aa0ec88
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index 7c5bfaf..d57c731 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -17,6 +17,10 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.onosproject.net.Device;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -64,8 +68,6 @@
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
-import org.opencord.cordconfig.access.AccessDeviceConfig;
-import org.opencord.cordconfig.access.AccessDeviceData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,7 +75,6 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -94,8 +95,8 @@
@Component(immediate = true)
public class IgmpManager {
- private static final Class<AccessDeviceConfig> CONFIG_CLASS =
- AccessDeviceConfig.class;
+ private static final String APP_NAME = "org.opencord.igmpproxy";
+
private static final Class<IgmpproxyConfig> IGMPPROXY_CONFIG_CLASS =
IgmpproxyConfig.class;
private static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS =
@@ -105,7 +106,7 @@
public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
private static ApplicationId appId;
- private static Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
+
private static int unSolicitedTimeout = 3; // unit is 1 sec
private static int keepAliveCount = 3;
private static int lastQueryInterval = 2; //unit is 1 sec
@@ -127,6 +128,7 @@
private static final String REMOVED = "removed";
private static final String INSTALLATION = "installation";
private static final String REMOVAL = "removal";
+ private static final String NNI_PREFIX = "nni";
private static boolean pimSSmInterworking = false;
private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
@@ -148,6 +150,10 @@
protected NetworkConfigRegistry networkConfig;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MulticastRouteService multicastService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected SadisService sadisService;
+
private IgmpPacketProcessor processor = new IgmpPacketProcessor();
private Logger log = LoggerFactory.getLogger(getClass());
private ApplicationId coreAppId;
@@ -156,7 +162,7 @@
private InternalNetworkConfigListener configListener =
new InternalNetworkConfigListener();
private DeviceListener deviceListener = new InternalDeviceListener();
- private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory = null;
+
private ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
new ConfigFactory<ApplicationId, IgmpproxyConfig>(
SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
@@ -183,24 +189,15 @@
return unSolicitedTimeout;
}
+ protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+
@Activate
protected void activate() {
- appId = coreService.registerApplication("org.opencord.igmpproxy");
+ appId = coreService.registerApplication(APP_NAME);
coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
packetService.addProcessor(processor, PacketProcessor.director(4));
IgmpSender.init(packetService, mastershipService);
- if (networkConfig.getConfigFactory(CONFIG_CLASS) == null) {
- configFactory =
- new ConfigFactory<DeviceId, AccessDeviceConfig>(
- SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
- @Override
- public AccessDeviceConfig createConfig() {
- return new AccessDeviceConfig();
- }
- };
- networkConfig.registerConfigFactory(configFactory);
- }
networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
networkConfig.registerConfigFactory(igmpproxyConfigFactory);
networkConfig.addListener(configListener);
@@ -208,18 +205,8 @@
configListener.reconfigureNetwork(networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS));
configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));
- networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
- subject -> {
- AccessDeviceConfig config = networkConfig.getConfig(subject,
- AccessDeviceConfig.class);
- if (config != null) {
- AccessDeviceData data = config.getAccessDevice();
- oltData.put(data.deviceId(), data);
- }
- }
- );
+ subsService = sadisService.getSubscriberInfoService();
- oltData.keySet().forEach(d -> provisionDefaultFlows(d));
if (connectPointMode) {
provisionConnectPointFlows();
} else {
@@ -246,9 +233,6 @@
// de-register and null our handler
networkConfig.removeListener(configListener);
- if (configFactory != null) {
- networkConfig.unregisterConfigFactory(configFactory);
- }
networkConfig.unregisterConfigFactory(igmpproxyConfigFactory);
networkConfig.unregisterConfigFactory(igmpproxySsmConfigFactory);
deviceService.removeListener(deviceListener);
@@ -283,19 +267,21 @@
private void processIgmpConnectPointQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
- DeviceId deviceId = cp.deviceId();
Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
- maxResp = calculateMaxResp(maxResp);
+ final int maxResponseTime = calculateMaxResp(maxResp);
//The query is received on the ConnectPoint
// send query accordingly to the registered OLT devices.
if (gAddr != null && !gAddr.isZero()) {
- for (DeviceId devId : oltData.keySet()) {
- StateMachine.specialQuery(devId, gAddr, maxResp);
- }
+ deviceService.getAvailableDevices().forEach(device -> {
+ Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
+ if (accessDevice.isPresent()) {
+ StateMachine.specialQuery(device.id(), gAddr, maxResponseTime);
+ }
+ });
} else {
//Don't know which group is targeted by the query
//So query all the members(in all the OLTs) and proxy their reports
- StateMachine.generalQuery(maxResp);
+ StateMachine.generalQuery(maxResponseTime);
}
}
@@ -368,21 +354,29 @@
if (join) {
if (groupMember == null) {
- if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
- groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
- } else {
- groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
- }
-
Optional<ConnectPoint> sourceConfigured = getSource();
if (!sourceConfigured.isPresent()) {
log.warn("Unable to process IGMP Join from {} since no source " +
"configuration is found.", deviceId);
return;
}
+
+ Optional<PortNumber> deviceUplink = getDeviceUplink(deviceId);
+ if (!deviceUplink.isPresent()) {
+ log.warn("Unable to process IGMP Join since uplink port " +
+ "of the device {} is not found.", deviceId);
+ return;
+ }
+
+ if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
+ groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
+ } else {
+ groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
+ }
+
HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
- StateMachine.join(deviceId, groupIp, srcIp);
+ StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
groupMemberMap.put(groupMemberKey, groupMember);
groupMember.updateList(recordType, sourceList);
groupMember.getSourceList().forEach(source -> {
@@ -450,86 +444,96 @@
@Override
public void process(PacketContext context) {
eventExecutor.execute(() -> {
- InboundPacket pkt = context.inPacket();
- Ethernet ethPkt = pkt.parsed();
- if (ethPkt == null) {
- return;
- }
+ try {
+ InboundPacket pkt = context.inPacket();
+ Ethernet ethPkt = pkt.parsed();
+ if (ethPkt == null) {
+ return;
+ }
- if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
- return;
- }
+ if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
+ return;
+ }
- IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
+ IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
- if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
- return;
- }
+ if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
+ return;
+ }
- short vlan = ethPkt.getVlanID();
- DeviceId deviceId = pkt.receivedFrom().deviceId();
+ short vlan = ethPkt.getVlanID();
+ DeviceId deviceId = pkt.receivedFrom().deviceId();
- if (oltData.get(deviceId) == null &&
- !isConnectPoint(deviceId, pkt.receivedFrom().port())) {
- log.error("Device not registered in netcfg :" + deviceId.toString());
- return;
- }
- IGMP igmp = (IGMP) ipv4Pkt.getPayload();
- switch (igmp.getIgmpType()) {
- case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
- //Discard Query from OLT’s non-uplink port’s
- if (!pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
- if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
- log.info("IGMP Picked up query from connectPoint");
- //OK to process packet
- processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0),
- pkt.receivedFrom(),
- 0xff & igmp.getMaxRespField());
- break;
- } else {
- //Not OK to process packet
- log.warn("IGMP Picked up query from non-uplink port");
+ if (!isConnectPoint(deviceId, pkt.receivedFrom().port()) &&
+ !getSubscriberAndDeviceInformation(deviceId).isPresent()) {
+ log.error("Device not registered in netcfg :" + deviceId.toString());
+ return;
+ }
+
+ IGMP igmp = (IGMP) ipv4Pkt.getPayload();
+
+ Optional<PortNumber> deviceUpLinkOpt = getDeviceUplink(deviceId);
+ PortNumber upLinkPort = deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
+ switch (igmp.getIgmpType()) {
+ case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
+ //Discard Query from OLT’s non-uplink port’s
+ if (!pkt.receivedFrom().port().equals(upLinkPort)) {
+ if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
+ log.info("IGMP Picked up query from connectPoint");
+ //OK to process packet
+ processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0),
+ pkt.receivedFrom(),
+ 0xff & igmp.getMaxRespField());
+ break;
+ } else {
+ //Not OK to process packet
+ log.warn("IGMP Picked up query from non-uplink port {}", upLinkPort);
+ return;
+ }
+ }
+
+ processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
+ 0xff & igmp.getMaxRespField());
+ break;
+ case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
+ log.debug("IGMP version 1 message types are not currently supported.");
+ break;
+ case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
+ case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
+ case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
+ //Discard join/leave from OLT’s uplink port’s
+ if (pkt.receivedFrom().port().equals(upLinkPort) ||
+ isConnectPoint(deviceId, pkt.receivedFrom().port())) {
+ log.info("IGMP Picked up join/leave from uplink/connectPoint port");
return;
}
- }
- processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
- 0xff & igmp.getMaxRespField());
- break;
- case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
- log.debug("IGMP version 1 message types are not currently supported.");
- break;
- case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
- case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
- case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
- //Discard join/leave from OLT’s uplink port’s
- if (pkt.receivedFrom().port().equals(getDeviceUplink(deviceId)) ||
- isConnectPoint(deviceId, pkt.receivedFrom().port())) {
- log.info("IGMP Picked up join/leave from uplink/connectPoint port");
- return;
- }
-
- Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
- while (itr.hasNext()) {
- IGMPGroup group = itr.next();
- if (group instanceof IGMPMembership) {
- processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
- pkt.receivedFrom(), igmp.getIgmpType());
- } else if (group instanceof IGMPQuery) {
- IGMPMembership mgroup;
- mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
- mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
- IGMPMembership.MODE_IS_EXCLUDE :
- IGMPMembership.MODE_IS_INCLUDE);
- processIgmpReport(mgroup, VlanId.vlanId(vlan),
- pkt.receivedFrom(), igmp.getIgmpType());
+ Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
+ while (itr.hasNext()) {
+ IGMPGroup group = itr.next();
+ if (group instanceof IGMPMembership) {
+ processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
+ pkt.receivedFrom(), igmp.getIgmpType());
+ } else if (group instanceof IGMPQuery) {
+ IGMPMembership mgroup;
+ mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
+ mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
+ IGMPMembership.MODE_IS_EXCLUDE :
+ IGMPMembership.MODE_IS_INCLUDE);
+ processIgmpReport(mgroup, VlanId.vlanId(vlan),
+ pkt.receivedFrom(), igmp.getIgmpType());
+ }
}
- }
- break;
+ break;
- default:
- log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
- break;
+ default:
+ log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
+ break;
+ }
+
+ } catch (Exception ex) {
+ log.error("igmp process error : {} ", ex);
+ ex.printStackTrace();
}
});
}
@@ -589,12 +593,40 @@
}
- public static PortNumber getDeviceUplink(DeviceId devId) {
- if (oltData.get(devId) != null) {
- return oltData.get(devId).uplink();
- } else {
- return null;
+ public Optional<PortNumber> getDeviceUplink(DeviceId devId) {
+ Device device = deviceService.getDevice(devId);
+ if (device == null || device.serialNumber() == null) {
+ return Optional.empty();
}
+ Optional<SubscriberAndDeviceInformation> olt = getSubscriberAndDeviceInformation(device.serialNumber());
+
+ if (!olt.isPresent()) {
+ return Optional.empty();
+ }
+ PortNumber portNumber = PortNumber.portNumber(olt.get().uplinkPort());
+ return validateUpLinkPort(device.id(), portNumber) ?
+ Optional.of(portNumber) : Optional.empty();
+ }
+
+ /**
+ *
+ * @param deviceId device id
+ * @param portNumber port number
+ * @return true if the port name starts with NNI_PREFIX; false otherwise.
+ */
+ public boolean validateUpLinkPort(DeviceId deviceId, PortNumber portNumber) {
+ Port port = deviceService.getPort(deviceId, portNumber);
+ if (port == null) {
+ //port is not discovered by ONOS; so cannot validate it.
+ return false;
+ }
+ boolean isValid = port.annotations().value(AnnotationKeys.PORT_NAME) != null &&
+ port.annotations().value(AnnotationKeys.PORT_NAME).startsWith(NNI_PREFIX);
+ if (!isValid) {
+ log.warn("Port cannot be validated; it is not configured as an NNI port." +
+ "Device/port: {}/{}", deviceId, portNumber);
+ }
+ return isValid;
}
public static boolean isIgmpOnPodBasis() {
@@ -648,8 +680,45 @@
}
private boolean isUplink(DeviceId device, PortNumber port) {
- return ((!connectPointMode) && oltData.containsKey(device)
- && oltData.get(device).uplink().equals(port));
+ if (connectPointMode) {
+ return false;
+ }
+ Optional<PortNumber> upLinkPort = getDeviceUplink(device);
+ return upLinkPort.isPresent() && upLinkPort.get().equals(port);
+ }
+
+ /**
+ * Fetches device information associated with the device serial number from SADIS.
+ *
+ * @param serialNumber serial number of a device
+ * @return device information; an empty Optional otherwise.
+ */
+ private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(String serialNumber) {
+ long start = System.currentTimeMillis();
+ try {
+ return Optional.ofNullable(sadisService.getSubscriberInfoService().get(serialNumber));
+ } finally {
+ if (log.isDebugEnabled()) {
+ // SADIS can call remote systems to fetch device data and this calls can take a long time.
+ // This measurement is just for monitoring these kinds of situations.
+ log.debug("Device fetched from SADIS. Elapsed {} msec", System.currentTimeMillis() - start);
+ }
+
+ }
+ }
+
+ /**
+ * Fetches device information associated with the device serial number from SADIS.
+ *
+ * @param deviceId device id
+ * @return device information; an empty Optional otherwise.
+ */
+ private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(DeviceId deviceId) {
+ Device device = deviceService.getDevice(deviceId);
+ if (device == null || device.serialNumber() == null) {
+ return Optional.empty();
+ }
+ return getSubscriberAndDeviceInformation(device.serialNumber());
}
private class InternalDeviceListener implements DeviceListener {
@@ -657,7 +726,7 @@
public void event(DeviceEvent event) {
DeviceId devId = event.subject().id();
Port p = event.port();
- if (oltData.get(devId) == null &&
+ if (!getSubscriberAndDeviceInformation(devId).isPresent() &&
!(p != null && isConnectPoint(devId, p.number()))) {
return;
}
@@ -674,7 +743,8 @@
break;
case PORT_ADDED:
port = p.number();
- if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+ if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+ !isUplink(devId, port) && !isConnectPoint(devId, port)) {
processFilterObjective(devId, port, false);
} else if (isUplink(devId, port)) {
provisionUplinkFlows();
@@ -684,7 +754,8 @@
break;
case PORT_UPDATED:
port = p.number();
- if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+ if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+ !isUplink(devId, port) && !isConnectPoint(devId, port)) {
if (event.port().isEnabled()) {
processFilterObjective(devId, port, false);
} else {
@@ -788,15 +859,6 @@
switch (event.type()) {
case CONFIG_ADDED:
case CONFIG_UPDATED:
- if (event.configClass().equals(CONFIG_CLASS)) {
- AccessDeviceConfig config =
- networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
- if (config != null) {
- oltData.put(config.getAccessDevice().deviceId(), config.getAccessDevice());
- provisionDefaultFlows((DeviceId) event.subject());
- provisionUplinkFlows((DeviceId) event.subject());
- }
- }
if (event.configClass().equals(IGMPPROXY_CONFIG_CLASS)) {
IgmpproxyConfig config = networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS);
@@ -826,43 +888,46 @@
break;
case CONFIG_REGISTERED:
case CONFIG_UNREGISTERED:
- break;
case CONFIG_REMOVED:
- if (event.configClass().equals(CONFIG_CLASS)) {
- oltData.remove(event.subject());
- }
-
default:
break;
}
}
}
- private void provisionDefaultFlows(DeviceId deviceId) {
- List<Port> ports = deviceService.getPorts(deviceId);
- ports.stream()
- .filter(p -> (!oltData.get(p.element().id()).uplink().equals(p.number()) && p.isEnabled()))
- .forEach(p -> processFilterObjective((DeviceId) p.element().id(), p.number(), false));
- }
-
private void provisionUplinkFlows(DeviceId deviceId) {
if (connectPointMode) {
return;
}
- processFilterObjective(deviceId, oltData.get(deviceId).uplink(), false);
+ Optional<PortNumber> upLink = getDeviceUplink(deviceId);
+ if (upLink.isPresent()) {
+ processFilterObjective(deviceId, upLink.get(), false);
+ }
}
private void provisionUplinkFlows() {
if (connectPointMode) {
return;
}
-
- oltData.keySet().forEach(deviceId -> provisionUplinkFlows(deviceId));
+ deviceService.getAvailableDevices().forEach(device -> {
+ Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
+ if (accessDevice.isPresent()) {
+ provisionUplinkFlows(device.id());
+ }
+ });
}
+
private void unprovisionUplinkFlows() {
- oltData.keySet().forEach(deviceId ->
- processFilterObjective(deviceId, oltData.get(deviceId).uplink(), true));
+ deviceService.getAvailableDevices().forEach(device -> {
+ Optional<SubscriberAndDeviceInformation> accessDevices = getSubscriberAndDeviceInformation(device.id());
+ if (accessDevices.isPresent()) {
+ Optional<PortNumber> upLink = getDeviceUplink(device.id());
+ if (upLink.isPresent()) {
+ processFilterObjective(device.id(), upLink.get(), true);
+ }
+ }
+ });
}
private void provisionConnectPointFlows() {
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpSender.java b/src/main/java/org/opencord/igmpproxy/IgmpSender.java
index f880646..b455cb4 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpSender.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpSender.java
@@ -195,12 +195,11 @@
return mac;
}
- public void sendIgmpPacketUplink(Ethernet ethPkt, DeviceId deviceId) {
+ public void sendIgmpPacketUplink(Ethernet ethPkt, DeviceId deviceId, PortNumber upLinkPort) {
if (!mastershipService.isLocalMaster(deviceId)) {
return;
}
-
if (IgmpManager.connectPointMode) {
if (IgmpManager.connectPoint == null) {
log.warn("cannot find a connectPoint to send the packet uplink");
@@ -208,8 +207,7 @@
}
sendIgmpPacket(ethPkt, IgmpManager.connectPoint.deviceId(), IgmpManager.connectPoint.port());
} else {
- PortNumber upLink = IgmpManager.getDeviceUplink(deviceId);
- sendIgmpPacket(ethPkt, deviceId, upLink);
+ sendIgmpPacket(ethPkt, deviceId, upLinkPort);
}
}
diff --git a/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
index c5aaee7..c2b32a4 100644
--- a/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
+++ b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
@@ -18,6 +18,7 @@
import org.onlab.packet.Ethernet;
import org.onlab.packet.Ip4Address;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,6 +43,7 @@
private DeviceId devId;
private Ip4Address groupIp;
private Ip4Address srcIp;
+ private PortNumber upLinkPort;
private AtomicInteger count = new AtomicInteger(DEFAULT_COUNT);
private int timerId = IgmpTimer.INVALID_TIMER_ID;
@@ -61,10 +63,11 @@
{nonTransition, delayTransition, idleTransition};
private int currentState = STATE_NON;
- public SingleStateMachine(DeviceId devId, Ip4Address groupIp, Ip4Address src) {
+ public SingleStateMachine(DeviceId devId, Ip4Address groupIp, Ip4Address src, PortNumber upLinkPort) {
this.devId = devId;
this.groupIp = groupIp;
this.srcIp = src;
+ this.upLinkPort = upLinkPort;
}
public Ip4Address getGroupIp() {
@@ -137,7 +140,7 @@
public void leave(boolean messageOutAllowed) {
if (messageOutAllowed) {
Ethernet eth = IgmpSender.getInstance().buildIgmpV3Leave(groupIp, srcIp);
- IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+ IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
}
}
@@ -153,7 +156,7 @@
public void join(boolean messageOutAllowed) {
if (messageOutAllowed) {
Ethernet eth = IgmpSender.getInstance().buildIgmpV3Join(groupIp, srcIp);
- IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+ IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
timeOut = getTimeOut(IgmpManager.getUnsolicitedTimeout());
timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
}
@@ -171,7 +174,7 @@
public void timeOut() {
if (sendQuery) {
Ethernet eth = IgmpSender.getInstance().buildIgmpV3ResponseQuery(groupIp, srcIp);
- IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+ IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
timeOut = DEFAULT_MAX_RESP;
}
}
diff --git a/src/main/java/org/opencord/igmpproxy/StateMachine.java b/src/main/java/org/opencord/igmpproxy/StateMachine.java
index 391bdee..1097560 100644
--- a/src/main/java/org/opencord/igmpproxy/StateMachine.java
+++ b/src/main/java/org/opencord/igmpproxy/StateMachine.java
@@ -18,6 +18,8 @@
import com.google.common.collect.Maps;
import org.onlab.packet.Ip4Address;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
import java.util.Map;
import java.util.Set;
@@ -52,11 +54,11 @@
map.remove(getId(devId, groupIp));
}
- public static boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP) {
+ public static boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP, PortNumber upLinkPort) {
SingleStateMachine machine = get(devId, groupIp);
if (null == machine) {
- machine = new SingleStateMachine(devId, groupIp, srcIP);
+ machine = new SingleStateMachine(devId, groupIp, srcIP, upLinkPort);
map.put(getId(devId, groupIp), machine);
boolean shouldSendJoin = true;