Adding subscriber channel surfing experience.
This includes support for testing multiple subscribers in parallel on multiple channels.
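Other changes to the igmp, dhcp, tls and Channels interfaces: the igmp test group range now ends at 25 instead of 50,
the SSM table is loaded once for all subscriber channels instead of on every join,
the DHCP request is sent with srp() and a default subnet mask of 0.0.0.0,
the TLS test no longer deactivates its ONOS app in setUp, and a ThreadPool utility is added.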
diff --git a/src/test/igmp/igmpTest.py b/src/test/igmp/igmpTest.py
index 7439e6f..bb04af9 100644
--- a/src/test/igmp/igmpTest.py
+++ b/src/test/igmp/igmpTest.py
@@ -331,7 +331,7 @@
'''For now, restricting it to 50/100'''
s = (224 << 24) | 1
#e = (225 << 24) | (255 << 16) | (255 << 16) | 255
- e = (224 << 24) | 50
+ e = (224 << 24) | 25
for i in xrange(s, e+1):
if i&0xff:
ip = '%d.%d.%d.%d'%((i>>24)&0xff, (i>>16)&0xff, (i>>8)&0xff, i&0xff)
diff --git a/src/test/subscriber/subscriberTest.py b/src/test/subscriber/subscriberTest.py
index cc0589f..6b7e9ff 100644
--- a/src/test/subscriber/subscriberTest.py
+++ b/src/test/subscriber/subscriberTest.py
@@ -12,8 +12,9 @@
from OnosCtrl import OnosCtrl
from DHCP import DHCPTest
from EapTLS import TLSAuthTest
-from Channels import Channels
+from Channels import Channels, IgmpChannel
from subscriberDb import SubscriberDB
+from threadPool import ThreadPool
log.setLevel('INFO')
class Subscriber(Channels):
@@ -23,9 +24,11 @@
STATS_JOIN = 2
STATS_LEAVE = 3
SUBSCRIBER_SERVICES = 'DHCP IGMP TLS'
- def __init__(self, name = 'sub', service = SUBSCRIBER_SERVICES, num = 1, iface = 'veth0', iface_mcast = 'veth2',
+ def __init__(self, name = 'sub', service = SUBSCRIBER_SERVICES, num = 1, channel_start = 0,
+ iface = 'veth0', iface_mcast = 'veth2',
mcast_cb = None, loginType = 'wireless'):
- Channels.__init__(self, num, iface = iface, iface_mcast = iface_mcast, mcast_cb = mcast_cb)
+ Channels.__init__(self, num, channel_start = channel_start,
+ iface = iface, iface_mcast = iface_mcast, mcast_cb = mcast_cb)
self.name = name
self.service = service
self.service_map = {}
@@ -93,8 +96,32 @@
self.join_map[c][stats_type].update(packets = packets, t = t)
def channel_receive(self, chan, cb = None, count = 1):
+ log.info('Subscriber %s receiving from group %s, channel %d' %(self.name, self.gaddr(chan), chan))
self.recv(chan, cb = cb, count = count)
+ def recv_channel_cb(self, pkt):
+ ##First verify that we have received the packet for the joined instance
+ log.debug('Packet received for group %s, subscriber %s' %(pkt[IP].dst, self.name))
+ chan = self.caddr(pkt[IP].dst)
+ assert_equal(chan in self.join_map.keys(), True)
+ recv_time = monotonic.monotonic() * 1000000
+ join_time = self.join_map[chan][self.STATS_JOIN].start
+ delta = recv_time - join_time
+ self.join_rx_stats.update(packets=1, t = delta, usecs = True)
+ self.channel_update(chan, self.STATS_RX, 1, t = delta)
+ log.debug('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
+
+class subscriber_pool:
+
+ def __init__(self, subscriber, test_cbs):
+ self.subscriber = subscriber
+ self.test_cbs = test_cbs
+
+ def pool_cb(self):
+ for cb in self.test_cbs:
+ if cb:
+ cb(self.subscriber)
+
class subscriber_exchange(unittest.TestCase):
apps = [ 'org.onosproject.aaa', 'org.onosproject.dhcp' ]
@@ -112,6 +139,8 @@
"endip": "10.1.11.100"
}
+ aaa_loaded = False
+
def setUp(self):
''' Activate the dhcp and igmp apps'''
for app in self.apps:
@@ -127,11 +156,14 @@
onos_ctrl.deactivate()
def onos_aaa_load(self):
+ if self.aaa_loaded:
+ return
aaa_dict = {'apps' : { 'org.onosproject.aaa' : { 'AAA' : { 'radiusSecret': 'radius_password',
'radiusIp': '172.17.0.2' } } } }
radius_ip = os.getenv('ONOS_AAA_IP') or '172.17.0.2'
aaa_dict['apps']['org.onosproject.aaa']['AAA']['radiusIp'] = radius_ip
self.onos_load_config('org.onosproject.aaa', aaa_dict)
+ self.aaa_loaded = True
def onos_dhcp_table_load(self, config = None):
dhcp_dict = {'apps' : { 'org.onosproject.dhcp' : { 'dhcp' : copy.copy(self.dhcp_server_config) } } }
@@ -158,13 +190,13 @@
(cip, sip, self.dhcp.get_mac(cip)[0]))
return cip,sip
- def dhcp_request(self, seed_ip = '10.10.10.1', iface = 'veth0'):
+ def dhcp_request(self, seed_ip = '10.10.10.1', iface = 'veth0', update_seed = False):
config = {'startip':'10.10.10.20', 'endip':'10.10.10.69',
'ip':'10.10.10.2', 'mac': "ca:fe:ca:fe:ca:fe",
'subnet': '255.255.255.0', 'broadcast':'10.10.10.255', 'router':'10.10.10.1'}
self.onos_dhcp_table_load(config)
self.dhcp = DHCPTest(seed_ip = seed_ip, iface = iface)
- cip, sip = self.dhcp_sndrcv()
+ cip, sip = self.dhcp_sndrcv(update_seed = update_seed)
return cip, sip
def recv_channel_cb(self, pkt):
@@ -176,10 +208,74 @@
delta = recv_time - join_time
self.subscriber.join_rx_stats.update(packets=1, t = delta, usecs = True)
self.subscriber.channel_update(chan, self.subscriber.STATS_RX, 1, t = delta)
- log.info('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
+ log.debug('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
self.test_status = True
- def subscriber_load(self, create = True, num = 10):
+ def tls_verify(self, subscriber):
+ if subscriber.has_service('TLS'):
+ time.sleep(2)
+ tls = TLSAuthTest()
+ log.info('Running subscriber %s tls auth test' %subscriber.name)
+ tls.runTest()
+ self.test_status = True
+
+ def dhcp_verify(self, subscriber):
+ cip, sip = self.dhcp_request(iface = subscriber.iface, update_seed = True)
+ log.info('Subscriber %s got client ip %s from server %s' %(subscriber.name, cip, sip))
+ subscriber.src_list = [cip]
+ self.test_status = True
+
+ def dhcp_jump_verify(self, subscriber):
+ cip, sip = self.dhcp_request(seed_ip = '10.10.200.1', iface = subscriber.iface)
+ log.info('Subscriber %s got client ip %s from server %s' %(subscriber.name, cip, sip))
+ subscriber.src_list = [cip]
+ self.test_status = True
+
+ def dhcp_next_verify(self, subscriber):
+ cip, sip = self.dhcp_request(seed_ip = '10.10.150.1', iface = subscriber.iface)
+ log.info('Subscriber %s got client ip %s from server %s' %(subscriber.name, cip, sip))
+ subscriber.src_list = [cip]
+ self.test_status = True
+
+ def igmp_verify(self, subscriber):
+ chan = 0
+ if subscriber.has_service('IGMP'):
+ for i in range(5):
+ log.info('Joining channel %d for subscriber %s' %(chan, subscriber.name))
+ subscriber.channel_join(chan, delay = 0)
+ subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1)
+ log.info('Leaving channel %d for subscriber %s' %(chan, subscriber.name))
+ subscriber.channel_leave(chan)
+ time.sleep(3)
+ log.info('Join RX stats for subscriber %s, %s' %(subscriber.name,subscriber.join_rx_stats))
+ self.test_status = True
+
+ def igmp_jump_verify(self, subscriber):
+ if subscriber.has_service('IGMP'):
+ for i in xrange(subscriber.num):
+ log.info('Subscriber %s jumping channel' %subscriber.name)
+ chan = subscriber.channel_jump(delay=0)
+ subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1)
+ log.info('Verified receive for channel %d, subscriber %s' %(chan, subscriber.name))
+ time.sleep(3)
+ log.info('Join RX stats for subscriber %s, %s' %(subscriber.name, subscriber.join_rx_stats))
+ self.test_status = True
+
+ def igmp_next_verify(self, subscriber):
+ if subscriber.has_service('IGMP'):
+ for i in xrange(subscriber.num):
+ if i:
+ chan = subscriber.channel_join_next(delay=0)
+ else:
+ chan = subscriber.channel_join(i, delay=0)
+ log.info('Joined next channel %d for subscriber %s' %(chan, subscriber.name))
+ subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count=1)
+ log.info('Verified receive for channel %d, subscriber %s' %(chan, subscriber.name))
+ time.sleep(3)
+ log.info('Join Next RX stats for subscriber %s, %s' %(subscriber.name, subscriber.join_rx_stats))
+ self.test_status = True
+
+ def subscriber_load(self, create = True, num = 10, num_channels = 1, channel_start = 0):
'''Load the subscriber from the database'''
self.subscriber_db = SubscriberDB(create = create)
if create is True:
@@ -187,89 +283,52 @@
self.subscriber_info = self.subscriber_db.read(num)
self.subscriber_list = []
for info in self.subscriber_info:
- self.subscriber_list.append(Subscriber(info['Name'], info['Service']))
+ self.subscriber_list.append(Subscriber(name=info['Name'],
+ service=info['Service'],
+ num=num_channels,
+ channel_start = channel_start))
+ channel_start += num_channels
+
+ #load the ssm list for all subscriber channels
+ igmpChannel = IgmpChannel()
+ ssm_groups = map(lambda sub: sub.channels, self.subscriber_list)
+ ssm_list = reduce(lambda ssm1, ssm2: ssm1+ssm2, ssm_groups)
+ igmpChannel.igmp_load_ssm_config(ssm_list)
- def test_subscriber_join_recv( self, chan = 0):
- """Test 1 subscriber join and receive"""
+ def subscriber_join_verify( self, num_subscribers = 10, num_channels = 1,
+ channel_start = 0, cbs = None):
self.test_status = False
- self.num_subscribers = 5
- self.subscriber_load(create = True, num = self.num_subscribers)
+ self.num_subscribers = num_subscribers
+ self.subscriber_load(create = True, num = self.num_subscribers,
+ num_channels = num_channels, channel_start = channel_start)
self.onos_aaa_load()
+ self.thread_pool = ThreadPool(min(100, self.num_subscribers), queue_size=1, wait_timeout=1)
+ if cbs is None:
+ cbs = (self.tls_verify, self.dhcp_verify, self.igmp_verify)
for subscriber in self.subscriber_list:
- self.subscriber = subscriber
- self.subscriber.start()
- log.info('Testing subscriber %s for %s' %(subscriber.name, subscriber.service))
- if self.subscriber.has_service('TLS'):
- time.sleep(2)
- tls = TLSAuthTest()
- tls.runTest()
- self.test_status = True
- if self.subscriber.has_service('DHCP'):
- cip, sip = self.dhcp_request(iface = self.subscriber.iface)
- log.info('Got client ip %s from server %s' %(cip, sip))
- self.subscriber.src_list = [cip]
- self.test_status = True
- if self.subscriber.has_service('IGMP'):
- for i in range(5):
- log.info('Joining channel %d' %chan)
- self.subscriber.channel_join(chan, delay = 0)
- self.subscriber.channel_receive(chan, cb = self.recv_channel_cb, count = 1)
- log.info('Leaving channel %d' %chan)
- self.subscriber.channel_leave(chan)
- time.sleep(3)
- log.info('Join RX stats %s' %self.subscriber.join_rx_stats)
- self.subscriber.stop()
- ##Terminate the tests on success
- assert_equal(self.test_status, True)
+ subscriber.start()
+ pool_object = subscriber_pool(subscriber, cbs)
+ self.thread_pool.addTask(pool_object.pool_cb)
+ self.thread_pool.cleanUpThreads()
+ for subscriber in self.subscriber_list:
+ subscriber.stop()
+ return self.test_status
+ def test_subscriber_join_recv(self):
+ """Test subscriber join and receive"""
+ test_status = self.subscriber_join_verify(num_subscribers = 50, num_channels = 1)
+ assert_equal(test_status, True)
def test_subscriber_join_jump(self):
- """Test 1 subscriber join and receive"""
- self.test_status = False
- self.subscriber = Subscriber(50)
- self.subscriber.start()
- self.onos_aaa_load()
- #tls = TLSAuthTest()
- #tls.runTest()
- ##Next get dhcp
- cip, sip = self.dhcp_request(seed_ip = '10.10.200.1', iface = self.subscriber.iface)
- log.info('Got client ip %s from server %s' %(cip, sip))
- self.subscriber.src_list = [cip]
- for i in range(50):
- log.info('Jumping channel')
- chan = self.subscriber.channel_jump(delay=0)
- self.subscriber.channel_receive(chan, cb = self.recv_channel_cb, count = 1)
- log.info('Verified receive for channel %d' %chan)
- time.sleep(3)
-
- log.info('Join RX stats %s' %self.subscriber.join_rx_stats)
- self.subscriber.stop()
- ##Terminate the tests on success
- assert_equal(self.test_status, True)
+ """Test subscriber join and receive for channel surfing"""
+ test_status = self.subscriber_join_verify(num_subscribers = 5,
+ num_channels = 50,
+ cbs = (self.tls_verify, self.dhcp_jump_verify, self.igmp_jump_verify))
+ assert_equal(test_status, True)
def test_subscriber_join_next(self):
"""Test subscriber join next for channels"""
- self.test_status = False
- self.subscriber = Subscriber(10)
- self.subscriber.start()
- self.onos_aaa_load()
- #tls = TLSAuthTest()
- #tls.runTest()
- ##Next get dhcp
- cip, sip = self.dhcp_request(seed_ip = '10.10.150.1', iface = self.subscriber.iface)
- log.info('Got client ip %s from server %s' %(cip, sip))
- self.subscriber.src_list = [cip]
- for i in range(10):
- if i:
- chan = self.subscriber.channel_join_next(delay=0)
- else:
- chan = self.subscriber.channel_join(i, delay=0)
- log.info('Joined next channel %d' %chan)
- self.subscriber.channel_receive(chan, cb = self.recv_channel_cb, count=1)
- log.info('Verified receive for channel %d' %chan)
- time.sleep(3)
-
- log.info('Join Next RX stats %s' %self.subscriber.join_rx_stats)
- self.subscriber.stop()
- ##Terminate the tests on success
- assert_equal(self.test_status, True)
+ test_status = self.subscriber_join_verify(num_subscribers = 5,
+ num_channels = 50,
+ cbs = (self.tls_verify, self.dhcp_next_verify, self.igmp_next_verify))
+ assert_equal(test_status, True)
diff --git a/src/test/tls/tlsAuthTest.py b/src/test/tls/tlsAuthTest.py
index c239f38..5a30c49 100644
--- a/src/test/tls/tlsAuthTest.py
+++ b/src/test/tls/tlsAuthTest.py
@@ -11,8 +11,6 @@
def setUp(self):
self.onos_ctrl = OnosCtrl(self.app)
- self.onos_ctrl.deactivate()
- time.sleep(2)
self.onos_aaa_config()
def onos_aaa_config(self):
diff --git a/src/test/utils/Channels.py b/src/test/utils/Channels.py
index 3c4ff45..083abd0 100644
--- a/src/test/utils/Channels.py
+++ b/src/test/utils/Channels.py
@@ -21,20 +21,22 @@
IP_DST = '224.0.1.1'
igmp_eth = Ether(dst = IGMP_DST_MAC, src = IGMP_SRC_MAC, type = ETH_P_IP)
igmp_ip = IP(dst = IP_DST, src = IP_SRC)
+ ssm_list = []
- def __init__(self, iface = 'veth0', src_list = ['1.2.3.4'], delay = 2):
+ def __init__(self, iface = 'veth0', ssm_list = [], src_list = ['1.2.3.4'], delay = 2):
self.iface = iface
+ self.ssm_list += ssm_list
self.src_list = src_list
self.delay = delay
self.onos_ctrl = OnosCtrl('org.onosproject.igmp')
self.onos_ctrl.activate()
+
+ def igmp_load_ssm_config(self, ssm_list = []):
+ if not ssm_list:
+ ssm_list = self.ssm_list
+ self.ssm_table_load(ssm_list)
- def igmp_load_ssm_config(self, groups):
- self.ssm_table_load(groups)
-
- def igmp_join(self, groups, ssm_load = False):
- if ssm_load:
- self.igmp_load_ssm_config(groups)
+ def igmp_join(self, groups):
igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
gaddr='224.0.1.1')
for g in groups:
@@ -84,10 +86,12 @@
Started = 1
Idle = 0
Joined = 1
- def __init__(self, num, iface = 'veth0', iface_mcast = 'veth2', mcast_cb = None):
+ def __init__(self, num, channel_start = 0, iface = 'veth0', iface_mcast = 'veth2', mcast_cb = None):
self.num = num
- self.channels = self.generate(self.num)
+ self.channel_start = channel_start
+ self.channels = self.generate(self.num, self.channel_start)
self.group_channel_map = {}
+ #assert_equal(len(self.channels), self.num)
for i in range(self.num):
self.group_channel_map[self.channels[i]] = i
self.state = self.Stopped
@@ -99,18 +103,24 @@
self.mcast_cb = mcast_cb
for c in range(self.num):
self.channel_states[c] = [self.Idle]
-
- IgmpChannel.__init__(self, iface=iface)
-
- def generate(self, num):
- start = (224 << 24) | 1
- end = start + num + num/256
+ IgmpChannel.__init__(self, ssm_list = self.channels, iface=iface)
+
+ def generate(self, num, channel_start = 0):
+ start = (224 << 24) | ( ( (channel_start >> 16) & 0xff) << 16 ) | \
+ ( ( (channel_start >> 8) & 0xff ) << 8 ) | (channel_start) & 0xff
+ start += channel_start/256 + 1
+ end = start + num
group_addrs = []
- for i in range(start, end):
- if i&255:
- g = '%s.%s.%s.%s' %((i>>24) &0xff, (i>>16)&0xff, (i>>8)&0xff, i&0xff)
- log.debug('Adding group %s' %g)
- group_addrs.append(g)
+ count = 0
+ while count != num:
+ for i in range(start, end):
+ if i&255:
+ g = '%s.%s.%s.%s' %((i>>24) &0xff, (i>>16)&0xff, (i>>8)&0xff, i&0xff)
+ log.debug('Adding group %s' %g)
+ group_addrs.append(g)
+ count += 1
+ start = end
+ end = start + 1
return group_addrs
def start(self):
@@ -132,8 +142,6 @@
return chan, 0
groups = [self.channels[chan]]
- #load the ssm table first
- self.igmp_load_ssm_config(groups)
join_start = monotonic.monotonic()
self.igmp_join(groups)
self.set_state(chan, self.Joined)
@@ -228,8 +236,15 @@
self.channel_states[chan][0] = state
if __name__ == '__main__':
- num = 2
- channels = Channels(num)
+ num = 5
+ start = 0
+ ssm_list = []
+ for i in xrange(2):
+ channels = Channels(num, start)
+ ssm_list += channels.channels
+ start += num
+ igmpChannel = IgmpChannel()
+ igmpChannel.igmp_load_ssm_config(ssm_list)
channels.start()
for i in range(num):
channels.join(i)
diff --git a/src/test/utils/DHCP.py b/src/test/utils/DHCP.py
index 64ae753..8dec80c 100644
--- a/src/test/utils/DHCP.py
+++ b/src/test/utils/DHCP.py
@@ -40,6 +40,7 @@
print("Failed to acquire IP via DHCP for %s on interface %s" %(mac, self.iface))
return (None, None)
+ subnet_mask = "0.0.0.0"
for x in resp.lastlayer().options:
if(x == 'end'):
break
@@ -52,7 +53,7 @@
L5 = BOOTP(chaddr=chmac, yiaddr=srcIP)
L6 = DHCP(options=[("message-type","request"), ("server_id",server_id),
("subnet_mask",subnet_mask), ("requested_addr",srcIP), "end"])
- srp1(L2/L3/L4/L5/L6, filter="udp and port 68", timeout=5, iface=self.iface)
+ srp(L2/L3/L4/L5/L6, filter="udp and port 68", timeout=5, iface=self.iface)
self.mac_map[mac] = (srcIP, serverIP)
self.mac_inverse_map[srcIP] = (mac, serverIP)
return (srcIP, serverIP)
diff --git a/src/test/utils/threadPool.py b/src/test/utils/threadPool.py
new file mode 100644
index 0000000..bd3e21e
--- /dev/null
+++ b/src/test/utils/threadPool.py
@@ -0,0 +1,80 @@
+import threading
+import Queue
+
+class PoolThread(threading.Thread):
+
+ def __init__(self, requests_queue, wait_timeout, daemon, **kwds):
+ threading.Thread.__init__(self, **kwds)
+ self.daemon = daemon
+ self._queue = requests_queue
+ self._wait_timeout = wait_timeout
+ self._finished = threading.Event()
+ self.start()
+
+ def run(self):
+ while True:
+ if(self._finished.isSet()):
+ break
+
+ try:
+ work = self._queue.get(block=True, timeout=self._wait_timeout)
+ except Queue.Empty:
+ continue
+ else:
+ try:
+ work.__call__()
+ finally:
+ self._queue.task_done()
+
+
+
+class ThreadPool:
+
+ def __init__(self, pool_size, daemon=False, queue_size=0, wait_timeout=5):
+ """Set up the thread pool and create pool_size threads
+ """
+ self._queue = Queue.Queue(queue_size)
+ self._daemon = daemon
+ self._threads = []
+ self._pool_size = pool_size
+ self._wait_timeout = wait_timeout
+ self.createThreads()
+
+
+ def addTask(self, callableObject):
+ if (callable(callableObject)):
+ self._queue.put(callableObject, block=True)
+
+ def cleanUpThreads(self):
+ self._queue.join()
+
+ for t in self._threads:
+ t._finished.set()
+
+
+ def createThreads(self):
+ for i in range(self._pool_size):
+ self._threads.append(PoolThread(self._queue, self._wait_timeout, self._daemon))
+
+
+class CallObject:
+ def __init__(self, v = 0):
+ self.v = v
+ def callCb(self):
+ print 'Inside callback for %d' %self.v
+
+if __name__ == '__main__':
+ import multiprocessing
+ callList = []
+ cpu_count = multiprocessing.cpu_count()
+ for i in xrange(cpu_count * 2):
+ callList.append(CallObject(i))
+ tp = ThreadPool(cpu_count * 2, queue_size=1, wait_timeout=1)
+ for i in range(40):
+ callObject = callList[i% (cpu_count*2)]
+ f = callObject.callCb
+ tp.addTask(f)
+
+ tp.cleanUpThreads()
+
+