Adding subscriber channel surfing experience.
This includes support for testing multiple subscribers in parallel on multiple channels.
It also includes various other changes to the IGMP, DHCP, and Channels interfaces.
diff --git a/src/test/subscriber/subscriberTest.py b/src/test/subscriber/subscriberTest.py
index cc0589f..6b7e9ff 100644
--- a/src/test/subscriber/subscriberTest.py
+++ b/src/test/subscriber/subscriberTest.py
@@ -12,8 +12,9 @@
 from OnosCtrl import OnosCtrl
 from DHCP import DHCPTest
 from EapTLS import TLSAuthTest
-from Channels import Channels
+from Channels import Channels, IgmpChannel
 from subscriberDb import SubscriberDB
+from threadPool import ThreadPool
 log.setLevel('INFO')
 
 class Subscriber(Channels):
@@ -23,9 +24,11 @@
       STATS_JOIN = 2
       STATS_LEAVE = 3
       SUBSCRIBER_SERVICES = 'DHCP IGMP TLS'
-      def __init__(self, name = 'sub', service = SUBSCRIBER_SERVICES, num = 1, iface = 'veth0', iface_mcast = 'veth2', 
+      def __init__(self, name = 'sub', service = SUBSCRIBER_SERVICES, num = 1, channel_start = 0, 
+                   iface = 'veth0', iface_mcast = 'veth2', 
                    mcast_cb = None, loginType = 'wireless'):
-            Channels.__init__(self, num, iface = iface, iface_mcast = iface_mcast, mcast_cb = mcast_cb)
+            Channels.__init__(self, num, channel_start = channel_start, 
+                              iface = iface, iface_mcast = iface_mcast, mcast_cb = mcast_cb)
             self.name = name
             self.service = service
             self.service_map = {}
@@ -93,8 +96,32 @@
                         self.join_map[c][stats_type].update(packets = packets, t = t)
 
       def channel_receive(self, chan, cb = None, count = 1):
+            log.info('Subscriber %s receiving from group %s, channel %d' %(self.name, self.gaddr(chan), chan))
             self.recv(chan, cb = cb, count = count)
 
+      def recv_channel_cb(self, pkt):
+            ##First verify that we have received the packet for the joined instance
+            log.debug('Packet received for group %s, subscriber %s' %(pkt[IP].dst, self.name))
+            chan = self.caddr(pkt[IP].dst)
+            assert_equal(chan in self.join_map.keys(), True)
+            recv_time = monotonic.monotonic() * 1000000
+            join_time = self.join_map[chan][self.STATS_JOIN].start
+            delta = recv_time - join_time
+            self.join_rx_stats.update(packets=1, t = delta, usecs = True)
+            self.channel_update(chan, self.STATS_RX, 1, t = delta)
+            log.debug('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
+
+class subscriber_pool:
+
+      def __init__(self, subscriber, test_cbs):
+            self.subscriber = subscriber
+            self.test_cbs = test_cbs
+
+      def pool_cb(self):
+            for cb in self.test_cbs:
+                  if cb:
+                        cb(self.subscriber)
+      
 class subscriber_exchange(unittest.TestCase):
 
       apps = [ 'org.onosproject.aaa', 'org.onosproject.dhcp' ]
@@ -112,6 +139,8 @@
         "endip": "10.1.11.100"
       }
 
+      aaa_loaded = False
+
       def setUp(self):
           ''' Activate the dhcp and igmp apps'''
           for app in self.apps:
@@ -127,11 +156,14 @@
               onos_ctrl.deactivate()
 
       def onos_aaa_load(self):
+            if self.aaa_loaded:
+                  return
             aaa_dict = {'apps' : { 'org.onosproject.aaa' : { 'AAA' : { 'radiusSecret': 'radius_password', 
                                                                        'radiusIp': '172.17.0.2' } } } }
             radius_ip = os.getenv('ONOS_AAA_IP') or '172.17.0.2'
             aaa_dict['apps']['org.onosproject.aaa']['AAA']['radiusIp'] = radius_ip
             self.onos_load_config('org.onosproject.aaa', aaa_dict)
+            self.aaa_loaded = True
 
       def onos_dhcp_table_load(self, config = None):
           dhcp_dict = {'apps' : { 'org.onosproject.dhcp' : { 'dhcp' : copy.copy(self.dhcp_server_config) } } }
@@ -158,13 +190,13 @@
                      (cip, sip, self.dhcp.get_mac(cip)[0]))
             return cip,sip
 
-      def dhcp_request(self, seed_ip = '10.10.10.1', iface = 'veth0'):
+      def dhcp_request(self, seed_ip = '10.10.10.1', iface = 'veth0', update_seed = False):
             config = {'startip':'10.10.10.20', 'endip':'10.10.10.69',
                       'ip':'10.10.10.2', 'mac': "ca:fe:ca:fe:ca:fe",
                       'subnet': '255.255.255.0', 'broadcast':'10.10.10.255', 'router':'10.10.10.1'}
             self.onos_dhcp_table_load(config)
             self.dhcp = DHCPTest(seed_ip = seed_ip, iface = iface)
-            cip, sip = self.dhcp_sndrcv()
+            cip, sip = self.dhcp_sndrcv(update_seed = update_seed)
             return cip, sip
 
       def recv_channel_cb(self, pkt):
@@ -176,10 +208,74 @@
             delta = recv_time - join_time
             self.subscriber.join_rx_stats.update(packets=1, t = delta, usecs = True)
             self.subscriber.channel_update(chan, self.subscriber.STATS_RX, 1, t = delta)
-            log.info('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
+            log.debug('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
             self.test_status = True
 
-      def subscriber_load(self, create = True, num = 10):
+      def tls_verify(self, subscriber):
+            if subscriber.has_service('TLS'):
+                  time.sleep(2)
+                  tls = TLSAuthTest()
+                  log.info('Running subscriber %s tls auth test' %subscriber.name)
+                  tls.runTest()
+                  self.test_status = True
+
+      def dhcp_verify(self, subscriber):
+            cip, sip = self.dhcp_request(iface = subscriber.iface, update_seed = True)
+            log.info('Subscriber %s got client ip %s from server %s' %(subscriber.name, cip, sip))
+            subscriber.src_list = [cip]
+            self.test_status = True
+
+      def dhcp_jump_verify(self, subscriber):
+          cip, sip = self.dhcp_request(seed_ip = '10.10.200.1', iface = subscriber.iface)
+          log.info('Subscriber %s got client ip %s from server %s' %(subscriber.name, cip, sip))
+          subscriber.src_list = [cip]
+          self.test_status = True
+
+      def dhcp_next_verify(self, subscriber):
+          cip, sip = self.dhcp_request(seed_ip = '10.10.150.1', iface = subscriber.iface)
+          log.info('Subscriber %s got client ip %s from server %s' %(subscriber.name, cip, sip))
+          subscriber.src_list = [cip]
+          self.test_status = True
+
+      def igmp_verify(self, subscriber):
+            chan = 0
+            if subscriber.has_service('IGMP'):
+                  for i in range(5):
+                        log.info('Joining channel %d for subscriber %s' %(chan, subscriber.name))
+                        subscriber.channel_join(chan, delay = 0)
+                        subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1)
+                        log.info('Leaving channel %d for subscriber %s' %(chan, subscriber.name))
+                        subscriber.channel_leave(chan)
+                        time.sleep(3)
+                        log.info('Join RX stats for subscriber %s, %s' %(subscriber.name,subscriber.join_rx_stats))
+                  self.test_status = True
+
+      def igmp_jump_verify(self, subscriber):
+            if subscriber.has_service('IGMP'):
+                  for i in xrange(subscriber.num):
+                        log.info('Subscriber %s jumping channel' %subscriber.name)
+                        chan = subscriber.channel_jump(delay=0)
+                        subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1)
+                        log.info('Verified receive for channel %d, subscriber %s' %(chan, subscriber.name))
+                        time.sleep(3)
+                  log.info('Join RX stats for subscriber %s, %s' %(subscriber.name, subscriber.join_rx_stats))
+                  self.test_status = True
+
+      def igmp_next_verify(self, subscriber):
+            if subscriber.has_service('IGMP'):
+                  for i in xrange(subscriber.num):
+                        if i:
+                              chan = subscriber.channel_join_next(delay=0)
+                        else:
+                              chan = subscriber.channel_join(i, delay=0)
+                        log.info('Joined next channel %d for subscriber %s' %(chan, subscriber.name))
+                        subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count=1)
+                        log.info('Verified receive for channel %d, subscriber %s' %(chan, subscriber.name))
+                        time.sleep(3)
+                  log.info('Join Next RX stats for subscriber %s, %s' %(subscriber.name, subscriber.join_rx_stats))
+                  self.test_status = True
+
+      def subscriber_load(self, create = True, num = 10, num_channels = 1, channel_start = 0):
             '''Load the subscriber from the database'''
             self.subscriber_db = SubscriberDB(create = create)
             if create is True:
@@ -187,89 +283,52 @@
             self.subscriber_info = self.subscriber_db.read(num)
             self.subscriber_list = []
             for info in self.subscriber_info:
-                  self.subscriber_list.append(Subscriber(info['Name'], info['Service']))
+                  self.subscriber_list.append(Subscriber(name=info['Name'], 
+                                                         service=info['Service'],
+                                                         num=num_channels,
+                                                         channel_start = channel_start))
+                  channel_start += num_channels
+            
+            #load the ssm list for all subscriber channels
+            igmpChannel = IgmpChannel()
+            ssm_groups = map(lambda sub: sub.channels, self.subscriber_list)
+            ssm_list = reduce(lambda ssm1, ssm2: ssm1+ssm2, ssm_groups)
+            igmpChannel.igmp_load_ssm_config(ssm_list)
 
-      def test_subscriber_join_recv( self, chan = 0):
-          """Test 1 subscriber join and receive""" 
+      def subscriber_join_verify( self, num_subscribers = 10, num_channels = 1, 
+                                  channel_start = 0, cbs = None):
           self.test_status = False
-          self.num_subscribers = 5
-          self.subscriber_load(create = True, num = self.num_subscribers)
+          self.num_subscribers = num_subscribers
+          self.subscriber_load(create = True, num = self.num_subscribers, 
+                               num_channels = num_channels, channel_start = channel_start)
           self.onos_aaa_load()
+          self.thread_pool = ThreadPool(min(100, self.num_subscribers), queue_size=1, wait_timeout=1)
+          if cbs is None:
+                cbs = (self.tls_verify, self.dhcp_verify, self.igmp_verify)
           for subscriber in self.subscriber_list:
-                self.subscriber = subscriber
-                self.subscriber.start()
-                log.info('Testing subscriber %s for %s' %(subscriber.name, subscriber.service))
-                if self.subscriber.has_service('TLS'):
-                      time.sleep(2)
-                      tls = TLSAuthTest()
-                      tls.runTest()
-                      self.test_status = True
-                if self.subscriber.has_service('DHCP'):
-                      cip, sip = self.dhcp_request(iface = self.subscriber.iface)
-                      log.info('Got client ip %s from server %s' %(cip, sip))
-                      self.subscriber.src_list = [cip]
-                      self.test_status = True
-                if self.subscriber.has_service('IGMP'):
-                      for i in range(5):
-                            log.info('Joining channel %d' %chan)
-                            self.subscriber.channel_join(chan, delay = 0)
-                            self.subscriber.channel_receive(chan, cb = self.recv_channel_cb, count = 1)
-                            log.info('Leaving channel %d' %chan)
-                            self.subscriber.channel_leave(chan)
-                            time.sleep(3)
-                      log.info('Join RX stats %s' %self.subscriber.join_rx_stats)
-                self.subscriber.stop()
-          ##Terminate the tests on success
-          assert_equal(self.test_status, True)
+                subscriber.start()
+                pool_object = subscriber_pool(subscriber, cbs)
+                self.thread_pool.addTask(pool_object.pool_cb)
+          self.thread_pool.cleanUpThreads()
+          for subscriber in self.subscriber_list:
+                subscriber.stop()
+          return self.test_status
 
+      def test_subscriber_join_recv(self):
+          """Test subscriber join and receive""" 
+          test_status = self.subscriber_join_verify(num_subscribers = 50, num_channels = 1)
+          assert_equal(test_status, True)
 
       def test_subscriber_join_jump(self):
-          """Test 1 subscriber join and receive""" 
-          self.test_status = False
-          self.subscriber = Subscriber(50)
-          self.subscriber.start()
-          self.onos_aaa_load()
-          #tls = TLSAuthTest()
-          #tls.runTest()
-          ##Next get dhcp
-          cip, sip = self.dhcp_request(seed_ip = '10.10.200.1', iface = self.subscriber.iface)
-          log.info('Got client ip %s from server %s' %(cip, sip))
-          self.subscriber.src_list = [cip]
-          for i in range(50):
-                log.info('Jumping channel')
-                chan = self.subscriber.channel_jump(delay=0)
-                self.subscriber.channel_receive(chan, cb = self.recv_channel_cb, count = 1)
-                log.info('Verified receive for channel %d' %chan)
-                time.sleep(3)
-
-          log.info('Join RX stats %s' %self.subscriber.join_rx_stats)
-          self.subscriber.stop()
-          ##Terminate the tests on success
-          assert_equal(self.test_status, True)
+          """Test subscriber join and receive for channel surfing""" 
+          test_status = self.subscriber_join_verify(num_subscribers = 5, 
+                                                    num_channels = 50,
+                                                    cbs = (self.tls_verify, self.dhcp_jump_verify, self.igmp_jump_verify))
+          assert_equal(test_status, True)
 
       def test_subscriber_join_next(self):
           """Test subscriber join next for channels"""
-          self.test_status = False
-          self.subscriber = Subscriber(10)
-          self.subscriber.start()
-          self.onos_aaa_load()
-          #tls = TLSAuthTest()
-          #tls.runTest()
-          ##Next get dhcp
-          cip, sip = self.dhcp_request(seed_ip = '10.10.150.1', iface = self.subscriber.iface)
-          log.info('Got client ip %s from server %s' %(cip, sip))
-          self.subscriber.src_list = [cip]
-          for i in range(10):
-                if i:
-                      chan = self.subscriber.channel_join_next(delay=0)
-                else:
-                      chan = self.subscriber.channel_join(i, delay=0)
-                log.info('Joined next channel %d' %chan)
-                self.subscriber.channel_receive(chan, cb = self.recv_channel_cb, count=1)
-                log.info('Verified receive for channel %d' %chan)
-                time.sleep(3)
-
-          log.info('Join Next RX stats %s' %self.subscriber.join_rx_stats)
-          self.subscriber.stop()
-          ##Terminate the tests on success
-          assert_equal(self.test_status, True)
+          test_status = self.subscriber_join_verify(num_subscribers = 5, 
+                                                    num_channels = 50,
+                                                    cbs = (self.tls_verify, self.dhcp_next_verify, self.igmp_next_verify))
+          assert_equal(test_status, True)