Adding subscriber channel surfing experience.
This includes support to test multiple subscribers in parallel on multiple channels.
Various other changes to igmp, dhcp and Channels interface.
diff --git a/.gitignore b/.gitignore
index ba74660..cf55afb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -55,3 +55,9 @@
 
 # PyBuilder
 target/
+sub*.db
+*~
+*.swp
+*.jpeg
+*.gv
+*ascpc*@*#
diff --git a/src/test/igmp/igmpTest.py b/src/test/igmp/igmpTest.py
index 7439e6f..bb04af9 100644
--- a/src/test/igmp/igmpTest.py
+++ b/src/test/igmp/igmpTest.py
@@ -331,7 +331,7 @@
           '''For now, restricting it to 50/100'''
           s = (224 << 24) | 1
           #e = (225 << 24) | (255 << 16) | (255 << 16) | 255
-          e = (224 << 24) | 50
+          e = (224 << 24) | 25
           for i in xrange(s, e+1):
                 if i&0xff:
                       ip = '%d.%d.%d.%d'%((i>>24)&0xff, (i>>16)&0xff, (i>>8)&0xff, i&0xff)
diff --git a/src/test/subscriber/subscriberTest.py b/src/test/subscriber/subscriberTest.py
index cc0589f..6b7e9ff 100644
--- a/src/test/subscriber/subscriberTest.py
+++ b/src/test/subscriber/subscriberTest.py
@@ -12,8 +12,9 @@
 from OnosCtrl import OnosCtrl
 from DHCP import DHCPTest
 from EapTLS import TLSAuthTest
-from Channels import Channels
+from Channels import Channels, IgmpChannel
 from subscriberDb import SubscriberDB
+from threadPool import ThreadPool
 log.setLevel('INFO')
 
 class Subscriber(Channels):
@@ -23,9 +24,11 @@
       STATS_JOIN = 2
       STATS_LEAVE = 3
       SUBSCRIBER_SERVICES = 'DHCP IGMP TLS'
-      def __init__(self, name = 'sub', service = SUBSCRIBER_SERVICES, num = 1, iface = 'veth0', iface_mcast = 'veth2', 
+      def __init__(self, name = 'sub', service = SUBSCRIBER_SERVICES, num = 1, channel_start = 0, 
+                   iface = 'veth0', iface_mcast = 'veth2', 
                    mcast_cb = None, loginType = 'wireless'):
-            Channels.__init__(self, num, iface = iface, iface_mcast = iface_mcast, mcast_cb = mcast_cb)
+            Channels.__init__(self, num, channel_start = channel_start, 
+                              iface = iface, iface_mcast = iface_mcast, mcast_cb = mcast_cb)
             self.name = name
             self.service = service
             self.service_map = {}
@@ -93,8 +96,32 @@
                         self.join_map[c][stats_type].update(packets = packets, t = t)
 
       def channel_receive(self, chan, cb = None, count = 1):
+            log.info('Subscriber %s receiving from group %s, channel %d' %(self.name, self.gaddr(chan), chan))
             self.recv(chan, cb = cb, count = count)
 
+      def recv_channel_cb(self, pkt):
+            ##First verify that we have received the packet for the joined instance
+            log.debug('Packet received for group %s, subscriber %s' %(pkt[IP].dst, self.name))
+            chan = self.caddr(pkt[IP].dst)
+            assert_equal(chan in self.join_map.keys(), True)
+            recv_time = monotonic.monotonic() * 1000000
+            join_time = self.join_map[chan][self.STATS_JOIN].start
+            delta = recv_time - join_time
+            self.join_rx_stats.update(packets=1, t = delta, usecs = True)
+            self.channel_update(chan, self.STATS_RX, 1, t = delta)
+            log.debug('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
+
+class subscriber_pool:
+
+      def __init__(self, subscriber, test_cbs):
+            self.subscriber = subscriber
+            self.test_cbs = test_cbs
+
+      def pool_cb(self):
+            for cb in self.test_cbs:
+                  if cb:
+                        cb(self.subscriber)
+      
 class subscriber_exchange(unittest.TestCase):
 
       apps = [ 'org.onosproject.aaa', 'org.onosproject.dhcp' ]
@@ -112,6 +139,8 @@
         "endip": "10.1.11.100"
       }
 
+      aaa_loaded = False
+
       def setUp(self):
           ''' Activate the dhcp and igmp apps'''
           for app in self.apps:
@@ -127,11 +156,14 @@
               onos_ctrl.deactivate()
 
       def onos_aaa_load(self):
+            if self.aaa_loaded:
+                  return
             aaa_dict = {'apps' : { 'org.onosproject.aaa' : { 'AAA' : { 'radiusSecret': 'radius_password', 
                                                                        'radiusIp': '172.17.0.2' } } } }
             radius_ip = os.getenv('ONOS_AAA_IP') or '172.17.0.2'
             aaa_dict['apps']['org.onosproject.aaa']['AAA']['radiusIp'] = radius_ip
             self.onos_load_config('org.onosproject.aaa', aaa_dict)
+            self.aaa_loaded = True
 
       def onos_dhcp_table_load(self, config = None):
           dhcp_dict = {'apps' : { 'org.onosproject.dhcp' : { 'dhcp' : copy.copy(self.dhcp_server_config) } } }
@@ -158,13 +190,13 @@
                      (cip, sip, self.dhcp.get_mac(cip)[0]))
             return cip,sip
 
-      def dhcp_request(self, seed_ip = '10.10.10.1', iface = 'veth0'):
+      def dhcp_request(self, seed_ip = '10.10.10.1', iface = 'veth0', update_seed = False):
             config = {'startip':'10.10.10.20', 'endip':'10.10.10.69',
                       'ip':'10.10.10.2', 'mac': "ca:fe:ca:fe:ca:fe",
                       'subnet': '255.255.255.0', 'broadcast':'10.10.10.255', 'router':'10.10.10.1'}
             self.onos_dhcp_table_load(config)
             self.dhcp = DHCPTest(seed_ip = seed_ip, iface = iface)
-            cip, sip = self.dhcp_sndrcv()
+            cip, sip = self.dhcp_sndrcv(update_seed = update_seed)
             return cip, sip
 
       def recv_channel_cb(self, pkt):
@@ -176,10 +208,74 @@
             delta = recv_time - join_time
             self.subscriber.join_rx_stats.update(packets=1, t = delta, usecs = True)
             self.subscriber.channel_update(chan, self.subscriber.STATS_RX, 1, t = delta)
-            log.info('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
+            log.debug('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
             self.test_status = True
 
-      def subscriber_load(self, create = True, num = 10):
+      def tls_verify(self, subscriber):
+            if subscriber.has_service('TLS'):
+                  time.sleep(2)
+                  tls = TLSAuthTest()
+                  log.info('Running subscriber %s tls auth test' %subscriber.name)
+                  tls.runTest()
+                  self.test_status = True
+
+      def dhcp_verify(self, subscriber):
+            cip, sip = self.dhcp_request(iface = subscriber.iface, update_seed = True)
+            log.info('Subscriber %s got client ip %s from server %s' %(subscriber.name, cip, sip))
+            subscriber.src_list = [cip]
+            self.test_status = True
+
+      def dhcp_jump_verify(self, subscriber):
+          cip, sip = self.dhcp_request(seed_ip = '10.10.200.1', iface = subscriber.iface)
+          log.info('Subscriber %s got client ip %s from server %s' %(subscriber.name, cip, sip))
+          subscriber.src_list = [cip]
+          self.test_status = True
+
+      def dhcp_next_verify(self, subscriber):
+          cip, sip = self.dhcp_request(seed_ip = '10.10.150.1', iface = subscriber.iface)
+          log.info('Subscriber %s got client ip %s from server %s' %(subscriber.name, cip, sip))
+          subscriber.src_list = [cip]
+          self.test_status = True
+
+      def igmp_verify(self, subscriber):
+            chan = 0
+            if subscriber.has_service('IGMP'):
+                  for i in range(5):
+                        log.info('Joining channel %d for subscriber %s' %(chan, subscriber.name))
+                        subscriber.channel_join(chan, delay = 0)
+                        subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1)
+                        log.info('Leaving channel %d for subscriber %s' %(chan, subscriber.name))
+                        subscriber.channel_leave(chan)
+                        time.sleep(3)
+                        log.info('Join RX stats for subscriber %s, %s' %(subscriber.name,subscriber.join_rx_stats))
+                  self.test_status = True
+
+      def igmp_jump_verify(self, subscriber):
+            if subscriber.has_service('IGMP'):
+                  for i in xrange(subscriber.num):
+                        log.info('Subscriber %s jumping channel' %subscriber.name)
+                        chan = subscriber.channel_jump(delay=0)
+                        subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1)
+                        log.info('Verified receive for channel %d, subscriber %s' %(chan, subscriber.name))
+                        time.sleep(3)
+                  log.info('Join RX stats for subscriber %s, %s' %(subscriber.name, subscriber.join_rx_stats))
+                  self.test_status = True
+
+      def igmp_next_verify(self, subscriber):
+            if subscriber.has_service('IGMP'):
+                  for i in xrange(subscriber.num):
+                        if i:
+                              chan = subscriber.channel_join_next(delay=0)
+                        else:
+                              chan = subscriber.channel_join(i, delay=0)
+                        log.info('Joined next channel %d for subscriber %s' %(chan, subscriber.name))
+                        subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count=1)
+                        log.info('Verified receive for channel %d, subscriber %s' %(chan, subscriber.name))
+                        time.sleep(3)
+                  log.info('Join Next RX stats for subscriber %s, %s' %(subscriber.name, subscriber.join_rx_stats))
+                  self.test_status = True
+
+      def subscriber_load(self, create = True, num = 10, num_channels = 1, channel_start = 0):
             '''Load the subscriber from the database'''
             self.subscriber_db = SubscriberDB(create = create)
             if create is True:
@@ -187,89 +283,52 @@
             self.subscriber_info = self.subscriber_db.read(num)
             self.subscriber_list = []
             for info in self.subscriber_info:
-                  self.subscriber_list.append(Subscriber(info['Name'], info['Service']))
+                  self.subscriber_list.append(Subscriber(name=info['Name'], 
+                                                         service=info['Service'],
+                                                         num=num_channels,
+                                                         channel_start = channel_start))
+                  channel_start += num_channels
+            
+            #load the ssm list for all subscriber channels
+            igmpChannel = IgmpChannel()
+            ssm_groups = map(lambda sub: sub.channels, self.subscriber_list)
+            ssm_list = reduce(lambda ssm1, ssm2: ssm1+ssm2, ssm_groups)
+            igmpChannel.igmp_load_ssm_config(ssm_list)
 
-      def test_subscriber_join_recv( self, chan = 0):
-          """Test 1 subscriber join and receive""" 
+      def subscriber_join_verify( self, num_subscribers = 10, num_channels = 1, 
+                                  channel_start = 0, cbs = None):
           self.test_status = False
-          self.num_subscribers = 5
-          self.subscriber_load(create = True, num = self.num_subscribers)
+          self.num_subscribers = num_subscribers
+          self.subscriber_load(create = True, num = self.num_subscribers, 
+                               num_channels = num_channels, channel_start = channel_start)
           self.onos_aaa_load()
+          self.thread_pool = ThreadPool(min(100, self.num_subscribers), queue_size=1, wait_timeout=1)
+          if cbs is None:
+                cbs = (self.tls_verify, self.dhcp_verify, self.igmp_verify)
           for subscriber in self.subscriber_list:
-                self.subscriber = subscriber
-                self.subscriber.start()
-                log.info('Testing subscriber %s for %s' %(subscriber.name, subscriber.service))
-                if self.subscriber.has_service('TLS'):
-                      time.sleep(2)
-                      tls = TLSAuthTest()
-                      tls.runTest()
-                      self.test_status = True
-                if self.subscriber.has_service('DHCP'):
-                      cip, sip = self.dhcp_request(iface = self.subscriber.iface)
-                      log.info('Got client ip %s from server %s' %(cip, sip))
-                      self.subscriber.src_list = [cip]
-                      self.test_status = True
-                if self.subscriber.has_service('IGMP'):
-                      for i in range(5):
-                            log.info('Joining channel %d' %chan)
-                            self.subscriber.channel_join(chan, delay = 0)
-                            self.subscriber.channel_receive(chan, cb = self.recv_channel_cb, count = 1)
-                            log.info('Leaving channel %d' %chan)
-                            self.subscriber.channel_leave(chan)
-                            time.sleep(3)
-                      log.info('Join RX stats %s' %self.subscriber.join_rx_stats)
-                self.subscriber.stop()
-          ##Terminate the tests on success
-          assert_equal(self.test_status, True)
+                subscriber.start()
+                pool_object = subscriber_pool(subscriber, cbs)
+                self.thread_pool.addTask(pool_object.pool_cb)
+          self.thread_pool.cleanUpThreads()
+          for subscriber in self.subscriber_list:
+                subscriber.stop()
+          return self.test_status
 
+      def test_subscriber_join_recv(self):
+          """Test subscriber join and receive""" 
+          test_status = self.subscriber_join_verify(num_subscribers = 50, num_channels = 1)
+          assert_equal(test_status, True)
 
       def test_subscriber_join_jump(self):
-          """Test 1 subscriber join and receive""" 
-          self.test_status = False
-          self.subscriber = Subscriber(50)
-          self.subscriber.start()
-          self.onos_aaa_load()
-          #tls = TLSAuthTest()
-          #tls.runTest()
-          ##Next get dhcp
-          cip, sip = self.dhcp_request(seed_ip = '10.10.200.1', iface = self.subscriber.iface)
-          log.info('Got client ip %s from server %s' %(cip, sip))
-          self.subscriber.src_list = [cip]
-          for i in range(50):
-                log.info('Jumping channel')
-                chan = self.subscriber.channel_jump(delay=0)
-                self.subscriber.channel_receive(chan, cb = self.recv_channel_cb, count = 1)
-                log.info('Verified receive for channel %d' %chan)
-                time.sleep(3)
-
-          log.info('Join RX stats %s' %self.subscriber.join_rx_stats)
-          self.subscriber.stop()
-          ##Terminate the tests on success
-          assert_equal(self.test_status, True)
+          """Test subscriber join and receive for channel surfing""" 
+          test_status = self.subscriber_join_verify(num_subscribers = 5, 
+                                                    num_channels = 50,
+                                                    cbs = (self.tls_verify, self.dhcp_jump_verify, self.igmp_jump_verify))
+          assert_equal(test_status, True)
 
       def test_subscriber_join_next(self):
           """Test subscriber join next for channels"""
-          self.test_status = False
-          self.subscriber = Subscriber(10)
-          self.subscriber.start()
-          self.onos_aaa_load()
-          #tls = TLSAuthTest()
-          #tls.runTest()
-          ##Next get dhcp
-          cip, sip = self.dhcp_request(seed_ip = '10.10.150.1', iface = self.subscriber.iface)
-          log.info('Got client ip %s from server %s' %(cip, sip))
-          self.subscriber.src_list = [cip]
-          for i in range(10):
-                if i:
-                      chan = self.subscriber.channel_join_next(delay=0)
-                else:
-                      chan = self.subscriber.channel_join(i, delay=0)
-                log.info('Joined next channel %d' %chan)
-                self.subscriber.channel_receive(chan, cb = self.recv_channel_cb, count=1)
-                log.info('Verified receive for channel %d' %chan)
-                time.sleep(3)
-
-          log.info('Join Next RX stats %s' %self.subscriber.join_rx_stats)
-          self.subscriber.stop()
-          ##Terminate the tests on success
-          assert_equal(self.test_status, True)
+          test_status = self.subscriber_join_verify(num_subscribers = 5, 
+                                                    num_channels = 50,
+                                                    cbs = (self.tls_verify, self.dhcp_next_verify, self.igmp_next_verify))
+          assert_equal(test_status, True)
diff --git a/src/test/tls/tlsAuthTest.py b/src/test/tls/tlsAuthTest.py
index c239f38..5a30c49 100644
--- a/src/test/tls/tlsAuthTest.py
+++ b/src/test/tls/tlsAuthTest.py
@@ -11,8 +11,6 @@
 
       def setUp(self):
             self.onos_ctrl = OnosCtrl(self.app)
-            self.onos_ctrl.deactivate()
-            time.sleep(2)
             self.onos_aaa_config()
 
       def onos_aaa_config(self):
diff --git a/src/test/utils/Channels.py b/src/test/utils/Channels.py
index 3c4ff45..083abd0 100644
--- a/src/test/utils/Channels.py
+++ b/src/test/utils/Channels.py
@@ -21,20 +21,22 @@
     IP_DST = '224.0.1.1'
     igmp_eth = Ether(dst = IGMP_DST_MAC, src = IGMP_SRC_MAC, type = ETH_P_IP)
     igmp_ip = IP(dst = IP_DST, src = IP_SRC)
+    ssm_list = [] 
 
-    def __init__(self, iface = 'veth0', src_list = ['1.2.3.4'], delay = 2):
+    def __init__(self, iface = 'veth0', ssm_list = [], src_list = ['1.2.3.4'], delay = 2):
         self.iface = iface
+        self.ssm_list += ssm_list
         self.src_list = src_list
         self.delay = delay
         self.onos_ctrl = OnosCtrl('org.onosproject.igmp')
         self.onos_ctrl.activate()
+    
+    def igmp_load_ssm_config(self, ssm_list = []):
+        if not ssm_list:
+            ssm_list = self.ssm_list
+        self.ssm_table_load(ssm_list)
 
-    def igmp_load_ssm_config(self, groups):
-        self.ssm_table_load(groups)
-
-    def igmp_join(self, groups, ssm_load = False):
-        if ssm_load:
-            self.igmp_load_ssm_config(groups)
+    def igmp_join(self, groups):
         igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
                       gaddr='224.0.1.1')
         for g in groups:
@@ -84,10 +86,12 @@
     Started = 1
     Idle = 0
     Joined = 1
-    def __init__(self, num, iface = 'veth0', iface_mcast = 'veth2', mcast_cb = None):
+    def __init__(self, num, channel_start = 0, iface = 'veth0', iface_mcast = 'veth2', mcast_cb = None):
         self.num = num
-        self.channels = self.generate(self.num)
+        self.channel_start = channel_start
+        self.channels = self.generate(self.num, self.channel_start)
         self.group_channel_map = {}
+        #assert_equal(len(self.channels), self.num)
         for i in range(self.num):
             self.group_channel_map[self.channels[i]] = i
         self.state = self.Stopped
@@ -99,18 +103,24 @@
         self.mcast_cb = mcast_cb
         for c in range(self.num):
             self.channel_states[c] = [self.Idle]
-
-        IgmpChannel.__init__(self, iface=iface)
-
-    def generate(self, num):
-        start = (224 << 24) | 1
-        end = start + num + num/256 
+        IgmpChannel.__init__(self, ssm_list = self.channels, iface=iface)
+        
+    def generate(self, num, channel_start = 0):
+        start = (224 << 24) | ( ( (channel_start >> 16) & 0xff) << 16 ) | \
+            ( ( (channel_start >> 8) & 0xff ) << 8 ) | (channel_start) & 0xff
+        start += channel_start/256 + 1
+        end = start + num
         group_addrs = []
-        for i in range(start, end):
-            if i&255:
-                g = '%s.%s.%s.%s' %((i>>24) &0xff, (i>>16)&0xff, (i>>8)&0xff, i&0xff)
-                log.debug('Adding group %s' %g)
-                group_addrs.append(g)
+        count = 0
+        while count != num:
+            for i in range(start, end):
+                if i&255:
+                    g = '%s.%s.%s.%s' %((i>>24) &0xff, (i>>16)&0xff, (i>>8)&0xff, i&0xff)
+                    log.debug('Adding group %s' %g)
+                    group_addrs.append(g)
+                    count += 1
+            start = end
+            end = start + 1
         return group_addrs
 
     def start(self):
@@ -132,8 +142,6 @@
             return chan, 0
 
         groups = [self.channels[chan]]
-        #load the ssm table first
-        self.igmp_load_ssm_config(groups)
         join_start = monotonic.monotonic()
         self.igmp_join(groups)
         self.set_state(chan, self.Joined)
@@ -228,8 +236,15 @@
         self.channel_states[chan][0] = state
 
 if __name__ == '__main__':
-    num = 2
-    channels = Channels(num)
+    num = 5
+    start = 0
+    ssm_list = []
+    for i in xrange(2):
+        channels = Channels(num, start)
+        ssm_list += channels.channels
+        start += num
+    igmpChannel = IgmpChannel()
+    igmpChannel.igmp_load_ssm_config(ssm_list)
     channels.start()
     for i in range(num):
         channels.join(i)
diff --git a/src/test/utils/DHCP.py b/src/test/utils/DHCP.py
index 64ae753..8dec80c 100644
--- a/src/test/utils/DHCP.py
+++ b/src/test/utils/DHCP.py
@@ -40,6 +40,7 @@
             print("Failed to acquire IP via DHCP for %s on interface %s" %(mac, self.iface))
             return (None, None)
 
+        subnet_mask = "0.0.0.0"
         for x in resp.lastlayer().options:
             if(x == 'end'):
                 break
@@ -52,7 +53,7 @@
         L5 = BOOTP(chaddr=chmac, yiaddr=srcIP)
         L6 = DHCP(options=[("message-type","request"), ("server_id",server_id), 
                            ("subnet_mask",subnet_mask), ("requested_addr",srcIP), "end"])
-        srp1(L2/L3/L4/L5/L6, filter="udp and port 68", timeout=5, iface=self.iface)
+        srp(L2/L3/L4/L5/L6, filter="udp and port 68", timeout=5, iface=self.iface)
         self.mac_map[mac] = (srcIP, serverIP)
         self.mac_inverse_map[srcIP] = (mac, serverIP)
         return (srcIP, serverIP)
diff --git a/src/test/utils/threadPool.py b/src/test/utils/threadPool.py
new file mode 100644
index 0000000..bd3e21e
--- /dev/null
+++ b/src/test/utils/threadPool.py
@@ -0,0 +1,80 @@
+import threading
+import Queue
+
+class PoolThread(threading.Thread):
+
+    def __init__(self, requests_queue, wait_timeout, daemon, **kwds):
+        threading.Thread.__init__(self, **kwds)
+        self.daemon = daemon
+        self._queue = requests_queue
+        self._wait_timeout = wait_timeout
+        self._finished = threading.Event()
+        self.start()
+
+    def run(self):
+        while True:
+            if(self._finished.isSet()):
+                break
+
+            try:
+                work = self._queue.get(block=True, timeout=self._wait_timeout)
+            except Queue.Empty:
+                continue
+            else:
+                try:
+                    work.__call__()
+                finally:
+                    self._queue.task_done()
+
+
+
+class ThreadPool:
+
+    def __init__(self, pool_size, daemon=False, queue_size=0, wait_timeout=5):
+        """Set up the thread pool and create pool_size threads
+        """
+        self._queue = Queue.Queue(queue_size)
+        self._daemon = daemon
+        self._threads = []
+        self._pool_size = pool_size
+        self._wait_timeout = wait_timeout
+        self.createThreads()
+
+
+    def addTask(self, callableObject):
+        if (callable(callableObject)):
+            self._queue.put(callableObject, block=True)
+
+    def cleanUpThreads(self):
+        self._queue.join()
+
+        for t in self._threads:
+            t._finished.set()
+
+
+    def createThreads(self):
+        for i in range(self._pool_size):
+            self._threads.append(PoolThread(self._queue, self._wait_timeout, self._daemon))
+
+
+class CallObject:
+    def __init__(self, v = 0): 
+        self.v = v
+    def callCb(self): 
+        print 'Inside callback for %d' %self.v
+
+if __name__ == '__main__':
+    import multiprocessing
+    callList = []
+    cpu_count = multiprocessing.cpu_count()
+    for i in xrange(cpu_count * 2):
+        callList.append(CallObject(i))
+    tp = ThreadPool(cpu_count * 2, queue_size=1, wait_timeout=1)
+    for i in range(40):
+        callObject = callList[i% (cpu_count*2)]
+        f = callObject.callCb
+        tp.addTask(f)
+
+    tp.cleanUpThreads()
+
+