VOL-2620 Optimize igmpProxy app by directly using SADIS
Change-Id: I2c67f6162d446de1fb773fe311a93972f3aa582a
diff --git a/pom.xml b/pom.xml
index 25abd3c..d5764a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,7 +39,6 @@
<onos.app.url>http://opencord.org</onos.app.url>
<onos.app.readme>IGMP implementation.</onos.app.readme>
<onos.app.requires>
- org.opencord.config,
org.onosproject.mcast
</onos.app.requires>
<sadis.api.version>5.0.0</sadis.api.version>
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index 0bbfe6d..1cdb55b 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -129,6 +129,7 @@
private static final String REMOVED = "removed";
private static final String INSTALLATION = "installation";
private static final String REMOVAL = "removal";
+ private static final String NNI_PREFIX = "nni";
private static boolean pimSSmInterworking = false;
private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
@@ -198,8 +199,6 @@
}
protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
- private static Map<DeviceId, SubscriberAndDeviceInformation> oltData = new ConcurrentHashMap<>();
-
@Activate
protected void activate() {
@@ -217,13 +216,6 @@
subsService = sadisService.getSubscriberInfoService();
- networkConfig.getSubjects(DeviceId.class).forEach(subject -> {
- SubscriberAndDeviceInformation olt = subsService.get(subject.toString());
- if (olt != null) {
- oltData.put(subject, olt);
- }
- });
-
if (connectPointMode) {
provisionConnectPointFlows();
} else {
@@ -284,19 +276,21 @@
private void processIgmpConnectPointQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
- DeviceId deviceId = cp.deviceId();
Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
- maxResp = calculateMaxResp(maxResp);
+ final int maxResponseTime = calculateMaxResp(maxResp);
//The query is received on the ConnectPoint
// send query accordingly to the registered OLT devices.
if (gAddr != null && !gAddr.isZero()) {
- for (DeviceId devId : oltData.keySet()) {
- StateMachine.specialQuery(devId, gAddr, maxResp);
- }
+ deviceService.getAvailableDevices().forEach(device -> {
+ Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
+ if (accessDevice.isPresent()) {
+ StateMachine.specialQuery(device.id(), gAddr, maxResponseTime);
+ }
+ });
} else {
//Don't know which group is targeted by the query
//So query all the members(in all the OLTs) and proxy their reports
- StateMachine.generalQuery(maxResp);
+ StateMachine.generalQuery(maxResponseTime);
}
}
@@ -369,21 +363,29 @@
if (join) {
if (groupMember == null) {
- if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
- groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
- } else {
- groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
- }
-
Optional<ConnectPoint> sourceConfigured = getSource();
if (!sourceConfigured.isPresent()) {
log.warn("Unable to process IGMP Join from {} since no source " +
"configuration is found.", deviceId);
return;
}
+
+ Optional<PortNumber> deviceUplink = getDeviceUplink(deviceId);
+ if (deviceUplink.isEmpty()) {
+ log.warn("Unable to process IGMP Join since uplink port " +
+ "of the device {} is not found.", deviceId);
+ return;
+ }
+
+ if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
+ groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
+ } else {
+ groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
+ }
+
HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
- StateMachine.join(deviceId, groupIp, srcIp);
+ StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
groupMemberMap.put(groupMemberKey, groupMember);
groupMember.updateList(recordType, sourceList);
groupMember.getSourceList().forEach(source -> {
@@ -471,8 +473,8 @@
short vlan = ethPkt.getVlanID();
DeviceId deviceId = pkt.receivedFrom().deviceId();
- if (oltData.get(deviceId) == null &&
- !isConnectPoint(deviceId, pkt.receivedFrom().port())) {
+ if (!isConnectPoint(deviceId, pkt.receivedFrom().port()) &&
+ !getSubscriberAndDeviceInformation(deviceId).isPresent()) {
log.error("Device not registered in netcfg :" + deviceId.toString());
return;
}
@@ -597,12 +599,34 @@
}
- public static PortNumber getDeviceUplink(DeviceId devId) {
- if (oltData.get(devId) != null) {
- return PortNumber.portNumber(oltData.get(devId).uplinkPort());
- } else {
- return null;
+ public Optional<PortNumber> getDeviceUplink(DeviceId devId) {
+ Device device = deviceService.getDevice(devId);
+ if (device == null || device.serialNumber() == null) {
+ return Optional.empty();
}
+ Optional<SubscriberAndDeviceInformation> olt = getSubscriberAndDeviceInformation(device.serialNumber());
+ if (olt.isEmpty()) {
+ return Optional.empty();
+ }
+ PortNumber portNumber = PortNumber.portNumber(olt.get().uplinkPort());
+ return validateUpLinkPort(device.id(), portNumber) ?
+ Optional.of(portNumber) : Optional.empty();
+ }
+
+ /**
+ *
+ * @param deviceId device id
+ * @param portNumber port number
+ * @return true if the port name starts with NNI_PREFIX; false otherwise.
+ */
+ public boolean validateUpLinkPort(DeviceId deviceId, PortNumber portNumber) {
+ Port port = deviceService.getPort(deviceId, portNumber);
+ if (port == null) {
+ //port is not discovered by ONOS; so cannot validate it.
+ return false;
+ }
+ return port.annotations().value(AnnotationKeys.PORT_NAME) != null &&
+ port.annotations().value(AnnotationKeys.PORT_NAME).startsWith(NNI_PREFIX);
}
public static boolean isIgmpOnPodBasis() {
@@ -659,12 +683,46 @@
return ((!connectPointMode) && getDeviceUplink(device).equals(port));
}
+ /**
+ * Fetches device information associated with the device serial number from SADIS.
+ *
+ * @param serialNumber serial number of a device
+ * @return device information; an empty Optional otherwise.
+ */
+ private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(String serialNumber) {
+ long start = System.currentTimeMillis();
+ try {
+ return Optional.ofNullable(sadisService.getSubscriberInfoService().get(serialNumber));
+ } finally {
+ if (log.isDebugEnabled()) {
+ // SADIS can call remote systems to fetch device data and this calls can take a long time.
+ // This measurement is just for monitoring these kinds of situations.
+ log.debug("Device fetched from SADIS. Elapsed {} msec", System.currentTimeMillis() - start);
+ }
+
+ }
+ }
+
+ /**
+ * Fetches device information associated with the device serial number from SADIS.
+ *
+ * @param deviceId device id
+ * @return device information; an empty Optional otherwise.
+ */
+ private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(DeviceId deviceId) {
+ Device device = deviceService.getDevice(deviceId);
+ if (device == null || device.serialNumber() == null) {
+ return Optional.empty();
+ }
+ return getSubscriberAndDeviceInformation(device.serialNumber());
+ }
+
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
DeviceId devId = event.subject().id();
Port p = event.port();
- if (oltData.get(devId) == null &&
+ if (getSubscriberAndDeviceInformation(devId).isEmpty() &&
!(p != null && isConnectPoint(devId, p.number()))) {
return;
}
@@ -681,7 +739,8 @@
break;
case PORT_ADDED:
port = p.number();
- if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+ if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+ !isUplink(devId, port) && !isConnectPoint(devId, port)) {
processFilterObjective(devId, port, false);
} else if (isUplink(devId, port)) {
provisionUplinkFlows();
@@ -691,7 +750,8 @@
break;
case PORT_UPDATED:
port = p.number();
- if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+ if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+ !isUplink(devId, port) && !isConnectPoint(devId, port)) {
if (event.port().isEnabled()) {
processFilterObjective(devId, port, false);
} else {
@@ -847,19 +907,34 @@
return;
}
- processFilterObjective(deviceId, getDeviceUplink(deviceId), false);
+ Optional<PortNumber> upLink = getDeviceUplink(deviceId);
+ if (upLink.isPresent()) {
+ processFilterObjective(deviceId, upLink.get(), false);
+ }
}
private void provisionUplinkFlows() {
if (connectPointMode) {
return;
}
-
- oltData.keySet().forEach(deviceId -> provisionUplinkFlows(deviceId));
+ deviceService.getAvailableDevices().forEach(device -> {
+ Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
+ if (accessDevice.isPresent()) {
+ provisionUplinkFlows(device.id());
+ }
+ });
}
+
private void unprovisionUplinkFlows() {
- oltData.keySet().forEach(deviceId ->
- processFilterObjective(deviceId, getDeviceUplink(deviceId), true));
+ deviceService.getAvailableDevices().forEach(device -> {
+ Optional<SubscriberAndDeviceInformation> accessDevices = getSubscriberAndDeviceInformation(device.id());
+ if (accessDevices.isPresent()) {
+ Optional<PortNumber> upLink = getDeviceUplink(device.id());
+ if (upLink.isPresent()) {
+ processFilterObjective(device.id(), upLink.get(), true);
+ }
+ }
+ });
}
private void provisionConnectPointFlows() {
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpSender.java b/src/main/java/org/opencord/igmpproxy/IgmpSender.java
index f880646..b455cb4 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpSender.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpSender.java
@@ -195,12 +195,11 @@
return mac;
}
- public void sendIgmpPacketUplink(Ethernet ethPkt, DeviceId deviceId) {
+ public void sendIgmpPacketUplink(Ethernet ethPkt, DeviceId deviceId, PortNumber upLinkPort) {
if (!mastershipService.isLocalMaster(deviceId)) {
return;
}
-
if (IgmpManager.connectPointMode) {
if (IgmpManager.connectPoint == null) {
log.warn("cannot find a connectPoint to send the packet uplink");
@@ -208,8 +207,7 @@
}
sendIgmpPacket(ethPkt, IgmpManager.connectPoint.deviceId(), IgmpManager.connectPoint.port());
} else {
- PortNumber upLink = IgmpManager.getDeviceUplink(deviceId);
- sendIgmpPacket(ethPkt, deviceId, upLink);
+ sendIgmpPacket(ethPkt, deviceId, upLinkPort);
}
}
diff --git a/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
index c5aaee7..c2b32a4 100644
--- a/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
+++ b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
@@ -18,6 +18,7 @@
import org.onlab.packet.Ethernet;
import org.onlab.packet.Ip4Address;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
@@ -42,6 +43,7 @@
private DeviceId devId;
private Ip4Address groupIp;
private Ip4Address srcIp;
+ private PortNumber upLinkPort;
private AtomicInteger count = new AtomicInteger(DEFAULT_COUNT);
private int timerId = IgmpTimer.INVALID_TIMER_ID;
@@ -61,10 +63,11 @@
{nonTransition, delayTransition, idleTransition};
private int currentState = STATE_NON;
- public SingleStateMachine(DeviceId devId, Ip4Address groupIp, Ip4Address src) {
+ public SingleStateMachine(DeviceId devId, Ip4Address groupIp, Ip4Address src, PortNumber upLinkPort) {
this.devId = devId;
this.groupIp = groupIp;
this.srcIp = src;
+ this.upLinkPort = upLinkPort;
}
public Ip4Address getGroupIp() {
@@ -137,7 +140,7 @@
public void leave(boolean messageOutAllowed) {
if (messageOutAllowed) {
Ethernet eth = IgmpSender.getInstance().buildIgmpV3Leave(groupIp, srcIp);
- IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+ IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
}
}
@@ -153,7 +156,7 @@
public void join(boolean messageOutAllowed) {
if (messageOutAllowed) {
Ethernet eth = IgmpSender.getInstance().buildIgmpV3Join(groupIp, srcIp);
- IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+ IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
timeOut = getTimeOut(IgmpManager.getUnsolicitedTimeout());
timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
}
@@ -171,7 +174,7 @@
public void timeOut() {
if (sendQuery) {
Ethernet eth = IgmpSender.getInstance().buildIgmpV3ResponseQuery(groupIp, srcIp);
- IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+ IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
timeOut = DEFAULT_MAX_RESP;
}
}
diff --git a/src/main/java/org/opencord/igmpproxy/StateMachine.java b/src/main/java/org/opencord/igmpproxy/StateMachine.java
index 391bdee..1097560 100644
--- a/src/main/java/org/opencord/igmpproxy/StateMachine.java
+++ b/src/main/java/org/opencord/igmpproxy/StateMachine.java
@@ -18,6 +18,8 @@
import com.google.common.collect.Maps;
import org.onlab.packet.Ip4Address;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
import java.util.Map;
import java.util.Set;
@@ -52,11 +54,11 @@
map.remove(getId(devId, groupIp));
}
- public static boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP) {
+ public static boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP, PortNumber upLinkPort) {
SingleStateMachine machine = get(devId, groupIp);
if (null == machine) {
- machine = new SingleStateMachine(devId, groupIp, srcIP);
+ machine = new SingleStateMachine(devId, groupIp, srcIP, upLinkPort);
map.put(getId(devId, groupIp), machine);
boolean shouldSendJoin = true;
diff --git a/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java b/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java
index 82d342c..43cc24a 100644
--- a/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java
+++ b/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java
@@ -31,6 +31,7 @@
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.DefaultPort;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.HostId;
@@ -81,6 +82,7 @@
// Uplink ports for two olts A and B
protected static final PortNumber PORT_A = PortNumber.portNumber(1);
protected static final PortNumber PORT_B = PortNumber.portNumber(2);
+ protected static final PortNumber PORT_NNI = PortNumber.portNumber(65536);
// Connect Point mode for two olts
protected static final ConnectPoint CONNECT_POINT_A = new ConnectPoint(DEVICE_ID_OF_A, PORT_A);
@@ -90,6 +92,8 @@
protected static final String CLIENT_CIRCUIT_ID = "CIR-PON 1/1";
protected String dsBpId = "HSIA-DS";
+ private static final String NNI_PREFIX = "nni";
+
protected List<Port> lsPorts = new ArrayList<Port>();
// Flag for adding two different devices in oltData
protected boolean flagForDevice = true;
@@ -126,6 +130,17 @@
public List<Port> getPorts(DeviceId deviceId) {
return lsPorts;
}
+
+ @Override
+ public Port getPort(DeviceId deviceId, PortNumber portNumber) {
+ if (portNumber.equals(PORT_NNI)) {
+ DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
+ .set(AnnotationKeys.PORT_NAME, NNI_PREFIX);
+ Port nni = new DefaultPort(null, portNumber, true, annotationsBuilder.build());
+ return nni;
+ }
+ return super.getPort(deviceId, portNumber);
+ }
}
static final Class<IgmpproxyConfig> IGMPPROXY_CONFIG_CLASS = IgmpproxyConfig.class;
@@ -465,6 +480,7 @@
this.setIPAddress(ipAddress);
this.setNasPortId(nasPortId);
this.setCircuitId(circuitId);
+ this.setUplinkPort((int) PORT_NNI.toLong());
}
}