Merge pull request #25 from cgaonker/master

Test Plan Doc
diff --git a/src/test/apps/ciena-cordigmp-multitable-1.0-SNAPSHOT.oar b/src/test/apps/ciena-cordigmp-multitable-1.0-SNAPSHOT.oar
new file mode 100644
index 0000000..5d5c39b
--- /dev/null
+++ b/src/test/apps/ciena-cordigmp-multitable-1.0-SNAPSHOT.oar
Binary files differ
diff --git a/src/test/apps/ciena-cordigmp.multi-table/pom.xml b/src/test/apps/ciena-cordigmp.multi-table/pom.xml
new file mode 100644
index 0000000..4c49010
--- /dev/null
+++ b/src/test/apps/ciena-cordigmp.multi-table/pom.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0" encoding="UTF-8"?>

+<!--

+  ~ Copyright 2016 Open Networking Laboratory

+  ~

+  ~ Licensed under the Apache License, Version 2.0 (the "License");

+  ~ you may not use this file except in compliance with the License.

+  ~ You may obtain a copy of the License at

+  ~

+  ~     http://www.apache.org/licenses/LICENSE-2.0

+  ~

+  ~ Unless required by applicable law or agreed to in writing, software

+  ~ distributed under the License is distributed on an "AS IS" BASIS,

+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+  ~ See the License for the specific language governing permissions and

+  ~ limitations under the License.

+  -->

+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

+    <modelVersion>4.0.0</modelVersion>

+

+    <groupId>org.ciena.cordigmp</groupId>

+    <artifactId>ciena-cordigmp</artifactId>

+    <version>1.0-SNAPSHOT</version>

+    <packaging>bundle</packaging>

+

+    <description>Ciena CORD IGMP for OVS</description>

+    <url>http://onosproject.org</url>

+

+    <properties>

+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

+        <onos.version>1.6.0-SNAPSHOT</onos.version>

+        <onos.app.name>org.ciena.cordigmp</onos.app.name>

+        <onos.app.title>Ciena IGMP for OVS</onos.app.title>

+        <onos.app.origin>Ciena Inc.</onos.app.origin>

+        <onos.app.category>default</onos.app.category>

+        <onos.app.url>http://onosproject.org</onos.app.url>

+        <onos.app.readme>ONOS OSGi bundle archetype.</onos.app.readme>

+    </properties>

+

+    <dependencies>

+        <dependency>

+            <groupId>org.onosproject</groupId>

+            <artifactId>onos-api</artifactId>

+            <version>${onos.version}</version>

+        </dependency>

+

+        <dependency>

+            <groupId>org.onosproject</groupId>

+            <artifactId>onos-cord-config</artifactId>

+            <version>${onos.version}</version>

+        </dependency>

+

+        <dependency>

+            <groupId>org.onosproject</groupId>

+            <artifactId>onlab-osgi</artifactId>

+            <version>${onos.version}</version>

+        </dependency>

+

+        <dependency>

+            <groupId>junit</groupId>

+            <artifactId>junit</artifactId>

+            <version>4.12</version>

+            <scope>test</scope>

+        </dependency>

+

+        <dependency>

+            <groupId>org.onosproject</groupId>

+            <artifactId>onos-api</artifactId>

+            <version>${onos.version}</version>

+            <scope>test</scope>

+            <classifier>tests</classifier>

+        </dependency>

+

+        <dependency>

+            <groupId>org.apache.felix</groupId>

+            <artifactId>org.apache.felix.scr.annotations</artifactId>

+            <version>1.9.12</version>

+            <scope>provided</scope>

+        </dependency>

+

+        <dependency>

+            <groupId>org.onosproject</groupId>

+            <artifactId>onos-cli</artifactId>

+            <version>${onos.version}</version>

+        </dependency>

+      <dependency>

+            <groupId>org.osgi</groupId>

+            <artifactId>org.osgi.compendium</artifactId>

+            <version>5.0.0</version>

+        </dependency>

+      <dependency>

+        <groupId>com.google.guava</groupId>

+        <artifactId>guava</artifactId>

+        <version>19.0</version>

+      </dependency>

+    </dependencies>

+

+    <build>

+        <plugins>

+            <plugin>

+                <groupId>org.apache.felix</groupId>

+                <artifactId>maven-bundle-plugin</artifactId>

+                <version>3.0.1</version>

+                <extensions>true</extensions>

+            </plugin>

+            <plugin>

+                <groupId>org.apache.maven.plugins</groupId>

+                <artifactId>maven-compiler-plugin</artifactId>

+                <version>2.5.1</version>

+                <configuration>

+                    <source>1.8</source>

+                    <target>1.8</target>

+                </configuration>

+            </plugin>

+            <plugin>

+                <groupId>org.apache.felix</groupId>

+                <artifactId>maven-scr-plugin</artifactId>

+                <version>1.21.0</version>

+                <executions>

+                    <execution>

+                        <id>generate-scr-srcdescriptor</id>

+                        <goals>

+                            <goal>scr</goal>

+                        </goals>

+                    </execution>

+                </executions>

+                <configuration>

+                    <supportedProjectTypes>

+                        <supportedProjectType>bundle</supportedProjectType>

+                        <supportedProjectType>war</supportedProjectType>

+                    </supportedProjectTypes>

+                </configuration>

+            </plugin>

+            <plugin>

+                <groupId>org.onosproject</groupId>

+                <artifactId>onos-maven-plugin</artifactId>

+                <version>1.9</version>

+                <executions>

+                    <execution>

+                        <id>cfg</id>

+                        <phase>generate-resources</phase>

+                        <goals>

+                            <goal>cfg</goal>

+                        </goals>

+                    </execution>

+                    <execution>

+                        <id>swagger</id>

+                        <phase>generate-sources</phase>

+                        <goals>

+                            <goal>swagger</goal>

+                        </goals>

+                    </execution>

+                    <execution>

+                        <id>app</id>

+                        <phase>package</phase>

+                        <goals>

+                            <goal>app</goal>

+                        </goals>

+                    </execution>

+                </executions>

+            </plugin>

+        </plugins>

+    </build>

+

+</project>

diff --git a/src/test/apps/ciena-cordigmp.multi-table/src/main/java/org/ciena/cordigmp/CordIgmp.java b/src/test/apps/ciena-cordigmp.multi-table/src/main/java/org/ciena/cordigmp/CordIgmp.java
new file mode 100644
index 0000000..5b252d2
--- /dev/null
+++ b/src/test/apps/ciena-cordigmp.multi-table/src/main/java/org/ciena/cordigmp/CordIgmp.java
@@ -0,0 +1,418 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.ciena.cordigmp;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.VlanId;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cordconfig.access.AccessDeviceConfig;
+import org.onosproject.cordconfig.access.AccessDeviceData;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.config.basics.SubjectFactories;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.DefaultNextObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.mcast.McastEvent;
+import org.onosproject.net.mcast.McastListener;
+import org.onosproject.net.mcast.McastRoute;
+import org.onosproject.net.mcast.McastRouteInfo;
+import org.onosproject.net.mcast.MulticastRouteService;
+
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * CORD multicast provisioning application. Operates by listening to
+ * events on the multicast rib and provisioning groups to program multicast
+ * flows on the dataplane.
+ */
+@Component(immediate = true)
+public class CordIgmp {
+
+
+    private static final int DEFAULT_PRIORITY = 500;
+    private static final short DEFAULT_MCAST_VLAN = 4000;
+    private static final boolean DEFAULT_VLAN_ENABLED = false;
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MulticastRouteService mcastService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected FlowObjectiveService flowObjectiveService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ComponentConfigService componentConfigService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected NetworkConfigRegistry networkConfig;
+
+    protected McastListener listener = new InternalMulticastListener();
+    private InternalNetworkConfigListener configListener =
+            new InternalNetworkConfigListener();
+
+    //TODO: move this to a ec map
+    private Map<IpAddress, Integer> groups = Maps.newConcurrentMap();
+
+    private ApplicationId appId;
+
+    @Property(name = "mcastVlan", intValue = DEFAULT_MCAST_VLAN,
+            label = "VLAN for multicast traffic")
+    private int mcastVlan = DEFAULT_MCAST_VLAN;
+
+    @Property(name = "vlanEnabled", boolValue = DEFAULT_VLAN_ENABLED,
+            label = "Use vlan for multicast traffic?")
+    private boolean vlanEnabled = DEFAULT_VLAN_ENABLED;
+
+    @Property(name = "priority", intValue = DEFAULT_PRIORITY,
+            label = "Priority for multicast rules")
+    private int priority = DEFAULT_PRIORITY;
+
+    private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
+
+    private static final Class<AccessDeviceConfig> CONFIG_CLASS =
+            AccessDeviceConfig.class;
+
+    private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory =
+            new ConfigFactory<DeviceId, AccessDeviceConfig>(
+                    SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
+                @Override
+                public AccessDeviceConfig createConfig() {
+                    return new AccessDeviceConfig();
+                }
+            };
+
+    @Activate
+    public void activate(ComponentContext context) {
+        componentConfigService.registerProperties(getClass());
+        modified(context);
+
+        appId = coreService.registerApplication("org.ciena.cordigmp");
+
+
+        networkConfig.registerConfigFactory(configFactory);
+        networkConfig.addListener(configListener);
+
+        networkConfig.getSubjects(DeviceId.class, AccessDeviceConfig.class).forEach(
+                subject -> {
+                    AccessDeviceConfig config = networkConfig.getConfig(subject, AccessDeviceConfig.class);
+                    if (config != null) {
+                        AccessDeviceData data = config.getOlt();
+                        oltData.put(data.deviceId(), data);
+                    }
+                }
+        );
+
+
+        mcastService.addListener(listener);
+
+        mcastService.getRoutes().stream()
+                .map(r -> new ImmutablePair<>(r, mcastService.fetchSinks(r)))
+                .filter(pair -> pair.getRight() != null && !pair.getRight().isEmpty())
+                .forEach(pair -> pair.getRight().forEach(sink -> provisionGroup(pair.getLeft(),
+                        sink)));
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        componentConfigService.unregisterProperties(getClass(), false);
+        mcastService.removeListener(listener);
+        networkConfig.unregisterConfigFactory(configFactory);
+        networkConfig.removeListener(configListener);
+        log.info("Stopped");
+    }
+
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+        try {
+            
+            String s = get(properties, "mcastVlan");
+            mcastVlan = isNullOrEmpty(s) ? DEFAULT_MCAST_VLAN : Short.parseShort(s.trim());
+
+            s = get(properties, "vlanEnabled");
+            vlanEnabled = isNullOrEmpty(s) ? DEFAULT_VLAN_ENABLED : Boolean.parseBoolean(s.trim());
+
+            s = get(properties, "priority");
+            priority = isNullOrEmpty(s) ? DEFAULT_PRIORITY : Integer.parseInt(s.trim());
+
+        } catch (Exception e) {
+            mcastVlan = DEFAULT_MCAST_VLAN;
+            vlanEnabled = false;
+            priority = DEFAULT_PRIORITY;
+        }
+    }
+
+    private class InternalMulticastListener implements McastListener {
+        @Override
+        public void event(McastEvent event) {
+            McastRouteInfo info = event.subject();
+            switch (event.type()) {
+                case ROUTE_ADDED:
+                    break;
+                case ROUTE_REMOVED:
+                    break;
+                case SOURCE_ADDED:
+                    break;
+                case SINK_ADDED:
+                    if (!info.sink().isPresent()) {
+                        log.warn("No sink given after sink added event: {}", info);
+                        return;
+                    }
+                    provisionGroup(info.route(), info.sink().get());
+                    break;
+                case SINK_REMOVED:
+                    unprovisionGroup(event.subject());
+                    break;
+                default:
+                    log.warn("Unknown mcast event {}", event.type());
+            }
+        }
+    }
+
+    private void unprovisionGroup(McastRouteInfo info) {
+
+        if (!info.sink().isPresent()) {
+            log.warn("No sink given after sink removed event: {}", info);
+            return;
+        }
+        ConnectPoint loc = info.sink().get();
+        TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder()
+            .matchEthType(Ethernet.TYPE_IPV4)
+            .matchIPDst(info.route().group().toIpPrefix());
+        if(vlanEnabled) {
+            metabuilder.matchVlanId(VlanId.vlanId((short)mcastVlan));
+        }
+        NextObjective next = DefaultNextObjective.builder()
+                .fromApp(appId)
+                .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
+                .withType(NextObjective.Type.BROADCAST)
+                .withMeta(metabuilder.build())
+                .withId(groups.get(info.route().group()))
+                .removeFromExisting(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) {
+                        //TODO: change to debug
+                        log.info("Next Objective {} installed", objective.id());
+                    }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        //TODO: change to debug
+                        log.info("Next Objective {} failed, because {}",
+                                objective.id(),
+                                error);
+                    }
+                });
+
+        flowObjectiveService.next(loc.deviceId(), next);
+    }
+
+    private void provisionGroup(McastRoute route, ConnectPoint sink) {
+        checkNotNull(route, "Route cannot be null");
+        checkNotNull(sink, "Sink cannot be null");
+
+        AccessDeviceData oltInfo = oltData.get(sink.deviceId());
+
+        if (oltInfo == null) {
+            log.warn("Unknown OLT device : {}", sink.deviceId());
+            return;
+        }
+
+        final AtomicBoolean sync = new AtomicBoolean(false);
+
+        log.info("Provisioning sink for device {}", sink.deviceId());
+
+        Integer nextId = groups.computeIfAbsent(route.group(), (g) -> {
+            Integer id = flowObjectiveService.allocateNextId();
+            TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder()
+                    .matchEthType(Ethernet.TYPE_IPV4)
+                    .matchIPDst(g.toIpPrefix());
+            if (vlanEnabled) {
+                metabuilder.matchVlanId(VlanId.vlanId((short) mcastVlan));
+            }
+            NextObjective next = DefaultNextObjective.builder()
+                    .fromApp(appId)
+                    .addTreatment(DefaultTrafficTreatment.builder().setOutput(sink.port()).build())
+                    .withType(NextObjective.Type.BROADCAST)
+                    .withMeta(metabuilder.build())
+                    .withId(id)
+                    .add(new ObjectiveContext() {
+                        @Override
+                        public void onSuccess(Objective objective) {
+                            //TODO: change to debug
+                            log.info("Next Objective {} installed", objective.id());
+                        }
+
+                        @Override
+                        public void onError(Objective objective, ObjectiveError error) {
+                            //TODO: change to debug
+                            log.info("Next Objective {} failed, because {}",
+                                    objective.id(),
+                                    error);
+                        }
+                    });
+
+            flowObjectiveService.next(sink.deviceId(), next);
+
+            TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
+                    .matchInPort(oltInfo.uplink())
+                    .matchEthType(Ethernet.TYPE_IPV4)
+                    .matchIPDst(g.toIpPrefix());
+
+            if (vlanEnabled) {
+                mcast.matchVlanId(VlanId.vlanId((short) mcastVlan));
+            }
+
+            ForwardingObjective fwd = DefaultForwardingObjective.builder()
+                    .fromApp(appId)
+                    .nextStep(id)
+                    .makePermanent()
+                    .withFlag(ForwardingObjective.Flag.VERSATILE)
+                    .withPriority(priority)
+                    .withSelector(mcast.build())
+                    .add(new ObjectiveContext() {
+                        @Override
+                        public void onSuccess(Objective objective) {
+                            //TODO: change to debug
+                            log.info("Forwarding objective installed {}", objective);
+                        }
+
+                        @Override
+                        public void onError(Objective objective, ObjectiveError error) {
+                            //TODO: change to debug
+                            log.info("Forwarding objective failed {}", objective);
+                        }
+                    });
+
+            flowObjectiveService.forward(sink.deviceId(), fwd);
+
+            sync.set(true);
+            log.info("Installed flows for device: {}, id {}, ip {}", sink.deviceId(), id, g.toIpPrefix());
+            return id;
+        });
+
+        if (!sync.get()) {
+            TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder()
+                    .matchEthType(Ethernet.TYPE_IPV4)
+                    .matchIPDst(route.group().toIpPrefix());
+            NextObjective next = DefaultNextObjective.builder()
+                    .fromApp(appId)
+                    .addTreatment(DefaultTrafficTreatment.builder().setOutput(sink.port()).build())
+                    .withType(NextObjective.Type.BROADCAST)
+                    .withId(nextId)
+                    .withMeta(metabuilder.build())
+                    .addToExisting(new ObjectiveContext() {
+                        @Override
+                        public void onSuccess(Objective objective) {
+                            //TODO: change to debug
+                            log.info("Next Objective {} installed", objective.id());
+                        }
+
+                        @Override
+                        public void onError(Objective objective, ObjectiveError error) {
+                            //TODO: change to debug
+                            log.info("Next Objective {} failed, because {}",
+                                    objective.id(),
+                                    error);
+                        }
+                    });
+
+            flowObjectiveService.next(sink.deviceId(), next);
+
+            log.info("Append flows for device {}, id {}, ip {}", sink.deviceId(), nextId, 
+                     route.group().toIpPrefix());
+        }
+
+    }
+
+    private class InternalNetworkConfigListener implements NetworkConfigListener {
+        @Override
+        public void event(NetworkConfigEvent event) {
+            switch (event.type()) {
+
+                case CONFIG_ADDED:
+                case CONFIG_UPDATED:
+                    AccessDeviceConfig config =
+                            networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
+                    if (config != null) {
+                        oltData.put(config.getOlt().deviceId(), config.getOlt());
+                    }
+
+                    break;
+                case CONFIG_REGISTERED:
+                case CONFIG_UNREGISTERED:
+                    break;
+                case CONFIG_REMOVED:
+                    oltData.remove(event.subject());
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        @Override
+        public boolean isRelevant(NetworkConfigEvent event) {
+            return event.configClass().equals(CONFIG_CLASS);
+        }
+    }
+}
diff --git a/src/test/setup/cord-test.py b/src/test/setup/cord-test.py
index 2214584..2cdeaa4 100755
--- a/src/test/setup/cord-test.py
+++ b/src/test/setup/cord-test.py
@@ -20,6 +20,7 @@
 sys.path.append(utils_dir)
 from OnosCtrl import OnosCtrl
 from OltConfig import OltConfig
+from threadPool import ThreadPool
 from CordContainer import *
 from CordTestServer import cord_test_server_start, cord_test_server_stop
 
@@ -28,7 +29,7 @@
     sandbox_setup = '/root/test/src/test/setup'
     tester_base = os.path.dirname(os.path.realpath(__file__))
     tester_paths = os.path.realpath(__file__).split(os.path.sep)
-    tester_path_index = tester_paths.index('cord-tester')
+    tester_path_index = tester_paths.index('src') - 1
     sandbox_host = os.path.sep.join(tester_paths[:tester_path_index+1])
 
     host_guest_map = ( (sandbox_host, sandbox),
@@ -39,8 +40,9 @@
     IMAGE = 'cord-test/nose'
     ALL_TESTS = ('tls', 'dhcp', 'igmp', 'subscriber', 'vrouter', 'flows')
 
-    def __init__(self, ctlr_ip = None, image = IMAGE, tag = 'latest',
+    def __init__(self, tests, instance = 0, num_instances = 1, ctlr_ip = None, image = IMAGE, tag = 'latest',
                  env = None, rm = False, update = False):
+        self.tests = tests
         self.ctlr_ip = ctlr_ip
         self.rm = rm
         self.name = self.get_name()
@@ -61,6 +63,10 @@
         else:
             self.olt = False
             self.port_map = None
+        if env is not None:
+            env['TEST_HOST'] = self.name
+            env['TEST_INSTANCE'] = instance
+            env['TEST_INSTANCES'] = num_instances
         print('Starting test container %s, image %s, tag %s' %(self.name, self.image, self.tag))
         self.start(rm = False, volumes = volumes, environment = env, 
                    host_config = host_config, tty = True)
@@ -101,27 +107,24 @@
         if boot_delay:
             time.sleep(boot_delay)
 
-    def setup_intfs(self):
-        if not self.olt:
-            return 0
+    def setup_intfs(self, port_num = 0):
         tester_intf_subnet = '192.168.100'
         res = 0
-        port_num = 0
         host_intf = self.port_map['host']
         start_vlan = self.port_map['start_vlan']
         for port in self.port_map['ports']:
             guest_if = port
-            local_if = guest_if
-            guest_ip = '{0}.{1}/24'.format(tester_intf_subnet, str(port_num+1))
+            local_if = '{0}_{1}'.format(guest_if, port_num+1)
+            guest_ip = '{0}.{1}/24'.format(tester_intf_subnet, port_num+1)
             ##Use pipeworks to configure container interfaces on host/bridge interfaces
             pipework_cmd = 'pipework {0} -i {1} -l {2} {3} {4}'.format(host_intf, guest_if, local_if, self.name, guest_ip)
             if start_vlan != 0:
-                pipework_cmd += ' @{}'.format(str(start_vlan + port_num))
+                pipework_cmd += ' @{}'.format(start_vlan + port_num)
                 
             res += os.system(pipework_cmd)
             port_num += 1
 
-        return res
+        return res, port_num
 
     @classmethod
     def get_name(cls):
@@ -170,14 +173,34 @@
 RUN apt-get -y install arping
 RUN mv /usr/sbin/tcpdump /sbin/
 RUN ln -sf /sbin/tcpdump /usr/sbin/tcpdump
+RUN apt-get install -y git-core autoconf automake autotools-dev pkg-config \
+                        make gcc g++ libtool libc6-dev cmake libpcap-dev libxerces-c2-dev  \
+                        unzip libpcre3-dev flex bison libboost-dev
+WORKDIR /root
+RUN wget -nc http://de.archive.ubuntu.com/ubuntu/pool/main/b/bison/bison_2.5.dfsg-2.1_amd64.deb \
+         http://de.archive.ubuntu.com/ubuntu/pool/main/b/bison/libbison-dev_2.5.dfsg-2.1_amd64.deb
+RUN sudo dpkg -i bison_2.5.dfsg-2.1_amd64.deb libbison-dev_2.5.dfsg-2.1_amd64.deb
+RUN rm bison_2.5.dfsg-2.1_amd64.deb libbison-dev_2.5.dfsg-2.1_amd64.deb
+RUN wget -nc http://www.nbee.org/download/nbeesrc-jan-10-2013.zip && \
+    unzip nbeesrc-jan-10-2013.zip && \
+    cd nbeesrc-jan-10-2013/src && cmake . && make && \
+    cp ../bin/libn*.so /usr/local/lib && ldconfig && \
+    cp -R ../include/* /usr/include/
+WORKDIR /root
+RUN git clone https://github.com/CPqD/ofsoftswitch13.git && \
+    cd ofsoftswitch13 && \
+    ./boot.sh && \
+    ./configure --prefix=/usr --sysconfdir=/etc --localstatedir=/var --disable-ssl && \
+    make && make install
 CMD ["/bin/bash"]
 '''.format(*image_format)
         super(CordTester, cls).build_image(dockerfile, image)
         print('Done building docker image %s' %image)
 
-    def run_tests(self, tests):
+    def run_tests(self):
         '''Run the list of tests'''
-        for t in tests:
+        print('Running tests: %s' %self.tests)
+        for t in self.tests:
             test = t.split(':')[0]
             test_file = '{}Test.py'.format(test)
             if t.find(':') >= 0:
@@ -211,6 +234,9 @@
 def runTest(args):
     #Start the cord test tcp server
     test_server = cord_test_server_start()
+    test_containers = []
+    #These tests end up restarting ONOS/quagga/radius
+    tests_exempt = ('vrouter',)
     if args.test_type.lower() == 'all':
         tests = CordTester.ALL_TESTS
         args.radius = True
@@ -218,6 +244,8 @@
     else:
         tests = args.test_type.split('-')
 
+    tests_parallel = [ t for t in tests if t.split(':')[0] not in tests_exempt ]
+    tests_not_parallel = [ t for t in tests if t.split(':')[0] in tests_exempt ]
     onos_cnt = {'tag':'latest'}
     nose_cnt = {'image': CordTester.IMAGE, 'tag': 'latest'}
     update_map = { 'quagga' : False, 'test' : False, 'radius' : False }
@@ -271,14 +299,48 @@
         olt_conf_test_loc = os.path.join(CordTester.sandbox_setup, 'olt_config.json')
         test_cnt_env['OLT_CONFIG'] = olt_conf_test_loc
 
-    test_cnt = CordTester(ctlr_ip = onos_ip, image = nose_cnt['image'], tag = nose_cnt['tag'],
-                          env = test_cnt_env,
-                          rm = False if args.keep else True,
-                          update = update_map['test'])
-    if args.start_switch or not args.olt:
-        test_cnt.start_switch()
-    test_cnt.setup_intfs()
-    test_cnt.run_tests(tests)
+    port_num = 0
+    num_tests = len(tests_parallel)
+    tests_per_container = max(1, num_tests/args.num_containers)
+    test_slice_start = 0
+    test_slice_end = test_slice_start + tests_per_container
+    num_test_containers = min(num_tests, args.num_containers)
+    if tests_parallel:
+        print('Running %s tests across %d containers in parallel' %(tests_parallel, num_test_containers))
+    for container in range(num_test_containers):
+        test_cnt = CordTester(tests_parallel[test_slice_start:test_slice_end],
+                              instance = container, num_instances = num_test_containers,
+                              ctlr_ip = onos_ip, image = nose_cnt['image'], tag = nose_cnt['tag'],
+                              env = test_cnt_env,
+                              rm = False if args.keep else True,
+                              update = update_map['test'])
+        test_slice_start = test_slice_end
+        test_slice_end = test_slice_start + tests_per_container
+        update_map['test'] = False
+        test_containers.append(test_cnt)
+        if args.start_switch or not args.olt:
+            test_cnt.start_switch()
+        if test_cnt.olt:
+            _, port_num = test_cnt.setup_intfs(port_num = port_num)
+
+    thread_pool = ThreadPool(len(test_containers), queue_size = 1, wait_timeout=1)
+    for test_cnt in test_containers:
+        thread_pool.addTask(test_cnt.run_tests)
+    thread_pool.cleanUpThreads()
+
+    ##Run the linear tests
+    if tests_not_parallel:
+        test_cnt = CordTester(tests_not_parallel,
+                              ctlr_ip = onos_ip, image = nose_cnt['image'], tag = nose_cnt['tag'],
+                              env = test_cnt_env,
+                              rm = False if args.keep else True,
+                              update = update_map['test'])
+        if args.start_switch or not args.olt:
+            test_cnt.start_switch()
+        if test_cnt.olt:
+            test_cnt.setup_intfs(port_num = port_num)
+        test_cnt.run_tests()
+
     cord_test_server_stop(test_server)
 
 def cleanupTests(args):
@@ -322,6 +384,8 @@
                         '    --update=radius to rebuild radius server image.'
                         '    --update=test to rebuild cord test image.(Default)'
                         '    --update=all to rebuild all cord tester images.')
+    parser_run.add_argument('-n', '--num-containers', default=1, type=int,
+                            help='Specify number of test containers to spawn for tests')
     parser_run.set_defaults(func=runTest)
 
     parser_list = subparser.add_parser('list', help='List test cases')
diff --git a/src/test/setup/cpqd.sh b/src/test/setup/cpqd.sh
new file mode 100755
index 0000000..7500fb5
--- /dev/null
+++ b/src/test/setup/cpqd.sh
@@ -0,0 +1,13 @@
+#!/usr/bin/env bash
+dpid=${1:-001122334455}
+num_ports=${2:-200}
+controller=${3:-$ONOS_CONTROLLER_IP}
+num_ports=$(($num_ports-1))
+my_ip=`ifconfig eth0 | grep "inet addr" | tr -s ' ' | cut -d":" -f2 |cut -d" " -f1`
+if_list="veth1"
+for port in $(seq 3 2 $num_ports); do
+    if_list="$if_list"",""veth$port"
+done
+service openvswitch-switch stop
+nohup ofdatapath --no-slicing --datapath-id=$dpid --interfaces=$if_list ptcp:6653 2>&1 >/tmp/nohup.out &
+nohup ofprotocol tcp:$my_ip:6653 tcp:$controller:6633 2>&1 >/tmp/nohup.out &
diff --git a/src/test/setup/of-bridge-local.sh b/src/test/setup/of-bridge-local.sh
index 114725d..8dfc5ad 100755
--- a/src/test/setup/of-bridge-local.sh
+++ b/src/test/setup/of-bridge-local.sh
@@ -7,6 +7,8 @@
 if [ x"$controller" = "x" ]; then
   controller=$ONOS_CONTROLLER_IP
 fi
+pkill -9 ofdatapath
+pkill -9 ofprotocol
 service openvswitch-switch start
 echo "Configuring ovs bridge $bridge"
 ovs-vsctl del-br $bridge
diff --git a/src/test/setup/olt_config_multitable.json b/src/test/setup/olt_config_multitable.json
new file mode 100644
index 0000000..03066a4
--- /dev/null
+++ b/src/test/setup/olt_config_multitable.json
@@ -0,0 +1,2 @@
+{ "olt" : false , "port_map" : { "port" : "veth0", "num_ports" : 100, "host" : "enp0s8", "start_vlan" : 1000 }, "uplink" : 2, "vlan" : 0 }
+  
diff --git a/src/test/subscriberMultiTable/__init__.py b/src/test/subscriberMultiTable/__init__.py
new file mode 100644
index 0000000..374665c
--- /dev/null
+++ b/src/test/subscriberMultiTable/__init__.py
@@ -0,0 +1,24 @@
+# 
+# Copyright 2016-present Ciena Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import os,sys
+##add the python path to lookup the utils
+working_dir = os.path.dirname(os.path.realpath(sys.argv[-1]))
+utils_dir = os.path.join(working_dir, '../utils')
+fsm_dir = os.path.join(working_dir, '../fsm')
+subscriber_dir = os.path.join(working_dir, '../subscriber')
+__path__.append(utils_dir)
+__path__.append(fsm_dir)
+__path__.append(subscriber_dir)
diff --git a/src/test/subscriberMultiTable/subscriberMultiTableTest.py b/src/test/subscriberMultiTable/subscriberMultiTableTest.py
new file mode 100644
index 0000000..1f19848
--- /dev/null
+++ b/src/test/subscriberMultiTable/subscriberMultiTableTest.py
@@ -0,0 +1,461 @@
+import unittest
+from nose.tools import *
+from nose.twistedtools import reactor, deferred
+from twisted.internet import defer
+from scapy.all import *
+import time, monotonic
+import os, sys
+import tempfile
+import random
+import threading
+import json
+from Stats import Stats
+from OnosCtrl import OnosCtrl
+from DHCP import DHCPTest
+from EapTLS import TLSAuthTest
+from Channels import Channels, IgmpChannel
+from subscriberDb import SubscriberDB
+from threadPool import ThreadPool
+from portmaps import g_subscriber_port_map 
+from OltConfig import *
+from OnosFlowCtrl import get_mac
+from CordTestServer import cord_test_onos_restart
+
+log.setLevel('INFO')
+
+class Subscriber(Channels):
+      PORT_TX_DEFAULT = 2
+      PORT_RX_DEFAULT = 1
+      INTF_TX_DEFAULT = 'veth2'
+      INTF_RX_DEFAULT = 'veth0'
+      STATS_RX = 0
+      STATS_TX = 1
+      STATS_JOIN = 2
+      STATS_LEAVE = 3
+      SUBSCRIBER_SERVICES = 'DHCP IGMP TLS'
+      def __init__(self, name = 'sub', service = SUBSCRIBER_SERVICES, port_map = None,
+                   num = 1, channel_start = 0,
+                   tx_port = PORT_TX_DEFAULT, rx_port = PORT_RX_DEFAULT,
+                   iface = INTF_RX_DEFAULT, iface_mcast = INTF_TX_DEFAULT,
+                   mcast_cb = None, loginType = 'wireless'):
+            self.tx_port = tx_port
+            self.rx_port = rx_port
+            self.port_map = port_map or g_subscriber_port_map
+            try:
+                  self.tx_intf = self.port_map[tx_port]
+                  self.rx_intf = self.port_map[rx_port]
+            except:
+                  self.tx_intf = self.port_map[self.PORT_TX_DEFAULT]
+                  self.rx_intf = self.port_map[self.PORT_RX_DEFAULT]
+
+            Channels.__init__(self, num, channel_start = channel_start, 
+                              iface = self.rx_intf, iface_mcast = self.tx_intf, mcast_cb = mcast_cb)
+            self.name = name
+            self.service = service
+            self.service_map = {}
+            services = self.service.strip().split(' ')
+            for s in services:
+                  self.service_map[s] = True
+            self.loginType = loginType
+            ##start streaming channels
+            self.join_map = {}
+            ##accumulated join recv stats
+            self.join_rx_stats = Stats()
+
+      def has_service(self, service):
+            if self.service_map.has_key(service):
+                  return self.service_map[service]
+            if self.service_map.has_key(service.upper()):
+                  return self.service_map[service.upper()]
+            return False
+
+      def channel_join_update(self, chan, join_time):
+            self.join_map[chan] = ( Stats(), Stats(), Stats(), Stats() )
+            self.channel_update(chan, self.STATS_JOIN, 1, t = join_time)
+
+      def channel_join(self, chan = 0, delay = 2):
+            '''Join a channel and create a send/recv stats map'''
+            if self.join_map.has_key(chan):
+                  del self.join_map[chan]
+            self.delay = delay
+            chan, join_time = self.join(chan)
+            self.channel_join_update(chan, join_time)
+            return chan
+
+      def channel_join_next(self, delay = 2):
+            '''Joins the next channel leaving the last channel'''
+            if self.last_chan:
+                  if self.join_map.has_key(self.last_chan):
+                        del self.join_map[self.last_chan]
+            self.delay = delay
+            chan, join_time = self.join_next()
+            self.channel_join_update(chan, join_time)
+            return chan
+
+      def channel_jump(self, delay = 2):
+            '''Jumps randomly to the next channel leaving the last channel'''
+            if self.last_chan is not None:
+                  if self.join_map.has_key(self.last_chan):
+                        del self.join_map[self.last_chan]
+            self.delay = delay
+            chan, join_time = self.jump()
+            self.channel_join_update(chan, join_time)
+            return chan
+
+      def channel_leave(self, chan = 0):
+            if self.join_map.has_key(chan):
+                  del self.join_map[chan]
+            self.leave(chan)
+
+      def channel_update(self, chan, stats_type, packets, t=0):
+            if type(chan) == type(0):
+                  chan_list = (chan,)
+            else:
+                  chan_list = chan
+            for c in chan_list: 
+                  if self.join_map.has_key(c):
+                        self.join_map[c][stats_type].update(packets = packets, t = t)
+
+      def channel_receive(self, chan, cb = None, count = 1):
+            log.info('Subscriber %s receiving from group %s, channel %d' %(self.name, self.gaddr(chan), chan))
+            self.recv(chan, cb = cb, count = count)
+
+      def recv_channel_cb(self, pkt):
+            ##First verify that we have received the packet for the joined instance
+            log.debug('Packet received for group %s, subscriber %s' %(pkt[IP].dst, self.name))
+            chan = self.caddr(pkt[IP].dst)
+            assert_equal(chan in self.join_map.keys(), True)
+            recv_time = monotonic.monotonic() * 1000000
+            join_time = self.join_map[chan][self.STATS_JOIN].start
+            delta = recv_time - join_time
+            self.join_rx_stats.update(packets=1, t = delta, usecs = True)
+            self.channel_update(chan, self.STATS_RX, 1, t = delta)
+            log.debug('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
+
+class subscriber_pool:
+
+      def __init__(self, subscriber, test_cbs):
+            self.subscriber = subscriber
+            self.test_cbs = test_cbs
+
+      def pool_cb(self):
+            for cb in self.test_cbs:
+                  if cb:
+                        cb(self.subscriber)
+      
+class subscriber_exchange(unittest.TestCase):
+
+      apps = ('org.onosproject.aaa', 'org.onosproject.dhcp')
+      olt_apps = () #'org.onosproject.cordmcast')
+      table_app = 'org.ciena.cordigmp'
+      dhcp_server_config = {
+        "ip": "10.1.11.50",
+        "mac": "ca:fe:ca:fe:ca:fe",
+        "subnet": "255.255.252.0",
+        "broadcast": "10.1.11.255",
+        "router": "10.1.8.1",
+        "domain": "8.8.8.8",
+        "ttl": "63",
+        "delay": "2",
+        "startip": "10.1.11.51",
+        "endip": "10.1.11.100"
+      }
+
+      aaa_loaded = False
+      test_path = os.path.dirname(os.path.realpath(__file__))
+      table_app_file = os.path.join(test_path, '..', 'apps/ciena-cordigmp-multitable-1.0-SNAPSHOT.oar')
+      app_file = os.path.join(test_path, '..', 'apps/ciena-cordigmp-1.0-SNAPSHOT.oar')
+      onos_config_path = os.path.join(test_path, '..', 'setup/onos-config')
+      olt_conf_file = os.path.join(test_path, '..', 'setup/olt_config_multitable.json')
+      cpqd_path = os.path.join(test_path, '..', 'setup')
+      ovs_path = cpqd_path
+      device_id = 'of:' + get_mac('ovsbr0')
+      cpqd_device_dict = { "devices" : {
+                  "{}".format(device_id) : {
+                        "basic" : {
+                              "driver" : "spring-open-cpqd"
+                              }
+                        }
+                  },
+              }
+
+      @classmethod
+      def setUpClass(cls):
+          '''Load the OLT config and activate relevant apps'''
+          ## First restart ONOS with cpqd driver config for OVS
+          #cls.start_onos(network_cfg = cls.cpqd_device_dict)
+          cls.install_app_table()
+          cls.start_cpqd(mac = RandMAC()._fix())
+          cls.olt = OltConfig(olt_conf_file = cls.olt_conf_file)
+          OnosCtrl.cord_olt_config(cls.olt.olt_device_data())
+          cls.port_map, cls.port_list = cls.olt.olt_port_map_multi()
+          cls.activate_apps(cls.apps + cls.olt_apps)
+
+      @classmethod
+      def tearDownClass(cls):
+          '''Deactivate the olt apps and restart OVS back'''
+          apps = cls.olt_apps + ( cls.table_app,)
+          for app in apps:
+              onos_ctrl = OnosCtrl(app)
+              onos_ctrl.deactivate()
+          cls.uninstall_app_table()
+          cls.start_ovs()
+
+      @classmethod
+      def activate_apps(cls, apps):
+            for app in apps:
+                  onos_ctrl = OnosCtrl(app)
+                  status, _ = onos_ctrl.activate()
+                  assert_equal(status, True)
+                  time.sleep(2)
+
+      @classmethod
+      def install_app_table(cls):
+            ##Uninstall the existing app if any
+            OnosCtrl.uninstall_app(cls.table_app)
+            time.sleep(2)
+            log.info('Installing the multi table app %s for subscriber test' %(cls.table_app_file))
+            OnosCtrl.install_app(cls.table_app_file)
+            time.sleep(3)
+
+      @classmethod
+      def uninstall_app_table(cls):
+            ##Uninstall the table app on class exit
+            OnosCtrl.uninstall_app(cls.table_app)
+            time.sleep(2)
+            log.info('Installing back the cord igmp app %s for subscriber test on exit' %(cls.app_file))
+            OnosCtrl.install_app(cls.app_file)
+
+      @classmethod
+      def start_onos(cls, network_cfg = None):
+            if network_cfg is None:
+                  network_cfg = cls.cpqd_device_dict
+
+            if type(network_cfg) is tuple:
+                  res = []
+                  for v in network_cfg:
+                        res += v.items()
+                  config = dict(res)
+            else:
+                  config = network_cfg
+            log.info('Restarting ONOS with new network configuration')
+            cfg = json.dumps(config)
+            with open('{}/network-cfg.json'.format(cls.onos_config_path), 'w') as f:
+                  f.write(cfg)
+
+            return cord_test_onos_restart()
+
+      @classmethod
+      def start_cpqd(cls, mac = '00:11:22:33:44:55'):
+            dpid = mac.replace(':', '')
+            cpqd_file = os.sep.join( (cls.cpqd_path, 'cpqd.sh') )
+            cpqd_cmd = '{} {}'.format(cpqd_file, dpid)
+            ret = os.system(cpqd_cmd)
+            assert_equal(ret, 0)
+            time.sleep(10)
+
+      @classmethod
+      def start_ovs(cls):
+            ovs_file = os.sep.join( (cls.ovs_path, 'of-bridge-local.sh') )
+            ret = os.system(ovs_file)
+            assert_equal(ret, 0)
+            time.sleep(2)
+
+      def onos_aaa_load(self):
+            if self.aaa_loaded:
+                  return
+            aaa_dict = {'apps' : { 'org.onosproject.aaa' : { 'AAA' : { 'radiusSecret': 'radius_password', 
+                                                                       'radiusIp': '172.17.0.2' } } } }
+            radius_ip = os.getenv('ONOS_AAA_IP') or '172.17.0.2'
+            aaa_dict['apps']['org.onosproject.aaa']['AAA']['radiusIp'] = radius_ip
+            self.onos_load_config('org.onosproject.aaa', aaa_dict)
+            self.aaa_loaded = True
+
+      def onos_dhcp_table_load(self, config = None):
+          dhcp_dict = {'apps' : { 'org.onosproject.dhcp' : { 'dhcp' : copy.copy(self.dhcp_server_config) } } }
+          dhcp_config = dhcp_dict['apps']['org.onosproject.dhcp']['dhcp']
+          if config:
+              for k in config.keys():
+                  if dhcp_config.has_key(k):
+                      dhcp_config[k] = config[k]
+          self.onos_load_config('org.onosproject.dhcp', dhcp_dict)
+
+      def onos_load_config(self, app, config):
+          status, code = OnosCtrl.config(config)
+          if status is False:
+             log.info('JSON config request for app %s returned status %d' %(app, code))
+             assert_equal(status, True)
+          time.sleep(2)
+
+      def dhcp_sndrcv(self, dhcp, update_seed = False):
+            cip, sip = dhcp.discover(update_seed = update_seed)
+            assert_not_equal(cip, None)
+            assert_not_equal(sip, None)
+            log.info('Got dhcp client IP %s from server %s for mac %s' %
+                     (cip, sip, dhcp.get_mac(cip)[0]))
+            return cip,sip
+
+      def dhcp_request(self, subscriber, seed_ip = '10.10.10.1', update_seed = False):
+            config = {'startip':'10.10.10.20', 'endip':'10.10.10.200',
+                      'ip':'10.10.10.2', 'mac': "ca:fe:ca:fe:ca:fe",
+                      'subnet': '255.255.255.0', 'broadcast':'10.10.10.255', 'router':'10.10.10.1'}
+            self.onos_dhcp_table_load(config)
+            dhcp = DHCPTest(seed_ip = seed_ip, iface = subscriber.iface)
+            cip, sip = self.dhcp_sndrcv(dhcp, update_seed = update_seed)
+            return cip, sip
+
+      def recv_channel_cb(self, pkt):
+            ##First verify that we have received the packet for the joined instance
+            chan = self.subscriber.caddr(pkt[IP].dst)
+            assert_equal(chan in self.subscriber.join_map.keys(), True)
+            recv_time = monotonic.monotonic() * 1000000
+            join_time = self.subscriber.join_map[chan][self.subscriber.STATS_JOIN].start
+            delta = recv_time - join_time
+            self.subscriber.join_rx_stats.update(packets=1, t = delta, usecs = True)
+            self.subscriber.channel_update(chan, self.subscriber.STATS_RX, 1, t = delta)
+            log.debug('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
+            self.test_status = True
+
+      def tls_verify(self, subscriber):
+            if subscriber.has_service('TLS'):
+                  time.sleep(2)
+                  tls = TLSAuthTest()
+                  log.info('Running subscriber %s tls auth test' %subscriber.name)
+                  tls.runTest()
+                  self.test_status = True
+
+      def dhcp_verify(self, subscriber):
+            cip, sip = self.dhcp_request(subscriber, update_seed = True)
+            log.info('Subscriber %s got client ip %s from server %s' %(subscriber.name, cip, sip))
+            subscriber.src_list = [cip]
+            self.test_status = True
+
+      def dhcp_jump_verify(self, subscriber):
+          cip, sip = self.dhcp_request(subscriber, seed_ip = '10.10.200.1')
+          log.info('Subscriber %s got client ip %s from server %s' %(subscriber.name, cip, sip))
+          subscriber.src_list = [cip]
+          self.test_status = True
+
+      def dhcp_next_verify(self, subscriber):
+          cip, sip = self.dhcp_request(subscriber, seed_ip = '10.10.150.1')
+          log.info('Subscriber %s got client ip %s from server %s' %(subscriber.name, cip, sip))
+          subscriber.src_list = [cip]
+          self.test_status = True
+
+      def igmp_verify(self, subscriber):
+            chan = 0
+            if subscriber.has_service('IGMP'):
+                  for i in range(5):
+                        log.info('Joining channel %d for subscriber %s' %(chan, subscriber.name))
+                        subscriber.channel_join(chan, delay = 0)
+                        subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1)
+                        log.info('Leaving channel %d for subscriber %s' %(chan, subscriber.name))
+                        subscriber.channel_leave(chan)
+                        time.sleep(3)
+                        log.info('Interface %s Join RX stats for subscriber %s, %s' %(subscriber.iface, subscriber.name,subscriber.join_rx_stats))
+                  self.test_status = True
+
+      def igmp_jump_verify(self, subscriber):
+            if subscriber.has_service('IGMP'):
+                  for i in xrange(subscriber.num):
+                        log.info('Subscriber %s jumping channel' %subscriber.name)
+                        chan = subscriber.channel_jump(delay=0)
+                        subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1)
+                        log.info('Verified receive for channel %d, subscriber %s' %(chan, subscriber.name))
+                        time.sleep(3)
+                  log.info('Interface %s Jump RX stats for subscriber %s, %s' %(subscriber.iface, subscriber.name, subscriber.join_rx_stats))
+                  self.test_status = True
+
+      def igmp_next_verify(self, subscriber):
+            if subscriber.has_service('IGMP'):
+                  for i in xrange(subscriber.num):
+                        if i:
+                              chan = subscriber.channel_join_next(delay=0)
+                        else:
+                              chan = subscriber.channel_join(i, delay=0)
+                        log.info('Joined next channel %d for subscriber %s' %(chan, subscriber.name))
+                        subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count=1)
+                        log.info('Verified receive for channel %d, subscriber %s' %(chan, subscriber.name))
+                        time.sleep(3)
+                  log.info('Interface %s Join Next RX stats for subscriber %s, %s' %(subscriber.iface, subscriber.name, subscriber.join_rx_stats))
+                  self.test_status = True
+
+      def generate_port_list(self, subscribers, channels):
+            return self.port_list[:subscribers]
+
+      def subscriber_load(self, create = True, num = 10, num_channels = 1, channel_start = 0, port_list = []):
+            '''Load the subscriber from the database'''
+            self.subscriber_db = SubscriberDB(create = create)
+            if create is True:
+                  self.subscriber_db.generate(num)
+            self.subscriber_info = self.subscriber_db.read(num)
+            self.subscriber_list = []
+            if not port_list:
+                  port_list = self.generate_port_list(num, num_channels)
+
+            index = 0
+            for info in self.subscriber_info:
+                  self.subscriber_list.append(Subscriber(name=info['Name'], 
+                                                         service=info['Service'],
+                                                         port_map = self.port_map,
+                                                         num=num_channels,
+                                                         channel_start = channel_start,
+                                                         tx_port = port_list[index][0],
+                                                         rx_port = port_list[index][1]))
+                  if num_channels > 1:
+                        channel_start += num_channels
+                  index += 1
+
+            #load the ssm list for all subscriber channels
+            igmpChannel = IgmpChannel()
+            ssm_groups = map(lambda sub: sub.channels, self.subscriber_list)
+            ssm_list = reduce(lambda ssm1, ssm2: ssm1+ssm2, ssm_groups)
+            igmpChannel.igmp_load_ssm_config(ssm_list)
+
+      def subscriber_join_verify( self, num_subscribers = 10, num_channels = 1, 
+                                  channel_start = 0, cbs = None, port_list = []):
+          self.test_status = False
+          self.num_subscribers = num_subscribers
+          self.subscriber_load(create = True, num = num_subscribers,
+                               num_channels = num_channels, channel_start = channel_start, port_list = port_list)
+          self.onos_aaa_load()
+          self.thread_pool = ThreadPool(min(100, self.num_subscribers), queue_size=1, wait_timeout=1)
+          if cbs is None:
+                cbs = (self.tls_verify, self.dhcp_verify, self.igmp_verify)
+          for subscriber in self.subscriber_list:
+                subscriber.start()
+                pool_object = subscriber_pool(subscriber, cbs)
+                self.thread_pool.addTask(pool_object.pool_cb)
+          self.thread_pool.cleanUpThreads()
+          for subscriber in self.subscriber_list:
+                subscriber.stop()
+          return self.test_status
+
+      def test_subscriber_join_recv(self):
+          """Test subscriber join and receive"""
+          num_subscribers = 5
+          num_channels = 1
+          test_status = self.subscriber_join_verify(num_subscribers = num_subscribers, 
+                                                    num_channels = num_channels,
+                                                    port_list = self.generate_port_list(num_subscribers, num_channels))
+          assert_equal(test_status, True)
+
+      def test_subscriber_join_jump(self):
+          """Test subscriber join and receive for channel surfing""" 
+          num_subscribers = 5
+          num_channels = 50
+          test_status = self.subscriber_join_verify(num_subscribers = num_subscribers, 
+                                                    num_channels = num_channels,
+                                                    cbs = (self.tls_verify, self.dhcp_jump_verify, self.igmp_jump_verify),
+                                                    port_list = self.generate_port_list(num_subscribers, num_channels))
+          assert_equal(test_status, True)
+
+      def test_subscriber_join_next(self):
+          """Test subscriber join next for channels"""
+          num_subscribers = 5
+          num_channels = 50
+          test_status = self.subscriber_join_verify(num_subscribers = num_subscribers, 
+                                                    num_channels = num_channels,
+                                                    cbs = (self.tls_verify, self.dhcp_next_verify, self.igmp_next_verify),
+                                                    port_list = self.generate_port_list(num_subscribers, num_channels))
+          assert_equal(test_status, True)
diff --git a/src/test/utils/OltConfig.py b/src/test/utils/OltConfig.py
index a9fa3c3..bb78563 100644
--- a/src/test/utils/OltConfig.py
+++ b/src/test/utils/OltConfig.py
@@ -56,6 +56,40 @@
         else:
             return None
 
+    def olt_port_map_multi(self):
+        if self.on_olt() and self.olt_conf.has_key('port_map'):
+            port_map = {}
+            if self.olt_conf['port_map'].has_key('ports'):
+                port_map['ports'] = self.olt_conf['port_map']['ports']
+            else:
+                port_map['ports'] = []
+                num_ports = int(self.olt_conf['port_map']['num_ports'])
+                port_map['port'] = self.olt_conf['port_map']['port']
+                for port in xrange(0, num_ports, 2):
+                    port_map['ports'].append('veth{}'.format(port))
+            port_num = 1
+            port_map['uplink'] = int(self.olt_conf['uplink'])
+            port_list = []
+            ##build the port map and inverse port map
+            for port in port_map['ports']:
+                port_map[port_num] = port
+                port_map[port] = port_num
+                if port_num != port_map['uplink']:
+                    ##create tx,rx map
+                    port_list.append( (port_map['uplink'], port_num ) )
+                port_num += 1
+            port_map['start_vlan'] = 0
+            if self.olt_conf['port_map'].has_key('host'):
+                port_map['host'] = self.olt_conf['port_map']['host']
+            else:
+                port_map['host'] = 'ovsbr0'
+            if self.olt_conf['port_map'].has_key('start_vlan'):
+                port_map['start_vlan'] = int(self.olt_conf['port_map']['start_vlan'])
+
+            return port_map, port_list
+        else:
+            return None, None
+
     def olt_device_data(self):
         if self.on_olt():
             accessDeviceDict = {}
diff --git a/src/test/utils/OnosCtrl.py b/src/test/utils/OnosCtrl.py
index 91d5586..880e8f6 100644
--- a/src/test/utils/OnosCtrl.py
+++ b/src/test/utils/OnosCtrl.py
@@ -95,3 +95,12 @@
                                    params = params, headers = headers,
                                    data = payload)
         return result.ok, result.status_code
+
+    @classmethod
+    def uninstall_app(cls, app_name, onos_ip = None):
+        params = {'activate':'true'}
+        headers = {'content-type':'application/octet-stream'}
+        url = cls.applications_url if onos_ip is None else 'http://{0}:8181/onos/v1/applications'.format(onos_ip)
+        app_url = '{}/{}'.format(url, app_name)
+        resp = requests.delete(app_url, auth = cls.auth)
+        return resp.ok, resp.status_code