VOL-2620 Optimize igmpProxy app by directly using SADIS
Change-Id: I2c67f6162d446de1fb773fe311a93972f3aa582a
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index 0bbfe6d..1cdb55b 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -129,6 +129,7 @@
private static final String REMOVED = "removed";
private static final String INSTALLATION = "installation";
private static final String REMOVAL = "removal";
+ private static final String NNI_PREFIX = "nni";
private static boolean pimSSmInterworking = false;
private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
@@ -198,8 +199,6 @@
}
protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
- private static Map<DeviceId, SubscriberAndDeviceInformation> oltData = new ConcurrentHashMap<>();
-
@Activate
protected void activate() {
@@ -217,13 +216,6 @@
subsService = sadisService.getSubscriberInfoService();
- networkConfig.getSubjects(DeviceId.class).forEach(subject -> {
- SubscriberAndDeviceInformation olt = subsService.get(subject.toString());
- if (olt != null) {
- oltData.put(subject, olt);
- }
- });
-
if (connectPointMode) {
provisionConnectPointFlows();
} else {
@@ -284,19 +276,21 @@
+ /**
+ * Handles an IGMP query that was received on the connect point.
+ * <p>
+ * A query carrying a non-null, non-zero group address triggers
+ * StateMachine.specialQuery for every available device that has SADIS
+ * information; any other query triggers StateMachine.generalQuery.
+ *
+ * @param igmpQuery the received IGMP query; its group address selects the query type
+ * @param cp connect point the query was received on (no longer read by this method)
+ * @param maxResp max response value of the query; passed through calculateMaxResp before use
+ */
private void processIgmpConnectPointQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
- DeviceId deviceId = cp.deviceId();
Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
- maxResp = calculateMaxResp(maxResp);
+ // Kept in a final local (instead of reassigning the parameter) so the lambda below can capture it.
+ final int maxResponseTime = calculateMaxResp(maxResp);
//The query is received on the ConnectPoint
// send query accordingly to the registered OLT devices.
if (gAddr != null && !gAddr.isZero()) {
- for (DeviceId devId : oltData.keySet()) {
- StateMachine.specialQuery(devId, gAddr, maxResp);
- }
+ // Only devices that SADIS knows about receive the group-specific query.
+ deviceService.getAvailableDevices().forEach(device -> {
+ Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
+ if (accessDevice.isPresent()) {
+ StateMachine.specialQuery(device.id(), gAddr, maxResponseTime);
+ }
+ });
} else {
//Don't know which group is targeted by the query
//So query all the members(in all the OLTs) and proxy their reports
- StateMachine.generalQuery(maxResp);
+ StateMachine.generalQuery(maxResponseTime);
}
}
@@ -369,21 +363,29 @@
if (join) {
if (groupMember == null) {
- if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
- groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
- } else {
- groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
- }
-
Optional<ConnectPoint> sourceConfigured = getSource();
if (!sourceConfigured.isPresent()) {
log.warn("Unable to process IGMP Join from {} since no source " +
"configuration is found.", deviceId);
return;
}
+
+ Optional<PortNumber> deviceUplink = getDeviceUplink(deviceId);
+ if (deviceUplink.isEmpty()) {
+ log.warn("Unable to process IGMP Join since uplink port " +
+ "of the device {} is not found.", deviceId);
+ return;
+ }
+
+ if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
+ groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
+ } else {
+ groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
+ }
+
HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
- StateMachine.join(deviceId, groupIp, srcIp);
+ StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
groupMemberMap.put(groupMemberKey, groupMember);
groupMember.updateList(recordType, sourceList);
groupMember.getSourceList().forEach(source -> {
@@ -471,8 +473,8 @@
short vlan = ethPkt.getVlanID();
DeviceId deviceId = pkt.receivedFrom().deviceId();
- if (oltData.get(deviceId) == null &&
- !isConnectPoint(deviceId, pkt.receivedFrom().port())) {
+ if (!isConnectPoint(deviceId, pkt.receivedFrom().port()) &&
+ !getSubscriberAndDeviceInformation(deviceId).isPresent()) {
log.error("Device not registered in netcfg :" + deviceId.toString());
return;
}
@@ -597,12 +599,34 @@
}
- public static PortNumber getDeviceUplink(DeviceId devId) {
- if (oltData.get(devId) != null) {
- return PortNumber.portNumber(oltData.get(devId).uplinkPort());
- } else {
- return null;
+ /**
+ * Resolves the uplink port of a device from the device's SADIS entry.
+ *
+ * @param devId device id
+ * @return the uplink port; an empty Optional if the device or its serial number is
+ * not known to the device service, if the SADIS lookup for the serial number
+ * yields nothing, or if the port does not pass validateUpLinkPort.
+ */
+ public Optional<PortNumber> getDeviceUplink(DeviceId devId) {
+ Device device = deviceService.getDevice(devId);
+ if (device == null || device.serialNumber() == null) {
+ return Optional.empty();
}
+ // SADIS entries are looked up by the device serial number, not by the device id.
+ Optional<SubscriberAndDeviceInformation> olt = getSubscriberAndDeviceInformation(device.serialNumber());
+ if (olt.isEmpty()) {
+ return Optional.empty();
+ }
+ PortNumber portNumber = PortNumber.portNumber(olt.get().uplinkPort());
+ // The port number from SADIS is only returned when validateUpLinkPort accepts it.
+ return validateUpLinkPort(device.id(), portNumber) ?
+ Optional.of(portNumber) : Optional.empty();
+ }
+
+ /**
+ * Checks whether a port of the given device carries an uplink (NNI) port name.
+ *
+ * @param deviceId device id
+ * @param portNumber port number
+ * @return true if the port name starts with NNI_PREFIX; false otherwise.
+ */
+ public boolean validateUpLinkPort(DeviceId deviceId, PortNumber portNumber) {
+ final Port uplinkCandidate = deviceService.getPort(deviceId, portNumber);
+ if (uplinkCandidate == null) {
+ // ONOS has not discovered this port, so there is nothing to validate against.
+ return false;
+ }
+ // Read the name annotation once; a port without a name annotation is rejected.
+ final String portName = uplinkCandidate.annotations().value(AnnotationKeys.PORT_NAME);
+ return portName != null && portName.startsWith(NNI_PREFIX);
}
public static boolean isIgmpOnPodBasis() {
@@ -659,12 +683,46 @@
- return ((!connectPointMode) && getDeviceUplink(device).equals(port));
+ // getDeviceUplink returns an Optional, so the wrapped port number has to be compared;
+ // calling equals(port) on the Optional itself is never true.
+ return !connectPointMode &&
+ getDeviceUplink(device).filter(uplink -> uplink.equals(port)).isPresent();
}
+ /**
+ * Fetches device information associated with the device serial number from SADIS.
+ * <p>
+ * When debug logging is enabled, the duration of the lookup is logged whether or not
+ * the lookup returned an entry.
+ *
+ * @param serialNumber serial number of a device
+ * @return device information; an empty Optional if the SADIS lookup returns null.
+ */
+ private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(String serialNumber) {
+ // System.nanoTime() is monotonic, so the elapsed time stays correct across wall-clock adjustments.
+ final long startNanos = System.nanoTime();
+ try {
+ return Optional.ofNullable(sadisService.getSubscriberInfoService().get(serialNumber));
+ } finally {
+ if (log.isDebugEnabled()) {
+ // SADIS can call remote systems to fetch device data and these calls can take a long time.
+ // This measurement is just for monitoring these kinds of situations.
+ log.debug("Device fetched from SADIS. Elapsed {} msec",
+ (System.nanoTime() - startNanos) / 1_000_000L);
+ }
+ }
+ }
+
+ /**
+ * Fetches device information from SADIS for the device with the given id, using the
+ * serial number that the device service reports for that device.
+ *
+ * @param deviceId device id
+ * @return device information; an empty Optional if the device or its serial number is
+ * not known to the device service, or if the SADIS lookup yields nothing.
+ */
+ private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(DeviceId deviceId) {
+ // Each step short-circuits to an empty Optional when the device or its serial number is missing.
+ return Optional.ofNullable(deviceService.getDevice(deviceId))
+ .map(Device::serialNumber)
+ .flatMap(serialNumber -> getSubscriberAndDeviceInformation(serialNumber));
+ }
+
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
DeviceId devId = event.subject().id();
Port p = event.port();
- if (oltData.get(devId) == null &&
+ if (getSubscriberAndDeviceInformation(devId).isEmpty() &&
!(p != null && isConnectPoint(devId, p.number()))) {
return;
}
@@ -681,7 +739,8 @@
break;
case PORT_ADDED:
port = p.number();
- if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+ if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+ !isUplink(devId, port) && !isConnectPoint(devId, port)) {
processFilterObjective(devId, port, false);
} else if (isUplink(devId, port)) {
provisionUplinkFlows();
@@ -691,7 +750,8 @@
break;
case PORT_UPDATED:
port = p.number();
- if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+ if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+ !isUplink(devId, port) && !isConnectPoint(devId, port)) {
if (event.port().isEnabled()) {
processFilterObjective(devId, port, false);
} else {
@@ -847,19 +907,34 @@
return;
}
- processFilterObjective(deviceId, getDeviceUplink(deviceId), false);
+ Optional<PortNumber> upLink = getDeviceUplink(deviceId);
+ if (upLink.isPresent()) {
+ processFilterObjective(deviceId, upLink.get(), false);
+ }
}
+ /**
+ * Provisions the uplink flows on every available device that has SADIS information.
+ * Does nothing when the app runs in connect point mode.
+ */
private void provisionUplinkFlows() {
if (connectPointMode) {
return;
}
-
- oltData.keySet().forEach(deviceId -> provisionUplinkFlows(deviceId));
+ // Devices without SADIS information are skipped.
+ deviceService.getAvailableDevices().forEach(device -> {
+ Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
+ if (accessDevice.isPresent()) {
+ provisionUplinkFlows(device.id());
+ }
+ });
}
+
+ /**
+ * Unprovisions the uplink flows: for every available device whose uplink port can be
+ * resolved, calls processFilterObjective on that port with its last argument set to true.
+ */
private void unprovisionUplinkFlows() {
- oltData.keySet().forEach(deviceId ->
- processFilterObjective(deviceId, getDeviceUplink(deviceId), true));
+ // getDeviceUplink already yields an empty Optional for a device without SADIS
+ // information, so a separate SADIS lookup per device is not needed here.
+ deviceService.getAvailableDevices().forEach(device ->
+ getDeviceUplink(device.id()).ifPresent(upLink ->
+ processFilterObjective(device.id(), upLink, true)));
}
private void provisionConnectPointFlows() {