Add support for a subscriber channel-surfing experience.
This includes support for testing multiple subscribers in parallel on multiple channels.
It also makes various other changes to the IGMP and DHCP utilities and to the Channels interface.
diff --git a/src/test/utils/Channels.py b/src/test/utils/Channels.py
index 3c4ff45..083abd0 100644
--- a/src/test/utils/Channels.py
+++ b/src/test/utils/Channels.py
@@ -21,20 +21,22 @@
IP_DST = '224.0.1.1'
igmp_eth = Ether(dst = IGMP_DST_MAC, src = IGMP_SRC_MAC, type = ETH_P_IP)
igmp_ip = IP(dst = IP_DST, src = IP_SRC)
+ ssm_list = []
- def __init__(self, iface = 'veth0', src_list = ['1.2.3.4'], delay = 2):
+ def __init__(self, iface = 'veth0', ssm_list = [], src_list = ['1.2.3.4'], delay = 2):
self.iface = iface
+ self.ssm_list += ssm_list
self.src_list = src_list
self.delay = delay
self.onos_ctrl = OnosCtrl('org.onosproject.igmp')
self.onos_ctrl.activate()
+
+ def igmp_load_ssm_config(self, ssm_list = []):
+ if not ssm_list:
+ ssm_list = self.ssm_list
+ self.ssm_table_load(ssm_list)
- def igmp_load_ssm_config(self, groups):
- self.ssm_table_load(groups)
-
- def igmp_join(self, groups, ssm_load = False):
- if ssm_load:
- self.igmp_load_ssm_config(groups)
+ def igmp_join(self, groups):
igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
gaddr='224.0.1.1')
for g in groups:
@@ -84,10 +86,12 @@
Started = 1
Idle = 0
Joined = 1
- def __init__(self, num, iface = 'veth0', iface_mcast = 'veth2', mcast_cb = None):
+ def __init__(self, num, channel_start = 0, iface = 'veth0', iface_mcast = 'veth2', mcast_cb = None):
self.num = num
- self.channels = self.generate(self.num)
+ self.channel_start = channel_start
+ self.channels = self.generate(self.num, self.channel_start)
self.group_channel_map = {}
+ #assert_equal(len(self.channels), self.num)
for i in range(self.num):
self.group_channel_map[self.channels[i]] = i
self.state = self.Stopped
@@ -99,18 +103,24 @@
self.mcast_cb = mcast_cb
for c in range(self.num):
self.channel_states[c] = [self.Idle]
-
- IgmpChannel.__init__(self, iface=iface)
-
- def generate(self, num):
- start = (224 << 24) | 1
- end = start + num + num/256
+ IgmpChannel.__init__(self, ssm_list = self.channels, iface=iface)
+
+ def generate(self, num, channel_start = 0):
+ start = (224 << 24) | ( ( (channel_start >> 16) & 0xff) << 16 ) | \
+ ( ( (channel_start >> 8) & 0xff ) << 8 ) | (channel_start) & 0xff
+ start += channel_start/256 + 1
+ end = start + num
group_addrs = []
- for i in range(start, end):
- if i&255:
- g = '%s.%s.%s.%s' %((i>>24) &0xff, (i>>16)&0xff, (i>>8)&0xff, i&0xff)
- log.debug('Adding group %s' %g)
- group_addrs.append(g)
+ count = 0
+ while count != num:
+ for i in range(start, end):
+ if i&255:
+ g = '%s.%s.%s.%s' %((i>>24) &0xff, (i>>16)&0xff, (i>>8)&0xff, i&0xff)
+ log.debug('Adding group %s' %g)
+ group_addrs.append(g)
+ count += 1
+ start = end
+ end = start + 1
return group_addrs
def start(self):
@@ -132,8 +142,6 @@
return chan, 0
groups = [self.channels[chan]]
- #load the ssm table first
- self.igmp_load_ssm_config(groups)
join_start = monotonic.monotonic()
self.igmp_join(groups)
self.set_state(chan, self.Joined)
@@ -228,8 +236,15 @@
self.channel_states[chan][0] = state
if __name__ == '__main__':
- num = 2
- channels = Channels(num)
+ num = 5
+ start = 0
+ ssm_list = []
+ for i in xrange(2):
+ channels = Channels(num, start)
+ ssm_list += channels.channels
+ start += num
+ igmpChannel = IgmpChannel()
+ igmpChannel.igmp_load_ssm_config(ssm_list)
channels.start()
for i in range(num):
channels.join(i)
diff --git a/src/test/utils/DHCP.py b/src/test/utils/DHCP.py
index 64ae753..8dec80c 100644
--- a/src/test/utils/DHCP.py
+++ b/src/test/utils/DHCP.py
@@ -40,6 +40,7 @@
print("Failed to acquire IP via DHCP for %s on interface %s" %(mac, self.iface))
return (None, None)
+ subnet_mask = "0.0.0.0"
for x in resp.lastlayer().options:
if(x == 'end'):
break
@@ -52,7 +53,7 @@
L5 = BOOTP(chaddr=chmac, yiaddr=srcIP)
L6 = DHCP(options=[("message-type","request"), ("server_id",server_id),
("subnet_mask",subnet_mask), ("requested_addr",srcIP), "end"])
- srp1(L2/L3/L4/L5/L6, filter="udp and port 68", timeout=5, iface=self.iface)
+ srp(L2/L3/L4/L5/L6, filter="udp and port 68", timeout=5, iface=self.iface)
self.mac_map[mac] = (srcIP, serverIP)
self.mac_inverse_map[srcIP] = (mac, serverIP)
return (srcIP, serverIP)
diff --git a/src/test/utils/threadPool.py b/src/test/utils/threadPool.py
new file mode 100644
index 0000000..bd3e21e
--- /dev/null
+++ b/src/test/utils/threadPool.py
@@ -0,0 +1,80 @@
+import threading
+import Queue
+
+class PoolThread(threading.Thread):
+
+ def __init__(self, requests_queue, wait_timeout, daemon, **kwds):
+ threading.Thread.__init__(self, **kwds)
+ self.daemon = daemon
+ self._queue = requests_queue
+ self._wait_timeout = wait_timeout
+ self._finished = threading.Event()
+ self.start()
+
+ def run(self):
+ while True:
+ if(self._finished.isSet()):
+ break
+
+ try:
+ work = self._queue.get(block=True, timeout=self._wait_timeout)
+ except Queue.Empty:
+ continue
+ else:
+ try:
+ work.__call__()
+ finally:
+ self._queue.task_done()
+
+
+
+class ThreadPool:
+
+ def __init__(self, pool_size, daemon=False, queue_size=0, wait_timeout=5):
+ """Set up the thread pool and create pool_size threads
+ """
+ self._queue = Queue.Queue(queue_size)
+ self._daemon = daemon
+ self._threads = []
+ self._pool_size = pool_size
+ self._wait_timeout = wait_timeout
+ self.createThreads()
+
+
+ def addTask(self, callableObject):
+ if (callable(callableObject)):
+ self._queue.put(callableObject, block=True)
+
+ def cleanUpThreads(self):
+ self._queue.join()
+
+ for t in self._threads:
+ t._finished.set()
+
+
+ def createThreads(self):
+ for i in range(self._pool_size):
+ self._threads.append(PoolThread(self._queue, self._wait_timeout, self._daemon))
+
+
+class CallObject:
+ def __init__(self, v = 0):
+ self.v = v
+ def callCb(self):
+ print 'Inside callback for %d' %self.v
+
+if __name__ == '__main__':
+ import multiprocessing
+ callList = []
+ cpu_count = multiprocessing.cpu_count()
+ for i in xrange(cpu_count * 2):
+ callList.append(CallObject(i))
+ tp = ThreadPool(cpu_count * 2, queue_size=1, wait_timeout=1)
+ for i in range(40):
+ callObject = callList[i% (cpu_count*2)]
+ f = callObject.callCb
+ tp.addTask(f)
+
+ tp.cleanUpThreads()
+
+