VOL-2620 Optimize igmpProxy-1.2 app by directly using SADIS

Change-Id: I2c0c7480866b65e6c2bd96ef636e16643aa0ec88
diff --git a/pom.xml b/pom.xml
index 1cde2c4..e91abd7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,10 +41,9 @@
         <onos.app.url>http://opencord.org</onos.app.url>
         <onos.app.readme>IGMP implementation.</onos.app.readme>
         <onos.app.requires>
-            org.opencord.config,
             org.onosproject.mcast
         </onos.app.requires>
-        <cord.config.version>1.5.0-SNAPSHOT</cord.config.version>
+        <sadis.api.version>3.1.1</sadis.api.version>
     </properties>
 
 
@@ -96,8 +95,9 @@
 
         <dependency>
             <groupId>org.opencord</groupId>
-            <artifactId>cord-config</artifactId>
-            <version>${cord.config.version}</version>
+            <artifactId>sadis-api</artifactId>
+            <version>${sadis.api.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index 7c5bfaf..d57c731 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -17,6 +17,10 @@
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.onosproject.net.Device;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -64,8 +68,6 @@
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketService;
-import org.opencord.cordconfig.access.AccessDeviceConfig;
-import org.opencord.cordconfig.access.AccessDeviceData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +75,6 @@
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -94,8 +95,8 @@
 @Component(immediate = true)
 public class IgmpManager {
 
-    private static final Class<AccessDeviceConfig> CONFIG_CLASS =
-            AccessDeviceConfig.class;
+    private static final String APP_NAME = "org.opencord.igmpproxy";
+
     private static final Class<IgmpproxyConfig> IGMPPROXY_CONFIG_CLASS =
             IgmpproxyConfig.class;
     private static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS =
@@ -105,7 +106,7 @@
 
     public static Map<String, GroupMember> groupMemberMap = Maps.newConcurrentMap();
     private static ApplicationId appId;
-    private static Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
+
     private static int unSolicitedTimeout = 3; // unit is 1 sec
     private static int keepAliveCount = 3;
     private static int lastQueryInterval = 2;  //unit is 1 sec
@@ -127,6 +128,7 @@
     private static final String REMOVED = "removed";
     private static final String INSTALLATION = "installation";
     private static final String REMOVAL = "removal";
+    private static final String NNI_PREFIX = "nni";
 
     private static boolean pimSSmInterworking = false;
     private static final String DEFAULT_PIMSSM_HOST = "127.0.0.1";
@@ -148,6 +150,10 @@
     protected NetworkConfigRegistry networkConfig;
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MulticastRouteService multicastService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected SadisService sadisService;
+
     private IgmpPacketProcessor processor = new IgmpPacketProcessor();
     private Logger log = LoggerFactory.getLogger(getClass());
     private ApplicationId coreAppId;
@@ -156,7 +162,7 @@
     private InternalNetworkConfigListener configListener =
             new InternalNetworkConfigListener();
     private DeviceListener deviceListener = new InternalDeviceListener();
-    private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory = null;
+
     private ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
             new ConfigFactory<ApplicationId, IgmpproxyConfig>(
                     SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
@@ -183,24 +189,15 @@
         return unSolicitedTimeout;
     }
 
+    protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+
     @Activate
     protected void activate() {
-        appId = coreService.registerApplication("org.opencord.igmpproxy");
+        appId = coreService.registerApplication(APP_NAME);
         coreAppId = coreService.registerApplication(CoreService.CORE_APP_NAME);
         packetService.addProcessor(processor, PacketProcessor.director(4));
         IgmpSender.init(packetService, mastershipService);
 
-        if (networkConfig.getConfigFactory(CONFIG_CLASS) == null) {
-            configFactory =
-                    new ConfigFactory<DeviceId, AccessDeviceConfig>(
-                            SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
-                        @Override
-                        public AccessDeviceConfig createConfig() {
-                            return new AccessDeviceConfig();
-                        }
-                    };
-            networkConfig.registerConfigFactory(configFactory);
-        }
         networkConfig.registerConfigFactory(igmpproxySsmConfigFactory);
         networkConfig.registerConfigFactory(igmpproxyConfigFactory);
         networkConfig.addListener(configListener);
@@ -208,18 +205,8 @@
         configListener.reconfigureNetwork(networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS));
         configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));
 
-        networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
-                subject -> {
-                    AccessDeviceConfig config = networkConfig.getConfig(subject,
-                            AccessDeviceConfig.class);
-                    if (config != null) {
-                        AccessDeviceData data = config.getAccessDevice();
-                        oltData.put(data.deviceId(), data);
-                    }
-                }
-        );
+        subsService = sadisService.getSubscriberInfoService();
 
-        oltData.keySet().forEach(d -> provisionDefaultFlows(d));
         if (connectPointMode) {
             provisionConnectPointFlows();
         } else {
@@ -246,9 +233,6 @@
 
         // de-register and null our handler
         networkConfig.removeListener(configListener);
-        if (configFactory != null) {
-            networkConfig.unregisterConfigFactory(configFactory);
-        }
         networkConfig.unregisterConfigFactory(igmpproxyConfigFactory);
         networkConfig.unregisterConfigFactory(igmpproxySsmConfigFactory);
         deviceService.removeListener(deviceListener);
@@ -283,19 +267,21 @@
 
     private void processIgmpConnectPointQuery(IGMPQuery igmpQuery, ConnectPoint cp, int maxResp) {
 
-        DeviceId deviceId = cp.deviceId();
         Ip4Address gAddr = igmpQuery.getGaddr().getIp4Address();
-        maxResp = calculateMaxResp(maxResp);
+        final int maxResponseTime = calculateMaxResp(maxResp);
         //The query is received on the ConnectPoint
         // send query accordingly to the registered OLT devices.
         if (gAddr != null && !gAddr.isZero()) {
-            for (DeviceId devId : oltData.keySet()) {
-                StateMachine.specialQuery(devId, gAddr, maxResp);
-            }
+            deviceService.getAvailableDevices().forEach(device -> {
+                Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
+                if (accessDevice.isPresent()) {
+                    StateMachine.specialQuery(device.id(), gAddr, maxResponseTime);
+                }
+            });
         } else {
             //Don't know which group is targeted by the query
             //So query all the members(in all the OLTs) and proxy their reports
-            StateMachine.generalQuery(maxResp);
+            StateMachine.generalQuery(maxResponseTime);
         }
     }
 
@@ -368,21 +354,29 @@
 
         if (join) {
             if (groupMember == null) {
-                if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
-                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
-                } else {
-                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
-                }
-
                 Optional<ConnectPoint> sourceConfigured = getSource();
                 if (!sourceConfigured.isPresent()) {
                     log.warn("Unable to process IGMP Join from {} since no source " +
                                      "configuration is found.", deviceId);
                     return;
                 }
+
+                Optional<PortNumber> deviceUplink = getDeviceUplink(deviceId);
+                if (!deviceUplink.isPresent()) {
+                    log.warn("Unable to process IGMP Join since uplink port " +
+                     "of the device {} is not found.", deviceId);
+                    return;
+                }
+
+                if (igmpType == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT) {
+                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, true);
+                } else {
+                    groupMember = new GroupMember(groupIp, vlan, deviceId, portNumber, false);
+                }
+
                 HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
 
-                StateMachine.join(deviceId, groupIp, srcIp);
+                StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
                 groupMemberMap.put(groupMemberKey, groupMember);
                 groupMember.updateList(recordType, sourceList);
                 groupMember.getSourceList().forEach(source -> {
@@ -450,86 +444,96 @@
         @Override
         public void process(PacketContext context) {
             eventExecutor.execute(() -> {
-                InboundPacket pkt = context.inPacket();
-                Ethernet ethPkt = pkt.parsed();
-                if (ethPkt == null) {
-                    return;
-                }
+                try {
+                    InboundPacket pkt = context.inPacket();
+                    Ethernet ethPkt = pkt.parsed();
+                    if (ethPkt == null) {
+                        return;
+                    }
 
-                if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
-                    return;
-                }
+                    if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
+                        return;
+                    }
 
-                IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
+                    IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
 
-                if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
-                    return;
-                }
+                    if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
+                        return;
+                    }
 
-                short vlan = ethPkt.getVlanID();
-                DeviceId deviceId = pkt.receivedFrom().deviceId();
+                    short vlan = ethPkt.getVlanID();
+                    DeviceId deviceId = pkt.receivedFrom().deviceId();
 
-                if (oltData.get(deviceId) == null &&
-                        !isConnectPoint(deviceId, pkt.receivedFrom().port())) {
-                    log.error("Device not registered in netcfg :" + deviceId.toString());
-                    return;
-                }
-                IGMP igmp = (IGMP) ipv4Pkt.getPayload();
-                switch (igmp.getIgmpType()) {
-                    case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
-                        //Discard Query from OLT’s non-uplink port’s
-                        if (!pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
-                            if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
-                                log.info("IGMP Picked up query from connectPoint");
-                                //OK to process packet
-                                processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0),
-                                                             pkt.receivedFrom(),
-                                                             0xff & igmp.getMaxRespField());
-                                break;
-                            } else {
-                                //Not OK to process packet
-                                log.warn("IGMP Picked up query from non-uplink port");
+                    if (!isConnectPoint(deviceId, pkt.receivedFrom().port()) &&
+                            !getSubscriberAndDeviceInformation(deviceId).isPresent()) {
+                        log.error("Device not registered in netcfg :" + deviceId.toString());
+                        return;
+                    }
+
+                    IGMP igmp = (IGMP) ipv4Pkt.getPayload();
+
+                    Optional<PortNumber> deviceUpLinkOpt = getDeviceUplink(deviceId);
+                    PortNumber upLinkPort =  deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
+                    switch (igmp.getIgmpType()) {
+                        case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
+                            //Discard Query from OLT’s non-uplink port’s
+                            if (!pkt.receivedFrom().port().equals(upLinkPort)) {
+                                if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
+                                    log.info("IGMP Picked up query from connectPoint");
+                                    //OK to process packet
+                                    processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0),
+                                            pkt.receivedFrom(),
+                                            0xff & igmp.getMaxRespField());
+                                    break;
+                                } else {
+                                    //Not OK to process packet
+                                    log.warn("IGMP Picked up query from non-uplink port {}", upLinkPort);
+                                    return;
+                                }
+                            }
+
+                            processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
+                                    0xff & igmp.getMaxRespField());
+                            break;
+                        case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
+                            log.debug("IGMP version 1  message types are not currently supported.");
+                            break;
+                        case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
+                        case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
+                        case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
+                            //Discard join/leave from OLT’s uplink port’s
+                            if (pkt.receivedFrom().port().equals(upLinkPort) ||
+                                    isConnectPoint(deviceId, pkt.receivedFrom().port())) {
+                                log.info("IGMP Picked up join/leave from uplink/connectPoint port");
                                 return;
                             }
-                        }
 
-                        processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
-                                         0xff & igmp.getMaxRespField());
-                        break;
-                    case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
-                        log.debug("IGMP version 1  message types are not currently supported.");
-                        break;
-                    case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
-                    case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
-                    case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
-                        //Discard join/leave from OLT’s uplink port’s
-                        if (pkt.receivedFrom().port().equals(getDeviceUplink(deviceId)) ||
-                                isConnectPoint(deviceId, pkt.receivedFrom().port())) {
-                            log.info("IGMP Picked up join/leave from uplink/connectPoint port");
-                            return;
-                        }
-
-                        Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
-                        while (itr.hasNext()) {
-                            IGMPGroup group = itr.next();
-                            if (group instanceof IGMPMembership) {
-                                processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
-                                                  pkt.receivedFrom(), igmp.getIgmpType());
-                            } else if (group instanceof IGMPQuery) {
-                                IGMPMembership mgroup;
-                                mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
-                                mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
-                                                             IGMPMembership.MODE_IS_EXCLUDE :
-                                                             IGMPMembership.MODE_IS_INCLUDE);
-                                processIgmpReport(mgroup, VlanId.vlanId(vlan),
-                                                  pkt.receivedFrom(), igmp.getIgmpType());
+                            Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
+                            while (itr.hasNext()) {
+                                IGMPGroup group = itr.next();
+                                if (group instanceof IGMPMembership) {
+                                    processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
+                                            pkt.receivedFrom(), igmp.getIgmpType());
+                                } else if (group instanceof IGMPQuery) {
+                                    IGMPMembership mgroup;
+                                    mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
+                                    mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
+                                            IGMPMembership.MODE_IS_EXCLUDE :
+                                            IGMPMembership.MODE_IS_INCLUDE);
+                                    processIgmpReport(mgroup, VlanId.vlanId(vlan),
+                                            pkt.receivedFrom(), igmp.getIgmpType());
+                                }
                             }
-                        }
-                        break;
+                            break;
 
-                    default:
-                        log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
-                        break;
+                        default:
+                            log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
+                            break;
+                    }
+
+                } catch (Exception ex) {
+                    // Log through SLF4J (stack trace included) instead of printing to stderr.
+                    log.error("igmp process error", ex);
                 }
             });
         }
@@ -589,12 +593,40 @@
 
     }
 
-    public static PortNumber getDeviceUplink(DeviceId devId) {
-        if (oltData.get(devId) != null) {
-            return oltData.get(devId).uplink();
-        } else {
-            return null;
+    public Optional<PortNumber> getDeviceUplink(DeviceId devId) {
+        Device device = deviceService.getDevice(devId);
+        if (device == null || device.serialNumber() == null) {
+            return Optional.empty();
         }
+        Optional<SubscriberAndDeviceInformation> olt = getSubscriberAndDeviceInformation(device.serialNumber());
+
+        if (!olt.isPresent()) {
+            return Optional.empty();
+        }
+        PortNumber portNumber = PortNumber.portNumber(olt.get().uplinkPort());
+        return validateUpLinkPort(device.id(), portNumber) ?
+                    Optional.of(portNumber) : Optional.empty();
+    }
+
+    /**
+     * Checks whether the given port is known to ONOS and is named as an NNI port.
+     * @param deviceId device id
+     * @param portNumber port number
+     * @return true if the port name starts with NNI_PREFIX; false otherwise (also when the port is unknown).
+     */
+    public boolean validateUpLinkPort(DeviceId deviceId, PortNumber portNumber) {
+        Port port = deviceService.getPort(deviceId, portNumber);
+        if (port == null) {
+            //port is not discovered by ONOS; so cannot validate it.
+            return false;
+        }
+        String portName = port.annotations().value(AnnotationKeys.PORT_NAME);
+        boolean isValid = portName != null && portName.startsWith(NNI_PREFIX);
+        if (!isValid) {
+            log.warn("Port cannot be validated; it is not configured as an NNI port. " +
+                    "Device/port: {}/{}", deviceId, portNumber);
+        }
+        return isValid;
     }
 
     public static boolean isIgmpOnPodBasis() {
@@ -648,8 +680,45 @@
     }
 
     private boolean isUplink(DeviceId device, PortNumber port) {
-        return ((!connectPointMode) && oltData.containsKey(device)
-                && oltData.get(device).uplink().equals(port));
+        if (connectPointMode) {
+            return false;
+        }
+        Optional<PortNumber> upLinkPort = getDeviceUplink(device);
+        return upLinkPort.isPresent() && upLinkPort.get().equals(port);
+    }
+
+    /**
+     * Fetches the SADIS entry stored under the given device serial number.
+     *
+     * @param serialNumber serial number of a device
+     * @return the SADIS entry if one is found; an empty Optional otherwise.
+     */
+    private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(String serialNumber) {
+        long start = System.currentTimeMillis();
+        try {
+            return Optional.ofNullable(sadisService.getSubscriberInfoService().get(serialNumber));
+        } finally {
+            if (log.isDebugEnabled()) {
+                // SADIS can call remote systems to fetch device data and these calls can take a long time.
+                // This measurement is just for monitoring these kinds of situations.
+                log.debug("Device fetched from SADIS. Elapsed {} msec", System.currentTimeMillis() - start);
+            }
+
+        }
+    }
+
+    /**
+     * Fetches the SADIS entry of a device, looked up by the serial number ONOS holds for that device id.
+     *
+     * @param deviceId device id
+     * @return the SADIS entry; an empty Optional if the device, its serial number or its SADIS entry is missing.
+     */
+    private Optional<SubscriberAndDeviceInformation> getSubscriberAndDeviceInformation(DeviceId deviceId) {
+        Device device = deviceService.getDevice(deviceId);
+        if (device == null || device.serialNumber() == null) {
+            return Optional.empty();
+        }
+        return getSubscriberAndDeviceInformation(device.serialNumber());
     }
 
     private class InternalDeviceListener implements DeviceListener {
@@ -657,7 +726,7 @@
         public void event(DeviceEvent event) {
             DeviceId devId = event.subject().id();
             Port p = event.port();
-            if (oltData.get(devId) == null &&
+            if (!getSubscriberAndDeviceInformation(devId).isPresent() &&
                     !(p != null && isConnectPoint(devId, p.number()))) {
                 return;
             }
@@ -674,7 +743,8 @@
                     break;
                 case PORT_ADDED:
                     port = p.number();
-                    if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+                    if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+                        !isUplink(devId, port) && !isConnectPoint(devId, port)) {
                         processFilterObjective(devId, port, false);
                     } else if (isUplink(devId, port)) {
                         provisionUplinkFlows();
@@ -684,7 +754,8 @@
                     break;
                 case PORT_UPDATED:
                     port = p.number();
-                    if (oltData.containsKey(devId) && !isUplink(devId, port) && !isConnectPoint(devId, port)) {
+                    if (getSubscriberAndDeviceInformation(devId).isPresent() &&
+                        !isUplink(devId, port) && !isConnectPoint(devId, port)) {
                         if (event.port().isEnabled()) {
                             processFilterObjective(devId, port, false);
                         } else {
@@ -788,15 +859,6 @@
             switch (event.type()) {
                 case CONFIG_ADDED:
                 case CONFIG_UPDATED:
-                    if (event.configClass().equals(CONFIG_CLASS)) {
-                        AccessDeviceConfig config =
-                                networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
-                        if (config != null) {
-                            oltData.put(config.getAccessDevice().deviceId(), config.getAccessDevice());
-                            provisionDefaultFlows((DeviceId) event.subject());
-                            provisionUplinkFlows((DeviceId) event.subject());
-                        }
-                    }
 
                     if (event.configClass().equals(IGMPPROXY_CONFIG_CLASS)) {
                         IgmpproxyConfig config = networkConfig.getConfig(appId, IGMPPROXY_CONFIG_CLASS);
@@ -826,43 +888,46 @@
                     break;
                 case CONFIG_REGISTERED:
                 case CONFIG_UNREGISTERED:
-                    break;
                 case CONFIG_REMOVED:
-                    if (event.configClass().equals(CONFIG_CLASS)) {
-                        oltData.remove(event.subject());
-                    }
-
                 default:
                     break;
             }
         }
     }
 
-    private void provisionDefaultFlows(DeviceId deviceId) {
-        List<Port> ports = deviceService.getPorts(deviceId);
-        ports.stream()
-                .filter(p -> (!oltData.get(p.element().id()).uplink().equals(p.number()) && p.isEnabled()))
-                .forEach(p -> processFilterObjective((DeviceId) p.element().id(), p.number(), false));
-    }
-
     private void provisionUplinkFlows(DeviceId deviceId) {
         if (connectPointMode) {
             return;
         }
 
-        processFilterObjective(deviceId, oltData.get(deviceId).uplink(), false);
+        Optional<PortNumber> upLink = getDeviceUplink(deviceId);
+        if (upLink.isPresent()) {
+            processFilterObjective(deviceId, upLink.get(), false);
+        }
     }
 
     private void provisionUplinkFlows() {
         if (connectPointMode) {
             return;
         }
-
-        oltData.keySet().forEach(deviceId -> provisionUplinkFlows(deviceId));
+        deviceService.getAvailableDevices().forEach(device -> {
+            Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
+            if (accessDevice.isPresent()) {
+                provisionUplinkFlows(device.id());
+            }
+        });
     }
+
     private void unprovisionUplinkFlows() {
-        oltData.keySet().forEach(deviceId ->
-                processFilterObjective(deviceId, oltData.get(deviceId).uplink(), true));
+        deviceService.getAvailableDevices().forEach(device -> {
+            Optional<SubscriberAndDeviceInformation> accessDevices = getSubscriberAndDeviceInformation(device.id());
+            if (accessDevices.isPresent()) {
+                Optional<PortNumber> upLink = getDeviceUplink(device.id());
+                if (upLink.isPresent()) {
+                    processFilterObjective(device.id(), upLink.get(), true);
+                }
+            }
+        });
     }
 
     private void provisionConnectPointFlows() {
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpSender.java b/src/main/java/org/opencord/igmpproxy/IgmpSender.java
index f880646..b455cb4 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpSender.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpSender.java
@@ -195,12 +195,11 @@
         return mac;
     }
 
-    public void sendIgmpPacketUplink(Ethernet ethPkt, DeviceId deviceId) {
+    public void sendIgmpPacketUplink(Ethernet ethPkt, DeviceId deviceId, PortNumber upLinkPort) {
         if (!mastershipService.isLocalMaster(deviceId)) {
             return;
         }
 
-
         if (IgmpManager.connectPointMode) {
             if (IgmpManager.connectPoint == null) {
                 log.warn("cannot find a connectPoint to send the packet uplink");
@@ -208,8 +207,7 @@
             }
             sendIgmpPacket(ethPkt, IgmpManager.connectPoint.deviceId(), IgmpManager.connectPoint.port());
         } else {
-            PortNumber upLink = IgmpManager.getDeviceUplink(deviceId);
-            sendIgmpPacket(ethPkt, deviceId, upLink);
+            sendIgmpPacket(ethPkt, deviceId, upLinkPort);
         }
     }
 
diff --git a/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
index c5aaee7..c2b32a4 100644
--- a/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
+++ b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
@@ -18,6 +18,7 @@
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.Ip4Address;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
 
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -42,6 +43,7 @@
     private DeviceId devId;
     private Ip4Address groupIp;
     private Ip4Address srcIp;
+    private PortNumber upLinkPort;
 
     private AtomicInteger count = new AtomicInteger(DEFAULT_COUNT);
     private int timerId = IgmpTimer.INVALID_TIMER_ID;
@@ -61,10 +63,11 @@
             {nonTransition, delayTransition, idleTransition};
     private int currentState = STATE_NON;
 
-    public SingleStateMachine(DeviceId devId, Ip4Address groupIp, Ip4Address src) {
+    public SingleStateMachine(DeviceId devId, Ip4Address groupIp, Ip4Address src, PortNumber upLinkPort) {
         this.devId = devId;
         this.groupIp = groupIp;
         this.srcIp = src;
+        this.upLinkPort = upLinkPort;
     }
 
     public Ip4Address getGroupIp() {
@@ -137,7 +140,7 @@
         public void leave(boolean messageOutAllowed) {
             if (messageOutAllowed) {
                 Ethernet eth = IgmpSender.getInstance().buildIgmpV3Leave(groupIp, srcIp);
-                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
             }
         }
 
@@ -153,7 +156,7 @@
         public void join(boolean messageOutAllowed) {
             if (messageOutAllowed) {
                 Ethernet eth = IgmpSender.getInstance().buildIgmpV3Join(groupIp, srcIp);
-                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
                 timeOut = getTimeOut(IgmpManager.getUnsolicitedTimeout());
                 timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
             }
@@ -171,7 +174,7 @@
         public void timeOut() {
             if (sendQuery) {
                 Ethernet eth = IgmpSender.getInstance().buildIgmpV3ResponseQuery(groupIp, srcIp);
-                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId, upLinkPort);
                 timeOut = DEFAULT_MAX_RESP;
             }
         }
diff --git a/src/main/java/org/opencord/igmpproxy/StateMachine.java b/src/main/java/org/opencord/igmpproxy/StateMachine.java
index 391bdee..1097560 100644
--- a/src/main/java/org/opencord/igmpproxy/StateMachine.java
+++ b/src/main/java/org/opencord/igmpproxy/StateMachine.java
@@ -18,6 +18,8 @@
 import com.google.common.collect.Maps;
 import org.onlab.packet.Ip4Address;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
 import java.util.Map;
 import java.util.Set;
 
@@ -52,11 +54,11 @@
         map.remove(getId(devId, groupIp));
     }
 
-    public static boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP) {
+    public static boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP, PortNumber upLinkPort) {
         SingleStateMachine machine = get(devId, groupIp);
 
         if (null == machine) {
-            machine = new SingleStateMachine(devId, groupIp, srcIP);
+            machine = new SingleStateMachine(devId, groupIp, srcIP, upLinkPort);
             map.put(getId(devId, groupIp), machine);
 
             boolean shouldSendJoin = true;
diff --git a/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java b/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java
index fa4e504..43cc24a 100644
--- a/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java
+++ b/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java
@@ -15,33 +15,23 @@
  */
 package org.opencord.igmpproxy;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-
+import com.google.common.collect.ImmutableSet;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.Ip4Address;
 import org.onlab.packet.IpAddress;
-import org.onlab.packet.VlanId;
+import org.onlab.packet.MacAddress;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.mastership.MastershipServiceAdapter;
 import org.onosproject.mcast.api.McastListener;
 import org.onosproject.mcast.api.McastRoute;
 import org.onosproject.mcast.api.McastRouteData;
 import org.onosproject.mcast.api.MulticastRouteService;
-import org.onosproject.net.config.ConfigFactory;
-import org.onosproject.net.config.NetworkConfigRegistryAdapter;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.Annotations;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.DefaultPort;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.HostId;
@@ -49,6 +39,8 @@
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.SparseAnnotations;
 import org.onosproject.net.config.Config;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigRegistryAdapter;
 import org.onosproject.net.config.basics.McastConfig;
 import org.onosproject.net.config.basics.SubjectFactories;
 import org.onosproject.net.device.DeviceServiceAdapter;
@@ -59,13 +51,18 @@
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketServiceAdapter;
-import org.opencord.cordconfig.access.AccessDeviceConfig;
-import org.opencord.cordconfig.access.AccessDeviceData;
+import org.opencord.sadis.BandwidthProfileInformation;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
 
 public class IgmpManagerBase {
 
@@ -85,13 +82,18 @@
     // Uplink ports for two olts A and B
     protected static final PortNumber PORT_A = PortNumber.portNumber(1);
     protected static final PortNumber PORT_B = PortNumber.portNumber(2);
+    protected static final PortNumber PORT_NNI = PortNumber.portNumber(65536);
 
     // Connect Point mode for two olts
     protected static final ConnectPoint CONNECT_POINT_A = new ConnectPoint(DEVICE_ID_OF_A, PORT_A);
     protected static final ConnectPoint CONNECT_POINT_B = new ConnectPoint(DEVICE_ID_OF_B, PORT_B);
 
-    // setOfDevices which will store device id of two olts
-    protected Set<DeviceId> setOfDevices = new HashSet<DeviceId>(Arrays.asList(DEVICE_ID_OF_A, DEVICE_ID_OF_B));
+    protected static final String CLIENT_NAS_PORT_ID = "PON 1/1";
+    protected static final String CLIENT_CIRCUIT_ID = "CIR-PON 1/1";
+    protected String dsBpId = "HSIA-DS";
+    // Port name annotation put on the mocked NNI port returned by getPort()
+    private static final String NNI_PREFIX = "nni";
+
     protected List<Port> lsPorts = new ArrayList<Port>();
     // Flag for adding two different devices in oltData
     protected boolean flagForDevice = true;
@@ -128,9 +130,19 @@
         public List<Port> getPorts(DeviceId deviceId) {
             return lsPorts;
         }
+
+        @Override
+        public Port getPort(DeviceId deviceId, PortNumber portNumber) {
+            // Only PORT_NNI is mocked: an enabled port whose name annotation is NNI_PREFIX.
+            if (portNumber.equals(PORT_NNI)) {
+                DefaultAnnotations nniAnnotations = DefaultAnnotations.builder()
+                        .set(AnnotationKeys.PORT_NAME, NNI_PREFIX).build();
+                return new DefaultPort(null, portNumber, true, nniAnnotations);
+            }
+            return super.getPort(deviceId, portNumber);
+        }
     }
 
-    static final Class<AccessDeviceConfig> CONFIG_CLASS = AccessDeviceConfig.class;
     static final Class<IgmpproxyConfig> IGMPPROXY_CONFIG_CLASS = IgmpproxyConfig.class;
     static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS = IgmpproxySsmTranslateConfig.class;
     static final Class<McastConfig> MCAST_CONFIG_CLASS = McastConfig.class;
@@ -177,48 +189,6 @@
     }
 
 
-    static class MockAccessDeviceConfig extends AccessDeviceConfig {
-
-        public MockAccessDeviceConfig() {
-            super();
-        }
-
-        public MockAccessDeviceConfig(DeviceId id) {
-            super();
-            subject = id;
-        }
-
-        @Override
-        public AccessDeviceData getAccessDevice() {
-            PortNumber uplink = PortNumber.portNumber(3);
-            VlanId vlan = VlanId.vlanId((short) 0);
-            ObjectMapper mapper = new ObjectMapper();
-            JsonNode defaultVlanNode = null;
-            try {
-                  defaultVlanNode = (JsonNode) mapper.readTree("{\"driver\":\"pmc-olt\" , \"type \" : \"OLT\"}");
-            } catch (IOException e) {
-                  e.printStackTrace();
-            }
-
-            Optional<VlanId> defaultVlan;
-            if (defaultVlanNode.isMissingNode()) {
-                defaultVlan = Optional.empty();
-            } else {
-                defaultVlan = Optional.of(VlanId.vlanId(defaultVlanNode.shortValue()));
-            }
-            return new AccessDeviceData(subject, uplink, vlan, defaultVlan);
-        }
-    }
-
-    ConfigFactory<DeviceId, AccessDeviceConfig> cf =
-            new ConfigFactory<DeviceId, AccessDeviceConfig>(
-               SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
-        @Override
-        public AccessDeviceConfig createConfig() {
-            return new MockAccessDeviceConfig();
-        }
-     };
-
      class TestNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
          Boolean igmpOnPodFlag = false;
          TestNetworkConfigRegistry(Boolean igmpFlag) {
@@ -226,36 +196,24 @@
          }
         @SuppressWarnings("unchecked")
         @Override
-        public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
-            if (configClass.getName().equalsIgnoreCase("org.opencord.igmpproxy.IgmpproxyConfig")) {
-                IgmpproxyConfig igmpproxyConfig = new MockIgmpProxyConfig(igmpOnPodFlag);
-                return (C) igmpproxyConfig;
-            } else if (configClass.getName().equalsIgnoreCase("org.opencord.cordconfig.access.AccessDeviceConfig")) {
-
-                if (subject.toString().equals(DEVICE_ID_OF_A.toString())) {
-                    AccessDeviceConfig accessDeviceConfig = new MockAccessDeviceConfig(DEVICE_ID_OF_A);
-                return (C) accessDeviceConfig;
-                } else {
-                    AccessDeviceConfig accessDeviceConfig = new MockAccessDeviceConfig(DEVICE_ID_OF_B);
-                    return (C) accessDeviceConfig;
-                }
-            } else {
-                super.getConfig(subject, configClass);
+        public <S> Set<S> getSubjects(Class<S> subjectClass) {
+            if (subjectClass.getName().equalsIgnoreCase("org.onosproject.net.DeviceId")) {
+                return (Set<S>) ImmutableSet.of(DEVICE_ID_OF_A, DEVICE_ID_OF_B);
             }
             return null;
        }
 
-        @SuppressWarnings("unchecked")
-        @Override
-        public <S, C extends Config<S>> ConfigFactory<S, C> getConfigFactory(Class<C> configClass) {
-            return (ConfigFactory<S, C>) cf;
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public <S, C extends Config<S>> Set<S> getSubjects(Class<S> subjectClass, Class<C> configClass) {
-            return (Set<S>) setOfDevices;
-        }
+         @SuppressWarnings("unchecked")
+         @Override
+         public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
+             if (configClass.getName().equalsIgnoreCase("org.opencord.igmpproxy.IgmpproxyConfig")) {
+                 // The only config mocked here, built from this registry's igmpOnPodFlag.
+                 IgmpproxyConfig igmpproxyConfig = new MockIgmpProxyConfig(igmpOnPodFlag);
+                 return (C) igmpproxyConfig;
+             }
+             // Return the adapter's answer for other classes instead of discarding it.
+             return super.getConfig(subject, configClass);
+         }
     }
 
 
@@ -443,4 +401,87 @@
         }
    }
 
+    protected class MockSadisService implements SadisService {
+        // Sadis test double: every call below hands out a newly created mock service.
+        @Override
+        public BaseInformationService<SubscriberAndDeviceInformation> getSubscriberInfoService() {
+            return new MockSubService();
+        }
+
+        @Override
+        public BaseInformationService<BandwidthProfileInformation> getBandwidthProfileService() {
+            return new MockBpService();
+        }
+    }
+
+    private class MockBpService implements BaseInformationService<BandwidthProfileInformation> {
+        // Test double: only the profile whose id equals dsBpId is known.
+        @Override
+        public void invalidateAll() {
+            // No-op: this mock holds no state to invalidate.
+        }
+
+        @Override
+        public void invalidateId(String id) {
+            // No-op: this mock holds no state to invalidate.
+        }
+
+        @Override
+        public BandwidthProfileInformation get(String id) {
+            if (!id.equals(dsBpId)) {
+                return null;
+            }
+            BandwidthProfileInformation profile = new BandwidthProfileInformation();
+            profile.setAssuredInformationRate(0);
+            profile.setCommittedInformationRate(10000);
+            profile.setCommittedBurstSize(1000L);
+            profile.setExceededBurstSize(2000L);
+            profile.setExceededInformationRate(20000);
+            return profile;
+        }
+
+        @Override
+        public BandwidthProfileInformation getfromCache(String id) {
+            return null;
+        }
+    }
+
+    private class MockSubService implements BaseInformationService<SubscriberAndDeviceInformation> {
+        // Single canned entry returned by get() for every id; its id and NAS port id are the same value.
+        MockSubscriberAndDeviceInformation sub = new MockSubscriberAndDeviceInformation(
+                CLIENT_NAS_PORT_ID, CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null);
+
+        @Override
+        public SubscriberAndDeviceInformation get(String id) {
+            return sub;
+        }
+
+        @Override
+        public SubscriberAndDeviceInformation getfromCache(String id) {
+            return null;
+        }
+
+        @Override
+        public void invalidateAll() {
+        }
+
+        @Override
+        public void invalidateId(String id) {
+        }
+    }
+
+    private class MockSubscriberAndDeviceInformation extends SubscriberAndDeviceInformation {
+        // Sadis entry populated from the given values; its uplink port is always PORT_NNI.
+        MockSubscriberAndDeviceInformation(String id, String nasPortId, String circuitId,
+                                           MacAddress hardId, Ip4Address ipAddress) {
+            setHardwareIdentifier(hardId);
+            setId(id);
+            setIPAddress(ipAddress);
+            setNasPortId(nasPortId);
+            setCircuitId(circuitId);
+            // toLong() yields a long; PORT_NNI (65536) is narrowed to int for the setter.
+            setUplinkPort((int) PORT_NNI.toLong());
+        }
+    }
+
 }
diff --git a/src/test/java/org/opencord/igmpproxy/IgmpManagerTest.java b/src/test/java/org/opencord/igmpproxy/IgmpManagerTest.java
index 62ab4c0..21a83fe 100644
--- a/src/test/java/org/opencord/igmpproxy/IgmpManagerTest.java
+++ b/src/test/java/org/opencord/igmpproxy/IgmpManagerTest.java
@@ -42,6 +42,7 @@
         igmpManager.packetService = new MockPacketService();
         igmpManager.flowRuleService = new FlowRuleServiceAdapter();
         igmpManager.multicastService = new TestMulticastRouteService();
+        igmpManager.sadisService = new MockSadisService();
         // By default - we send query messages
         SingleStateMachine.sendQuery = true;
     }
@@ -127,5 +128,4 @@
         }
         assertEquals(1, savedPackets.size());
     }
-
 }