VOL-2620 Optimize igmpProxy app by directly using SADIS

Change-Id: I2c67f6162d446de1fb773fe311a93972f3aa582a
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index 0bbfe6d..1cdb55b 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -129,6 +129,7 @@
     private static final String REMOVED = "removed";
     private static final String INSTALLATION = "installation";
     private static final String REMOVAL = "removal";
+    private static final String NNI_PREFIX = "nni";
 
     private static boolean pimSSmInterworking = false;
     private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
@@ -198,8 +199,6 @@
     }
 
     protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
-    private static Map<DeviceId, SubscriberAndDeviceInformation> oltData = new ConcurrentHashMap<>();
-
 
     @Activate
     protected void activate() {
@@ -217,13 +216,6 @@
 
         subsService = sadisService.getSubscriberInfoService();
 
-        networkConfig.getSubjects(DeviceId.class).forEach(subject -> {
-            SubscriberAndDeviceInformation olt = subsService.get(subject.toString());
-            if (olt != null) {
-                oltData.put(subject, olt);
-            }
-        });
-
         if (connectPointMode) {
             provisionConnectPointFlows();
         } else {
@@ -284,19 +276,21 @@
 
     private void processIgmpConnectPointQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
 
-        DeviceId deviceId = cp.deviceId();
         Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
-        maxResp = calculateMaxResp(maxResp);
+        final int maxResponseTime = calculateMaxResp(maxResp);
         //The query is received on the ConnectPoint
         // send query accordingly to the registered OLT devices.
         if (gAddr != null && !gAddr.isZero()) {
-            for (DeviceId devId : oltData.keySet()) {
-                StateMachine.specialQuery(devId, gAddr, maxResp);
-            }
+            deviceService.getAvailableDevices().forEach(device -> {
+                Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
+                if (accessDevice.isPresent()) {
+                    StateMachine.specialQuery(device.id(), gAddr, maxResponseTime);
+                }
+            });
         } else {
             //Don't know which group is targeted by the query
             //So query all the members(in all the OLTs) and proxy their reports
-            StateMachine.generalQuery(maxResp);
+            StateMachine.generalQuery(maxResponseTime);
         }
     }
 
@@ -369,21 +363,29 @@
 
         if (join) {
             if (groupMember == null) {
-                if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
-                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
-                } else {
-                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
-                }
-
                 Optional<ConnectPoint> sourceConfigured = getSource();
                 if (!sourceConfigured.isPresent()) {
                     log.warn("Unable to process IGMP Join from {} since no source " +
                                      "configuration is found.", deviceId);
                     return;
                 }
+
+                Optional<PortNumber> deviceUplink = getDeviceUplink(deviceId);
+                if (deviceUplink.isEmpty()) {
+                    log.warn("Unable to process IGMP Join since uplink port " +
+                     "of the device {} is not found.", deviceId);
+                    return;
+                }
+
+                if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
+                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
+                } else {
+                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
+                }
+
                 HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
 
-                StateMachine.join(deviceId, groupIp, srcIp);
+                StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
                 groupMemberMap.put(groupMemberKey, groupMember);
                 groupMember.updateList(recordType, sourceList);
                 groupMember.getSourceList().forEach(source -> {
@@ -471,8 +473,8 @@
                     short vlan = ethPkt.getVlanID();
                     DeviceId deviceId = pkt.receivedFrom().deviceId();
 
-                    if (oltData.get(deviceId) == null &&
-                            !isConnectPoint(deviceId, pkt.receivedFrom().port())) {
+                    if (!isConnectPoint(deviceId, pkt.receivedFrom().port()) &&
+                            !getSubscriberAndDeviceInformation(deviceId).isPresent()) {
                         log.error("Device not registered in netcfg :" + deviceId.toString());
                         return;
                     }
@@ -597,12 +599,34 @@
 
     }
 
-    public static PortNumber getDeviceUplink(DeviceId devId) {
-        if (oltData.get(devId) != null) {
-            return PortNumber.portNumber(oltData.get(devId).uplinkPort());
-        } else {
-            return null;
+    public Optional<PortNumber> getDeviceUplink(DeviceId devId) {
+        Device device = deviceService.getDevice(devId);
+        if (device == null || device.serialNumber() == null) {
+            return Optional.empty();
         }
+        Optional<SubscriberAndDeviceInformation> olt = getSubscriberAndDeviceInformation(device.serialNumber());
+        if (olt.isEmpty()) {
+            return Optional.empty();
+        }
+        PortNumber portNumber = PortNumber.portNumber(olt.get().uplinkPort());
+        return validateUpLinkPort(device.id(), portNumber) ?
+                    Optional.of(portNumber) : Optional.empty();
+    }
+
+    /**
+     *
+     * @param deviceId device id
+     * @param portNumber port number
+     * @return true if the port name starts with NNI_PREFIX; false otherwise.
+     */
+    public boolean validateUpLinkPort(DeviceId deviceId, PortNumber portNumber) {
+        Port port = deviceService.getPort(deviceId, portNumber);
+        if (port == null) {
+            //port is not discovered by ONOS; so cannot validate it.
+            return false;
+        }
+        return port.annotations().value(AnnotationKeys.PORT_NAME) != null &&
+                port.annotations().value(AnnotationKeys.PORT_NAME).startsWith(NNI_PREFIX);
     }
 
     public static boolean isIgmpOnPodBasis() {
@@ -659,12 +683,46 @@
         return ((!connectPointMode) && getDeviceUplink(device).equals(port));
     }
 
+    /**
+     * Fetches device information associated with the device serial number from SADIS.
+     *
+     * @param serialNumber serial number of a device
+     * @return device information; an empty Optional otherwise.
+     */
+    private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(String serialNumber) {
+        long start = System.currentTimeMillis();
+        try {
+            return Optional.ofNullable(sadisService.getSubscriberInfoService().get(serialNumber));
+        } finally {
+            if (log.isDebugEnabled()) {
+                // SADIS can call remote systems to fetch device data and this calls can take a long time.
+                // This measurement is just for monitoring these kinds of situations.
+                log.debug("Device fetched from SADIS. Elapsed {} msec", System.currentTimeMillis() - start);
+            }
+
+        }
+    }
+
+    /**
+     * Fetches device information associated with the device serial number from SADIS.
+     *
+     * @param deviceId device id
+     * @return device information; an empty Optional otherwise.
+     */
+    private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(DeviceId deviceId) {
+        Device device = deviceService.getDevice(deviceId);
+        if (device == null || device.serialNumber() == null) {
+            return Optional.empty();
+        }
+        return getSubscriberAndDeviceInformation(device.serialNumber());
+    }
+
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
             DeviceId devId = event.subject().id();
             Port p = event.port();
-            if (oltData.get(devId) == null &&
+            if (getSubscriberAndDeviceInformation(devId).isEmpty() &&
                     !(p != null && isConnectPoint(devId, p.number()))) {
                 return;
             }
@@ -681,7 +739,8 @@
                     break;
                 case PORT_ADDED:
                     port = p.number();
-                    if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+                    if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+                        !isUplink(devId, port) && !isConnectPoint(devId, port)) {
                         processFilterObjective(devId, port, false);
                     } else if (isUplink(devId, port)) {
                         provisionUplinkFlows();
@@ -691,7 +750,8 @@
                     break;
                 case PORT_UPDATED:
                     port = p.number();
-                    if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+                    if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+                        !isUplink(devId, port) && !isConnectPoint(devId, port)) {
                         if (event.port().isEnabled()) {
                             processFilterObjective(devId, port, false);
                         } else {
@@ -847,19 +907,34 @@
             return;
         }
 
-        processFilterObjective(deviceId, getDeviceUplink(deviceId), false);
+        Optional<PortNumber> upLink = getDeviceUplink(deviceId);
+        if (upLink.isPresent()) {
+            processFilterObjective(deviceId, upLink.get(), false);
+        }
     }
 
     private void provisionUplinkFlows() {
         if (connectPointMode) {
             return;
         }
-
-        oltData.keySet().forEach(deviceId -> provisionUplinkFlows(deviceId));
+        deviceService.getAvailableDevices().forEach(device -> {
+            Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
+            if (accessDevice.isPresent()) {
+                provisionUplinkFlows(device.id());
+            }
+        });
     }
+
     private void unprovisionUplinkFlows() {
-        oltData.keySet().forEach(deviceId ->
-                processFilterObjective(deviceId, getDeviceUplink(deviceId), true));
+        deviceService.getAvailableDevices().forEach(device -> {
+            Optional<SubscriberAndDeviceInformation> accessDevices = getSubscriberAndDeviceInformation(device.id());
+            if (accessDevices.isPresent()) {
+                Optional<PortNumber> upLink = getDeviceUplink(device.id());
+                if (upLink.isPresent()) {
+                    processFilterObjective(device.id(), upLink.get(), true);
+                }
+            }
+        });
     }
 
     private void provisionConnectPointFlows() {