VOL-2620 Optimize igmpProxy app by directly using SADIS

Change-Id: I2c67f6162d446de1fb773fe311a93972f3aa582a
diff --git a/pom.xml b/pom.xml
index 25abd3c..d5764a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,7 +39,6 @@
         <onos.app.url>http://opencord.org</onos.app.url>
         <onos.app.readme>IGMP implementation.</onos.app.readme>
         <onos.app.requires>
-            org.opencord.config,
             org.onosproject.mcast
         </onos.app.requires>
         <sadis.api.version>5.0.0</sadis.api.version>
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index 0bbfe6d..1cdb55b 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -129,6 +129,7 @@
     private static final String REMOVED = "removed";
     private static final String INSTALLATION = "installation";
     private static final String REMOVAL = "removal";
+    private static final String NNI_PREFIX = "nni";
 
     private static boolean pimSSmInterworking = false;
     private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
@@ -198,8 +199,6 @@
     }
 
     protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
-    private static Map<DeviceId, SubscriberAndDeviceInformation> oltData = new ConcurrentHashMap<>();
-
 
     @Activate
     protected void activate() {
@@ -217,13 +216,6 @@
 
         subsService = sadisService.getSubscriberInfoService();
 
-        networkConfig.getSubjects(DeviceId.class).forEach(subject -> {
-            SubscriberAndDeviceInformation olt = subsService.get(subject.toString());
-            if (olt != null) {
-                oltData.put(subject, olt);
-            }
-        });
-
         if (connectPointMode) {
             provisionConnectPointFlows();
         } else {
@@ -284,19 +276,21 @@
 
     private void processIgmpConnectPointQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
 
-        DeviceId deviceId = cp.deviceId();
         Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
-        maxResp = calculateMaxResp(maxResp);
+        final int maxResponseTime = calculateMaxResp(maxResp);
         //The query is received on the ConnectPoint
         // send query accordingly to the registered OLT devices.
         if (gAddr != null && !gAddr.isZero()) {
-            for (DeviceId devId : oltData.keySet()) {
-                StateMachine.specialQuery(devId, gAddr, maxResp);
-            }
+            deviceService.getAvailableDevices().forEach(device -> {
+                Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
+                if (accessDevice.isPresent()) {
+                    StateMachine.specialQuery(device.id(), gAddr, maxResponseTime);
+                }
+            });
         } else {
             //Don't know which group is targeted by the query
             //So query all the members(in all the OLTs) and proxy their reports
-            StateMachine.generalQuery(maxResp);
+            StateMachine.generalQuery(maxResponseTime);
         }
     }
 
@@ -369,21 +363,29 @@
 
         if (join) {
             if (groupMember == null) {
-                if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
-                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
-                } else {
-                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
-                }
-
                 Optional<ConnectPoint> sourceConfigured = getSource();
                 if (!sourceConfigured.isPresent()) {
                     log.warn("Unable to process IGMP Join from {} since no source " +
                                      "configuration is found.", deviceId);
                     return;
                 }
+
+                Optional<PortNumber> deviceUplink = getDeviceUplink(deviceId);
+                if (deviceUplink.isEmpty()) {
+                    log.warn("Unable to process IGMP Join since uplink port " +
+                     "of the device {} is not found.", deviceId);
+                    return;
+                }
+
+                if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
+                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
+                } else {
+                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
+                }
+
                 HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
 
-                StateMachine.join(deviceId, groupIp, srcIp);
+                StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
                 groupMemberMap.put(groupMemberKey, groupMember);
                 groupMember.updateList(recordType, sourceList);
                 groupMember.getSourceList().forEach(source -> {
@@ -471,8 +473,8 @@
                     short vlan = ethPkt.getVlanID();
                     DeviceId deviceId = pkt.receivedFrom().deviceId();
 
-                    if (oltData.get(deviceId) == null &&
-                            !isConnectPoint(deviceId, pkt.receivedFrom().port())) {
+                    if (!isConnectPoint(deviceId, pkt.receivedFrom().port()) &&
+                            !getSubscriberAndDeviceInformation(deviceId).isPresent()) {
                         log.error("Device not registered in netcfg :" + deviceId.toString());
                         return;
                     }
@@ -597,12 +599,34 @@
 
     }
 
-    public static PortNumber getDeviceUplink(DeviceId devId) {
-        if (oltData.get(devId) != null) {
-            return PortNumber.portNumber(oltData.get(devId).uplinkPort());
-        } else {
-            return null;
+    public Optional<PortNumber> getDeviceUplink(DeviceId devId) {
+        Device device = deviceService.getDevice(devId);
+        if (device == null || device.serialNumber() == null) {
+            return Optional.empty();
         }
+        Optional<SubscriberAndDeviceInformation> olt = getSubscriberAndDeviceInformation(device.serialNumber());
+        if (olt.isEmpty()) {
+            return Optional.empty();
+        }
+        PortNumber portNumber = PortNumber.portNumber(olt.get().uplinkPort());
+        return validateUpLinkPort(device.id(), portNumber) ?
+                    Optional.of(portNumber) : Optional.empty();
+    }
+
+    /**
+     *
+     * @param deviceId device id
+     * @param portNumber port number
+     * @return true if the port name starts with NNI_PREFIX; false otherwise.
+     */
+    public boolean validateUpLinkPort(DeviceId deviceId, PortNumber portNumber) {
+        Port port = deviceService.getPort(deviceId, portNumber);
+        if (port == null) {
+            //port is not discovered by ONOS; so cannot validate it.
+            return false;
+        }
+        return port.annotations().value(AnnotationKeys.PORT_NAME) != null &&
+                port.annotations().value(AnnotationKeys.PORT_NAME).startsWith(NNI_PREFIX);
     }
 
     public static boolean isIgmpOnPodBasis() {
@@ -659,12 +683,46 @@
         return ((!connectPointMode) && getDeviceUplink(device).equals(port));
     }
 
+    /**
+     * Fetches device information associated with the device serial number from SADIS.
+     *
+     * @param serialNumber serial number of a device
+     * @return device information; an empty Optional otherwise.
+     */
+    private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(String serialNumber) {
+        long start = System.currentTimeMillis();
+        try {
+            return Optional.ofNullable(sadisService.getSubscriberInfoService().get(serialNumber));
+        } finally {
+            if (log.isDebugEnabled()) {
+                // SADIS can call remote systems to fetch device data and this calls can take a long time.
+                // This measurement is just for monitoring these kinds of situations.
+                log.debug("Device fetched from SADIS. Elapsed {} msec", System.currentTimeMillis() - start);
+            }
+
+        }
+    }
+
+    /**
+     * Fetches device information associated with the device serial number from SADIS.
+     *
+     * @param deviceId device id
+     * @return device information; an empty Optional otherwise.
+     */
+    private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(DeviceId deviceId) {
+        Device device = deviceService.getDevice(deviceId);
+        if (device == null || device.serialNumber() == null) {
+            return Optional.empty();
+        }
+        return getSubscriberAndDeviceInformation(device.serialNumber());
+    }
+
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
             DeviceId devId = event.subject().id();
             Port p = event.port();
-            if (oltData.get(devId) == null &&
+            if (getSubscriberAndDeviceInformation(devId).isEmpty() &&
                     !(p != null && isConnectPoint(devId, p.number()))) {
                 return;
             }
@@ -681,7 +739,8 @@
                     break;
                 case PORT_ADDED:
                     port = p.number();
-                    if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+                    if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+                        !isUplink(devId, port) && !isConnectPoint(devId, port)) {
                         processFilterObjective(devId, port, false);
                     } else if (isUplink(devId, port)) {
                         provisionUplinkFlows();
@@ -691,7 +750,8 @@
                     break;
                 case PORT_UPDATED:
                     port = p.number();
-                    if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+                    if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+                        !isUplink(devId, port) && !isConnectPoint(devId, port)) {
                         if (event.port().isEnabled()) {
                             processFilterObjective(devId, port, false);
                         } else {
@@ -847,19 +907,34 @@
             return;
         }
 
-        processFilterObjective(deviceId, getDeviceUplink(deviceId), false);
+        Optional<PortNumber> upLink = getDeviceUplink(deviceId);
+        if (upLink.isPresent()) {
+            processFilterObjective(deviceId, upLink.get(), false);
+        }
     }
 
     private void provisionUplinkFlows() {
         if (connectPointMode) {
             return;
         }
-
-        oltData.keySet().forEach(deviceId -> provisionUplinkFlows(deviceId));
+        deviceService.getAvailableDevices().forEach(device -> {
+            Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
+            if (accessDevice.isPresent()) {
+                provisionUplinkFlows(device.id());
+            }
+        });
     }
+
     private void unprovisionUplinkFlows() {
-        oltData.keySet().forEach(deviceId ->
-                processFilterObjective(deviceId, getDeviceUplink(deviceId), true));
+        deviceService.getAvailableDevices().forEach(device -> {
+            Optional<SubscriberAndDeviceInformation> accessDevices = getSubscriberAndDeviceInformation(device.id());
+            if (accessDevices.isPresent()) {
+                Optional<PortNumber> upLink = getDeviceUplink(device.id());
+                if (upLink.isPresent()) {
+                    processFilterObjective(device.id(), upLink.get(), true);
+                }
+            }
+        });
     }
 
     private void provisionConnectPointFlows() {
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpSender.java b/src/main/java/org/opencord/igmpproxy/IgmpSender.java
index f880646..b455cb4 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpSender.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpSender.java
@@ -195,12 +195,11 @@
         return mac;
     }
 
-    public void sendIgmpPacketUplink(Ethernet ethPkt, DeviceId deviceId) {
+    public void sendIgmpPacketUplink(Ethernet ethPkt, DeviceId deviceId, PortNumber upLinkPort) {
         if (!mastershipService.isLocalMaster(deviceId)) {
             return;
         }
 
-
         if (IgmpManager.connectPointMode) {
             if (IgmpManager.connectPoint == null) {
                 log.warn("cannot find a connectPoint to send the packet uplink");
@@ -208,8 +207,7 @@
             }
             sendIgmpPacket(ethPkt, IgmpManager.connectPoint.deviceId(), IgmpManager.connectPoint.port());
         } else {
-            PortNumber upLink = IgmpManager.getDeviceUplink(deviceId);
-            sendIgmpPacket(ethPkt, deviceId, upLink);
+            sendIgmpPacket(ethPkt, deviceId, upLinkPort);
         }
     }
 
diff --git a/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
index c5aaee7..c2b32a4 100644
--- a/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
+++ b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
@@ -18,6 +18,7 @@
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.Ip4Address;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
 
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -42,6 +43,7 @@
     private DeviceId devId;
     private Ip4Address groupIp;
     private Ip4Address srcIp;
+    private PortNumber upLinkPort;
 
     private AtomicInteger count = new AtomicInteger(DEFAULT_COUNT);
     private int timerId = IgmpTimer.INVALID_TIMER_ID;
@@ -61,10 +63,11 @@
             {nonTransition, delayTransition, idleTransition};
     private int currentState = STATE_NON;
 
-    public SingleStateMachine(DeviceId devId, Ip4Address groupIp, Ip4Address src) {
+    public SingleStateMachine(DeviceId devId, Ip4Address groupIp, Ip4Address src, PortNumber upLinkPort) {
         this.devId = devId;
         this.groupIp = groupIp;
         this.srcIp = src;
+        this.upLinkPort = upLinkPort;
     }
 
     public Ip4Address getGroupIp() {
@@ -137,7 +140,7 @@
         public void leave(boolean messageOutAllowed) {
             if (messageOutAllowed) {
                 Ethernet eth = IgmpSender.getInstance().buildIgmpV3Leave(groupIp, srcIp);
-                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
             }
         }
 
@@ -153,7 +156,7 @@
         public void join(boolean messageOutAllowed) {
             if (messageOutAllowed) {
                 Ethernet eth = IgmpSender.getInstance().buildIgmpV3Join(groupIp, srcIp);
-                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
                 timeOut = getTimeOut(IgmpManager.getUnsolicitedTimeout());
                 timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
             }
@@ -171,7 +174,7 @@
         public void timeOut() {
             if (sendQuery) {
                 Ethernet eth = IgmpSender.getInstance().buildIgmpV3ResponseQuery(groupIp, srcIp);
-                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
                 timeOut = DEFAULT_MAX_RESP;
             }
         }
diff --git a/src/main/java/org/opencord/igmpproxy/StateMachine.java b/src/main/java/org/opencord/igmpproxy/StateMachine.java
index 391bdee..1097560 100644
--- a/src/main/java/org/opencord/igmpproxy/StateMachine.java
+++ b/src/main/java/org/opencord/igmpproxy/StateMachine.java
@@ -18,6 +18,8 @@
 import com.google.common.collect.Maps;
 import org.onlab.packet.Ip4Address;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
 import java.util.Map;
 import java.util.Set;
 
@@ -52,11 +54,11 @@
         map.remove(getId(devId, groupIp));
     }
 
-    public static boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP) {
+    public static boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP, PortNumber upLinkPort) {
         SingleStateMachine machine = get(devId, groupIp);
 
         if (null == machine) {
-            machine = new SingleStateMachine(devId, groupIp, srcIP);
+            machine = new SingleStateMachine(devId, groupIp, srcIP, upLinkPort);
             map.put(getId(devId, groupIp), machine);
 
             boolean shouldSendJoin = true;
diff --git a/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java b/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java
index 82d342c..43cc24a 100644
--- a/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java
+++ b/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java
@@ -31,6 +31,7 @@
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.DefaultPort;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.HostId;
@@ -81,6 +82,7 @@
     // Uplink ports for two olts A and B
     protected static final PortNumber PORT_A = PortNumber.portNumber(1);
     protected static final PortNumber PORT_B = PortNumber.portNumber(2);
+    protected static final PortNumber PORT_NNI = PortNumber.portNumber(65536);
 
     // Connect Point mode for two olts
     protected static final ConnectPoint CONNECT_POINT_A = new ConnectPoint(DEVICE_ID_OF_A, PORT_A);
@@ -90,6 +92,8 @@
     protected static final String CLIENT_CIRCUIT_ID = "CIR-PON 1/1";
     protected String dsBpId = "HSIA-DS";
 
+    private static final String NNI_PREFIX = "nni";
+
     protected List<Port> lsPorts = new ArrayList<Port>();
     // Flag for adding two different devices in oltData
     protected boolean flagForDevice = true;
@@ -126,6 +130,17 @@
         public List<Port> getPorts(DeviceId deviceId) {
             return lsPorts;
         }
+
+        @Override
+        public Port getPort(DeviceId deviceId, PortNumber portNumber) {
+            if (portNumber.equals(PORT_NNI)) {
+                DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
+                        .set(AnnotationKeys.PORT_NAME, NNI_PREFIX);
+                Port nni = new DefaultPort(null, portNumber, true, annotationsBuilder.build());
+                return nni;
+            }
+            return super.getPort(deviceId, portNumber);
+        }
     }
 
     static final Class<IgmpproxyConfig> IGMPPROXY_CONFIG_CLASS = IgmpproxyConfig.class;
@@ -465,6 +480,7 @@
             this.setIPAddress(ipAddress);
             this.setNasPortId(nasPortId);
             this.setCircuitId(circuitId);
+            this.setUplinkPort((int) PORT_NNI.toLong());
         }
     }