Test-Voltha:
Adding util functions and test cases for IGMP
channel surfing.
Change-Id: I8b0fce427509f9de09981a8b58bd5807557afbb0
diff --git a/src/test/utils/Channels.py b/src/test/utils/Channels.py
index b9363f9..c2599a7 100644
--- a/src/test/utils/Channels.py
+++ b/src/test/utils/Channels.py
@@ -41,26 +41,32 @@
igmp_ip = IP(dst = IP_DST, src = IP_SRC)
ssm_list = []
- def __init__(self, iface = 'veth0', ssm_list = [], src_list = ['1.2.3.4'], delay = 2,controller=None):
+ def __init__(self, iface = 'veth0', ssm_list = [], src_list = None, delay = 2,controller=None):
+
self.controller=controller
self.iface = iface
self.ssm_list += ssm_list
- self.src_list = src_list
+ if src_list is None:
+ self.src_list = ['1.2.3.4']
+ else:
+ self.src_list = src_list
self.delay = delay
self.onos_ctrl = OnosCtrl('org.opencord.igmp',controller=self.controller)
self.onos_ctrl.activate()
- def igmp_load_ssm_config(self, ssm_list = []):
+ def igmp_load_ssm_config(self, ssm_list = [], src_list = None):
if not ssm_list:
ssm_list = self.ssm_list
- self.ssm_table_load(ssm_list)
+ self.ssm_table_load(ssm_list, src_list = src_list)
- def igmp_join(self, groups):
+ def igmp_join(self, groups, src_list = None, record_type = None):
+ if record_type is None:
+ record_type = IGMP_V3_GR_TYPE_INCLUDE
igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
gaddr='224.0.1.1')
for g in groups:
- gr = IGMPv3gr(rtype=IGMP_V3_GR_TYPE_INCLUDE, mcaddr=g)
- gr.sources = self.src_list
+ gr = IGMPv3gr(rtype=record_type, mcaddr=g)
+ gr.sources = src_list
igmp.grps.append(gr)
pkt = self.igmp_eth/self.igmp_ip/igmp
@@ -69,12 +75,12 @@
if self.delay != 0:
time.sleep(self.delay)
- def igmp_leave(self, groups):
+ def igmp_leave(self, groups, src_list = None):
igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
gaddr='224.0.1.1')
for g in groups:
gr = IGMPv3gr(rtype=IGMP_V3_GR_TYPE_EXCLUDE, mcaddr=g)
- gr.sources = self.src_list
+ gr.sources = src_list
igmp.grps.append(gr)
pkt = self.igmp_eth/self.igmp_ip/igmp
@@ -89,12 +95,11 @@
log_test.info('JSON config request returned status %d' %code)
time.sleep(2)
- def ssm_table_load(self, groups):
- return
+ def ssm_table_load(self, groups, src_list = None):
ssm_dict = {'apps' : { 'org.opencord.igmp' : { 'ssmTranslate' : [] } } }
ssm_xlate_list = ssm_dict['apps']['org.opencord.igmp']['ssmTranslate']
for g in groups:
- for s in self.src_list:
+ for s in src_list:
d = {}
d['source'] = s
d['group'] = g
@@ -102,7 +107,6 @@
self.onos_load_config(ssm_dict)
def cord_port_table_load(self, cord_port_map):
- return
cord_group_dict = {'apps' : { 'org.ciena.cordigmp' : { 'cordIgmpTranslate' : [] } } }
cord_group_xlate_list = cord_group_dict['apps']['org.ciena.cordigmp']['cordIgmpTranslate']
for group, ports in cord_port_map.items():
@@ -118,7 +122,7 @@
Started = 1
Idle = 0
Joined = 1
- def __init__(self, num, channel_start = 0, iface = 'veth0', iface_mcast = 'veth2', mcast_cb = None):
+ def __init__(self, num, channel_start = 0, iface = 'veth0', iface_mcast = 'veth2', mcast_cb = None, src_list = None):
self.num = num
self.channel_start = channel_start
self.channels = self.generate(self.num, self.channel_start)
@@ -132,9 +136,10 @@
self.last_chan = None
self.iface_mcast = iface_mcast
self.mcast_cb = mcast_cb
+ self.src_list = src_list
for c in range(self.num):
self.channel_states[c] = [self.Idle]
- IgmpChannel.__init__(self, ssm_list = self.channels, iface=iface)
+ IgmpChannel.__init__(self, ssm_list = self.channels, iface=iface, src_list = src_list)
def generate(self, num, channel_start = 0):
start = (225 << 24) | ( ( (channel_start >> 16) & 0xff) << 16 ) | \
@@ -162,7 +167,8 @@
self.streams.start()
self.state = self.Started
- def join(self, chan = None):
+ def join(self, chan = None, src_list = None, record_type = None):
+ #def join(self, chan = None):
if chan is None:
chan = random.randint(0, self.num)
else:
@@ -171,15 +177,14 @@
if self.get_state(chan) == self.Joined:
return chan, 0
-
groups = [self.channels[chan]]
join_start = monotonic.monotonic()
- self.igmp_join(groups)
+ self.igmp_join(groups, src_list = src_list, record_type = record_type)
self.set_state(chan, self.Joined)
self.last_chan = chan
return chan, join_start
- def leave(self, chan, force = False):
+ def leave(self, chan, force = False, src_list = None):
if chan is None:
chan = self.last_chan
if chan is None or chan >= self.num:
@@ -187,7 +192,7 @@
if force is False and self.get_state(chan) != self.Joined:
return False
groups = [self.channels[chan]]
- self.igmp_leave(groups)
+ self.igmp_leave(groups, src_list = src_list)
self.set_state(chan, self.Idle)
if chan == self.last_chan:
self.last_chan = None
@@ -199,7 +204,7 @@
if chan is None:
return None
leave = chan
- join = chan+1
+ join = chan+1
else:
leave = chan - 1
join = chan
@@ -237,14 +242,20 @@
return self.group_channel_map[group]
return None
- def recv_cb(self, pkt):
+ def recv_cb(self, pkt, src_list = None):
'''Default channel receive callback'''
log_test.debug('Received packet from source %s, destination %s' %(pkt[IP].src, pkt[IP].dst))
- send_time = float(pkt[IP].payload.load)
- recv_time = monotonic.monotonic()
- log_test.debug('Packet received in %.3f usecs' %(recv_time - send_time))
+ if src_list is None:
+ send_time = float(pkt[IP].payload.load)
+ recv_time = monotonic.monotonic()
+ log_test.debug('Packet received in %.3f usecs' %(recv_time - send_time))
+ elif(pkt[IP].src == src_list[0]):
+ log_test.debug('Received packet from specified source %s, destination %s' %(pkt[IP].src, pkt[IP].dst))
+ elif(pkt[IP].src != src_list[0]):
+ log_test.debug('Received packet not from specified source %s, destination %s' %(pkt[IP].src, pkt[IP].dst))
+ time.sleep(60)
- def recv(self, chan, cb = None, count = 1, timeout = 5):
+ def recv(self, chan, cb = None, count = 1, timeout = 5, src_list = None):
if chan is None:
return None
if type(chan) == type([]) or type(chan) == type(()):
@@ -253,7 +264,7 @@
else:
groups = (self.gaddr(chan),)
if cb is None:
- cb = self.recv_cb
+ cb = self.recv_cb(src_list = src_list)
return sniff(prn = cb, count=count, timeout = timeout,
lfilter = lambda p: IP in p and p[IP].dst in groups, iface = bytes(self.iface[:15]))
@@ -273,11 +284,11 @@
start = 0
ssm_list = []
for i in xrange(2):
- channels = Channels(num, start)
+ channels = Channels(num, start, src_list = src_list)
ssm_list += channels.channels
start += num
- igmpChannel = IgmpChannel()
- igmpChannel.igmp_load_ssm_config(ssm_list)
+ igmpChannel = IgmpChannel(src_list = src_list)
+ igmpChannel.igmp_load_ssm_config(ssm_list, src_list)
channels.start()
for i in range(num):
channels.join(i)
diff --git a/src/test/voltha/volthaTest.py b/src/test/voltha/volthaTest.py
index 6844e0d..191af71 100644
--- a/src/test/voltha/volthaTest.py
+++ b/src/test/voltha/volthaTest.py
@@ -5,6 +5,7 @@
import json
import requests
import threading
+from IGMP import *
from random import randint
from threading import Timer
from threadPool import ThreadPool
@@ -18,6 +19,8 @@
from portmaps import g_subscriber_port_map
from OltConfig import *
from EapTLS import TLSAuthTest
+from Channels import Channels, IgmpChannel
+from Stats import Stats
from DHCP import DHCPTest
from OnosCtrl import OnosCtrl
from CordLogger import CordLogger
@@ -28,21 +31,29 @@
from CordContainer import Onos
-class multicast_channels:
+class Voltha_olt_subscribers(Channels):
- def __init__(self, port_list):
+ STATS_JOIN = 2
+ STATS_LEAVE = 3
+
+
+ def __init__(self, tx_port, rx_port, num_channels =1, channel_start = 0, src_list = None):
+ self.tx_port = tx_port
+ self.rx_port = rx_port
+ self.src_list = src_list
try:
- self.tx_intf = self.port_map['ports'][port_list[0][0]]
- self.rx_intf = self.port_map['ports'][port_list[0][1]]
+ self.tx_intf = tx_port
+ self.rx_intf = rx_port
except:
self.tx_intf = self.INTF_TX_DEFAULT
self.rx_intf = self.INTF_RX_DEFAULT
- channel_start = 0
+# num = 1
+# channel_start = 0
mcast_cb = None
- Channels.__init__(self, num, channel_start = channel_start,
+ Channels.__init__(self, num_channels, channel_start = channel_start, src_list = src_list,
iface = self.rx_intf, iface_mcast = self.tx_intf, mcast_cb = mcast_cb)
- self.loginType = loginType
+ self.loginType = 'wireless'
##start streaming channels
self.join_map = {}
##accumulated join recv stats
@@ -54,12 +65,13 @@
self.join_map[chan] = ( Stats(), Stats(), Stats(), Stats() )
self.channel_update(chan, self.STATS_JOIN, 1, t = join_time)
- def channel_join(self, chan = 0, delay = 2):
+ def channel_join(self, chan = 0, delay = 2, src_list = None, record_type = None):
'''Join a channel and create a send/recv stats map'''
if self.join_map.has_key(chan):
del self.join_map[chan]
self.delay = delay
- chan, join_time = self.join(chan)
+ chan, join_time = self.join(chan, src_list = src_list, record_type = record_type)
+ #chan, join_time = self.join(chan)
self.channel_join_update(chan, join_time)
return chan
@@ -83,10 +95,10 @@
self.channel_join_update(chan, join_time)
return chan
- def channel_leave(self, chan = 0, force = False):
+ def channel_leave(self, chan = 0, force = False, src_list = None):
if self.join_map.has_key(chan):
del self.join_map[chan]
- self.leave(chan, force = force)
+ self.leave(chan, force = force, src_list = src_list)
def channel_update(self, chan, stats_type, packets, t=0):
if type(chan) == type(0):
@@ -97,22 +109,25 @@
if self.join_map.has_key(c):
self.join_map[c][stats_type].update(packets = packets, t = t)
- def channel_receive(self, chan, cb = None, count = 1, timeout = 5):
- log_test.info('Subscriber %s on port %s receiving from group %s, channel %d' %
- (self.name, self.rx_intf, self.gaddr(chan), chan))
- r = self.recv(chan, cb = cb, count = count, timeout = timeout)
+ def channel_receive(self, chan, cb = None, count = 1, timeout = 5, src_list = None):
+ log_test.info('Subscriber on port %s receiving from group %s, channel %d' %
+ (self.rx_intf, self.gaddr(chan), chan))
+ r = self.recv(chan, cb = cb, count = count, timeout = timeout, src_list = src_list)
if len(r) == 0:
- log_test.info('Subscriber %s on port %s timed out' %(self.name, self.rx_intf))
+ log_test.info('Subscriber on port %s timed out' %( self.rx_intf))
+ self.test_status = False
else:
- log_test.info('Subscriber %s on port %s received %d packets' %(self.name, self.rx_intf, len(r)))
+ log_test.info('Subscriber on port %s received %d packets' %(self.rx_intf, len(r)))
if self.recv_timeout:
##Negative test case is disabled for now
assert_equal(len(r), 0)
+ return self.test_status
- def recv_channel_cb(self, pkt):
+ def recv_channel_cb(self, pkt, src_list = None):
+
##First verify that we have received the packet for the joined instance
- log_test.info('Packet received for group %s, subscriber %s, port %s' %
- (pkt[IP].dst, self.name, self.rx_intf))
+ log_test.info('Packet received for group %s, subscriber, port %s' %
+ (pkt[IP].dst, self.rx_intf))
if self.recv_timeout:
return
chan = self.caddr(pkt[IP].dst)
@@ -124,7 +139,6 @@
self.channel_update(chan, self.STATS_RX, 1, t = delta)
log_test.debug('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
-
class voltha_subscriber_pool:
def __init__(self, subscriber, test_cbs):
@@ -134,7 +148,7 @@
def pool_cb(self):
for cb in self.test_cbs:
if cb:
- self.test_status = cb(self.subscriber)
+ self.test_status = cb(self.subscriber, multiple_sub = True)
if self.test_status is not True:
## This is chaning for other sub status has to check again
self.test_status = True
@@ -143,7 +157,6 @@
log_test.info('This Subscriber is tested for multiple service eligibility ')
self.test_status = True
-
class voltha_exchange(unittest.TestCase):
OLT_TYPE = 'tibit_olt'
@@ -168,6 +181,8 @@
olt_conf_file = os.getenv('OLT_CONFIG_FILE', os.path.join(test_path, '..', 'setup/olt_config.json'))
onos_restartable = bool(int(os.getenv('ONOS_RESTART', 0)))
VOLTHA_AUTO_CONFIGURE = False
+ num_joins = 0
+
VOLTHA_ENABLED = True
INTF_TX_DEFAULT = 'veth2'
INTF_RX_DEFAULT = 'veth0'
@@ -429,11 +444,26 @@
ip_range.append(".".join(map(str, temp)))
return random.choice(ip_range)
- def tls_flow_check(self, olt_ports, cert_info = None):
- if len(olt_ports[0]) >= 2:
- olt_nni_port = olt_ports[0]
- olt_uni_port = olt_ports[1]
- elif len(olt_ports[0]) == 1:
+ def random_mcast_ip(self,start_ip = '224.0.1.0', end_ip = '224.0.1.100'):
+ start = list(map(int, start_ip.split(".")))
+ end = list(map(int, end_ip.split(".")))
+ temp = start
+ ip_range = []
+ ip_range.append(start_ip)
+ while temp != end:
+ start[3] += 1
+ for i in (3, 2, 1):
+ if temp[i] == 255:
+ temp[i] = 0
+ temp[i-1] += 1
+ ip_range.append(".".join(map(str, temp)))
+ return random.choice(ip_range)
+
+ def tls_flow_check(self, olt_ports, cert_info = None, multiple_sub = False):
+ if multiple_sub is True:
+ olt_nni_port = olt_ports.tx_port
+ olt_uni_port = olt_ports.rx_port
+ else:
olt_uni_port = olt_ports
def tls_fail_cb():
@@ -477,16 +507,16 @@
self.test_status = True
return self.test_status
- def dhcp_flow_check(self, olt_ports =None, negative_test = None):
- if len(olt_ports[0]) >= 2:
- olt_nni_port = olt_ports[0]
- onu_iface = olt_ports[1]
+ def dhcp_flow_check(self, olt_ports, negative_test = None, multiple_sub = False):
+ if multiple_sub is True:
+ olt_nni_port = olt_ports.tx_port
+ onu_iface = olt_ports.rx_port
dhcp_server_startip = self.random_ip()
random_mac = '00:00:00:0a:0a:' + hex(random.randrange(50,254)).split('x')[1]
- elif len(olt_ports[0]) == 1:
- onu_iface = olt_ports
- dhcp_server_startip = '10.10.10.20'
- random_mac = None
+ else:
+ onu_iface = olt_ports
+ dhcp_server_startip = '10.10.10.20'
+ random_mac = None
self.success = True
if negative_test is None:
@@ -522,6 +552,7 @@
assert_equal(cip,None)
log_test.info('ONOS dhcp server rejected client discover with invalid source mac as expected')
self.test_status = True
+
if negative_test == "invalid_src_mac_multicast":
config = {'startip':'10.10.10.20', 'endip':'10.10.10.69',
'ip':'10.10.10.2', 'mac': "ca:fe:ca:fe:ca:fe",
@@ -568,6 +599,7 @@
log_test.info('Test done. Releasing ip %s to server %s' %(cip2, sip2))
assert_equal(self.dhcp.release(cip2), True)
self.test_status = True
+
if negative_test == "starvation_positive":
config = {'startip':'193.170.1.20', 'endip':'193.170.1.69',
'ip':'193.170.1.2', 'mac': "ca:fe:c2:fe:cc:fe",
@@ -583,6 +615,7 @@
assert_equal(False, ip_map.has_key(cip))
ip_map[cip] = sip
self.test_status = True
+
if negative_test == "starvation_negative":
config = {'startip':'182.17.0.20', 'endip':'182.17.0.69',
'ip':'182.17.0.2', 'mac': "ca:fe:c3:fe:ca:fe",
@@ -601,6 +634,7 @@
assert_equal(sip, None)
self.test_status = True
self.success = True
+
if negative_test == "multiple_discover":
config = {'startip':'10.10.10.20', 'endip':'10.10.10.69',
'ip':'10.10.10.2', 'mac': "ca:fe:ca:fe:ca:fe",
@@ -718,91 +752,203 @@
# self.success = True
return self.test_status
- def recv_channel_cb(self, pkt):
- ##First verify that we have received the packet for the joined instance
- chan = self.subscriber.caddr(pkt[IP].dst)
- assert_equal(chan in self.subscriber.join_map.keys(), True)
- recv_time = monotonic.monotonic() * 1000000
- join_time = self.subscriber.join_map[chan][self.subscriber.STATS_JOIN].start
- delta = recv_time - join_time
- self.subscriber.join_rx_stats.update(packets=1, t = delta, usecs = True)
- self.subscriber.channel_update(chan, self.subscriber.STATS_RX, 1, t = delta)
- log_test.debug('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
- self.test_status = True
+ def recv_channel_cb(self, pkt):
+ ##First verify that we have received the packet for the joined instance
+ chan = self.subscriber.caddr(pkt[IP].dst)
+ assert_equal(chan in self.subscriber.join_map.keys(), True)
+ recv_time = monotonic.monotonic() * 1000000
+ join_time = self.subscriber.join_map[chan][self.subscriber.STATS_JOIN].start
+ delta = recv_time - join_time
+ self.subscriber.join_rx_stats.update(packets=1, t = delta, usecs = True)
+ self.subscriber.channel_update(chan, self.subscriber.STATS_RX, 1, t = delta)
+ log_test.debug('Packet received in %.3f usecs for group %s after join' %(delta, pkt[IP].dst))
+ self.test_status = True
- def traffic_verify(self, subscriber):
- # if subscriber.has_service('TRAFFIC'):
- url = 'http://www.google.com'
- resp = requests.get(url)
- self.test_status = resp.ok
- if resp.ok == False:
- log_test.info('Subscriber %s failed get from url %s with status code %d'
- %(subscriber.name, url, resp.status_code))
- else:
- log_test.info('GET request from %s succeeded for subscriber %s'
- %(url, subscriber.name))
- return self.test_status
+ def traffic_verify(self, subscriber):
+ # if subscriber.has_service('TRAFFIC'):
+ url = 'http://www.google.com'
+ resp = requests.get(url)
+ self.test_status = resp.ok
+ if resp.ok == False:
+ log_test.info('Subscriber %s failed get from url %s with status code %d'
+ %(subscriber.name, url, resp.status_code))
+ else:
+ log_test.info('GET request from %s succeeded for subscriber %s'
+ %(url, subscriber.name))
+ return self.test_status
- def voltha_igmp_verify(self, subscriber):
- chan = 0
- #if subscriber.has_service('IGMP'):
- ##We wait for all the subscribers to join before triggering leaves
- if subscriber.rx_port > 1:
- time.sleep(5)
- subscriber.channel_join(chan, delay = 0)
- self.num_joins += 1
- while self.num_joins < self.num_subscribers:
- time.sleep(5)
- log_test.info('All subscribers have joined the channel')
- for i in range(10):
- subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10)
- log_test.info('Leaving channel %d for subscriber %s' %(chan, subscriber.name))
- subscriber.channel_leave(chan)
- time.sleep(5)
- log_test.info('Interface %s Join RX stats for subscriber %s, %s' %(subscriber.iface, subscriber.name,subscriber.join_rx_stats))
- #Should not receive packets for this subscriber
- self.recv_timeout = True
- subscriber.recv_timeout = True
- subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10)
- subscriber.recv_timeout = False
- self.recv_timeout = False
- log_test.info('Joining channel %d for subscriber %s' %(chan, subscriber.name))
- subscriber.channel_join(chan, delay = 0)
- self.test_status = True
- return self.test_status
+ def igmp_flow_check(self, subscriber, multiple_sub = False):
+ chan = 0
+ subscriber.channel_join(chan, delay = 0, src_list = subscriber.src_list)
+ self.num_joins += 1
+ while self.num_joins < self.num_subscribers:
+ time.sleep(5)
+ log_test.info('All subscribers have joined the channel')
+ for i in range(10):
+ self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list)
+ log_test.info('Leaving channel %d for subscriber on port %s' %(chan, subscriber.rx_port))
+ subscriber.channel_leave(chan, src_list = subscriber.src_list)
+ time.sleep(5)
+ log_test.info('Interface %s Join RX stats for subscriber, %s' %(subscriber.iface,subscriber.join_rx_stats))
+ #Should not receive packets for this subscriber
+ self.recv_timeout = True
+ subscriber.recv_timeout = True
+ subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list)
+ subscriber.recv_timeout = False
+ self.recv_timeout = False
+ log_test.info('Joining channel %d for subscriber port %s' %(chan, subscriber.rx_port))
+ subscriber.channel_join(chan, delay = 0, src_list = subscriber.src_list)
+# self.test_status = True
+ return self.test_status
- def voltha_igmp_jump_verify(self, subscriber):
- if subscriber.has_service('IGMP'):
- for i in xrange(subscriber.num):
- log_test.info('Subscriber %s jumping channel' %subscriber.name)
- chan = subscriber.channel_jump(delay=0)
- subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1)
- log_test.info('Verified receive for channel %d, subscriber %s' %(chan, subscriber.name))
- time.sleep(3)
- log_test.info('Interface %s Jump RX stats for subscriber %s, %s' %(subscriber.iface, subscriber.name, subscriber.join_rx_stats))
- self.test_status = True
- return self.test_status
+ def igmp_flow_check_join_change_to_exclude(self, subscriber, multiple_sub = False):
+ chan = 0
+ subscriber.channel_join(chan, delay = 0, src_list = subscriber.src_list)
+ self.num_joins += 1
+ while self.num_joins < self.num_subscribers:
+ time.sleep(5)
+ log_test.info('All subscribers have joined the channel')
+ self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list)
+ time.sleep(5)
+ log_test.info('Leaving channel %d for subscriber on port %s from specific source ip %s' %(chan, subscriber.rx_port,subscriber.src_list[0]))
+ subscriber.channel_join(chan, delay = 0, src_list = subscriber.src_list[0], record_type = IGMP_V3_GR_TYPE_CHANGE_TO_EXCLUDE)
+ time.sleep(5)
+ self.recv_timeout = True
+ subscriber.recv_timeout = True
+ self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list[1])
+ if self.test_status is True:
+ self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list[0])
+ if self.test_status is True:
+ log_test.info('Subscriber should not receive data from channel %s on specific source %s, test is failed' %(chan, subscriber.rx_port))
+ self.test_status = False
+ subscriber.recv_timeout = False
+ self.recv_timeout = False
+ subscriber.channel_leave(chan, src_list = subscriber.src_list)
+# self.test_status = True
+ return self.test_status
+ def igmp_flow_check_join_change_to_exclude_again_include_back(self, subscriber, multiple_sub = False):
+ chan = 0
+ subscriber.channel_join(chan, delay = 0, src_list = subscriber.src_list)
+ self.num_joins += 1
+ while self.num_joins < self.num_subscribers:
+ time.sleep(5)
+ log_test.info('All subscribers have joined the channel')
+ self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list)
+ time.sleep(5)
+ log_test.info('Leaving channel %d for subscriber on port %s from specific source ip %s' %(chan, subscriber.rx_port,subscriber.src_list[0]))
+ subscriber.channel_join(chan, delay = 0, src_list = subscriber.src_list[0], record_type = IGMP_V3_GR_TYPE_CHANGE_TO_EXCLUDE)
+ time.sleep(5)
+ self.recv_timeout = True
+ subscriber.recv_timeout = True
+ self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list[1])
+ if self.test_status is True:
+ self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list[0])
+ if self.test_status is True:
+ log_test.info('Subscriber should not receive data from channel %s on specific source %s, test is failed' %(chan, subscriber.rx_port))
+ self.test_status = False
+ subscriber.recv_timeout = False
+ self.recv_timeout = False
+ log_test.info('Again include the source list in the group %s souce ip %s' %(chan, subscriber.rx_port,subscriber.src_list[0]))
+ subscriber.channel_join(chan, delay = 0, src_list = subscriber.src_list[0], record_type = IGMP_V3_GR_TYPE_CHANGE_TO_INCLUDE)
+ time.sleep(5)
+ self.recv_timeout = True
+ subscriber.recv_timeout = True
+ self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list)
+ subscriber.recv_timeout = False
+ self.recv_timeout = False
+ subscriber.channel_leave(chan, src_list = subscriber.src_list)
+# self.test_status = True
+ return self.test_status
- def voltha_igmp_next_verify(self, subscriber):
- #if subscriber.has_service('IGMP'):
- for c in xrange(self.VOLTHA_IGMP_ITERATIONS):
- for i in xrange(subscriber.num):
- if i:
- chan = subscriber.channel_join_next(delay=0, leave_flag = self.leave_flag)
- time.sleep(0.2)
- else:
- chan = subscriber.channel_join(i, delay=0)
- time.sleep(0.2)
- if subscriber.num == 1:
- subscriber.channel_leave(chan)
- log_test.info('Joined next channel %d for subscriber %s' %(chan, subscriber.name))
- #subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count=1)
- #log_test.info('Verified receive for channel %d, subscriber %s' %(chan, subscriber.name))
- self.test_status = True
- return self.test_status
+ def igmp_flow_check_join_change_to_block(self, subscriber, multiple_sub = False):
+ chan = 0
+ subscriber.channel_join(chan, delay = 0, src_list = subscriber.src_list)
+ self.num_joins += 1
+ while self.num_joins < self.num_subscribers:
+ time.sleep(5)
+ log_test.info('All subscribers have joined the channel')
+ self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list)
+ time.sleep(5)
+ log_test.info('Leaving channel %d for subscriber on port %s from specific source ip %s' %(chan, subscriber.rx_port,subscriber.src_list[0]))
+ subscriber.channel_join(chan, delay = 0, src_list = subscriber.src_list[0], record_type = IGMP_V3_GR_TYPE_BLOCK_OLD)
+ time.sleep(5)
+ self.recv_timeout = True
+ subscriber.recv_timeout = True
+ self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list[1])
+ if self.test_status is True:
+ self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list[0])
+ if self.test_status is True:
+ log_test.info('Subscriber should not receive data from channel %s on specific source %s, test is failed' %(chan, subscriber.rx_port))
+ self.test_status = False
+ subscriber.recv_timeout = False
+ self.recv_timeout = False
+ subscriber.channel_leave(chan, src_list = subscriber.src_list)
+ return self.test_status
- def voltha_subscribers(self, services, cbs = None, num_subscribers = 1, num_channels = 1):
+ def igmp_flow_check_join_change_to_block_again_allow_back(self, subscriber, multiple_sub = False):
+ chan = 0
+ subscriber.channel_join(chan, delay = 0, src_list = subscriber.src_list)
+ self.num_joins += 1
+ while self.num_joins < self.num_subscribers:
+ time.sleep(5)
+ log_test.info('All subscribers have joined the channel')
+ self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list)
+ time.sleep(5)
+ log_test.info('Leaving channel %d for subscriber on port %s from specific source ip %s' %(chan, subscriber.rx_port,subscriber.src_list[0]))
+ subscriber.channel_join(chan, delay = 0, src_list = subscriber.src_list[0], record_type = IGMP_V3_GR_TYPE_CHANGE_TO_EXCLUDE)
+ time.sleep(5)
+ self.recv_timeout = True
+ subscriber.recv_timeout = True
+ self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list[1])
+ if self.test_status is True:
+ self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list[0])
+ if self.test_status is True:
+ log_test.info('Subscriber should not receive data from channel %s on specific source %s, test is failed' %(chan, subscriber.rx_port))
+ self.test_status = False
+ subscriber.recv_timeout = False
+ self.recv_timeout = False
+ log_test.info('Again include the source list in the group %s souce ip %s' %(chan, subscriber.rx_port,subscriber.src_list[0]))
+ subscriber.channel_join(chan, delay = 0, src_list = subscriber.src_list[0], record_type = IGMP_V3_GR_TYPE_ALLOW_NEW)
+ time.sleep(5)
+ self.recv_timeout = True
+ subscriber.recv_timeout = True
+ self.test_status = subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10, src_list = subscriber.src_list)
+ subscriber.recv_timeout = False
+ self.recv_timeout = False
+ subscriber.channel_leave(chan, src_list = subscriber.src_list)
+ return self.test_status
+
+ def voltha_igmp_jump_verify(self, subscriber):
+ if subscriber.has_service('IGMP'):
+ for i in xrange(subscriber.num):
+ log_test.info('Subscriber %s jumping channel' %subscriber.name)
+ chan = subscriber.channel_jump(delay=0)
+ subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1)
+ log_test.info('Verified receive for channel %d, subscriber %s' %(chan, subscriber.name))
+ time.sleep(3)
+ log_test.info('Interface %s Jump RX stats for subscriber %s, %s' %(subscriber.iface, subscriber.name, subscriber.join_rx_stats))
+ self.test_status = True
+ return self.test_status
+
+ def voltha_igmp_next_verify(self, subscriber):
+ for c in xrange(self.VOLTHA_IGMP_ITERATIONS):
+ for i in xrange(subscriber.num):
+ if i:
+ chan = subscriber.channel_join_next(delay=0, leave_flag = self.leave_flag)
+ time.sleep(0.2)
+ else:
+ chan = subscriber.channel_join(i, delay=0)
+ time.sleep(0.2)
+ if subscriber.num == 1:
+ subscriber.channel_leave(chan)
+ log_test.info('Joined next channel %d for subscriber %s' %(chan, subscriber.name))
+ #subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count=1)
+ #log_test.info('Verified receive for channel %d, subscriber %s' %(chan, subscriber.name))
+ self.test_status = True
+ return self.test_status
+
+ def voltha_subscribers(self, services, cbs = None, num_subscribers = 1, num_channels = 1, src_list = None):
"""Test subscriber join next for channel surfing"""
voltha = VolthaCtrl(self.VOLTHA_HOST,
rest_port = self.VOLTHA_REST_PORT,
@@ -841,7 +987,7 @@
cbs = cbs,
port_list = self.generate_port_list(self.num_subscribers,
self.num_channels),
- services = services)
+ src_list = src_list, services = services)
assert_equal(test_status, True)
finally:
if switch_map is not None:
@@ -852,10 +998,8 @@
log_test.info('Uninstalling OLT app')
OnosCtrl.uninstall_app(self.olt_app_name)
-
-
- def subscriber_flows_check( self, num_subscribers = 10, num_channels = 1,
- channel_start = 0, cbs = None, port_list = [],
+ def subscriber_flows_check( self, num_subscribers = 1, num_channels = 1,
+ channel_start = 0, cbs = None, port_list = [], src_list = None,
services = None, negative_subscriber_auth = None):
self.test_status = False
self.ovs_cleanup()
@@ -865,8 +1009,17 @@
port_list = self.generate_port_list(num_subscribers, num_channels)
subscriber_tx_rx_ports = []
for i in range(num_subscribers):
- subscriber_tx_rx_ports.append((self.port_map['ports'][port_list[i][0]], self.port_map['ports'][port_list[i][1]]))
+ #subscriber_tx_rx_ports.append((self.port_map['ports'][port_list[i][0]], self.port_map['ports'][port_list[i][1]]))
+ subscriber_tx_rx_ports.append(Voltha_olt_subscribers(tx_port = self.port_map['ports'][port_list[i][0]],
+ rx_port = self.port_map['ports'][port_list[i][1]],
+ num_channels = num_channels,src_list = src_list,))
self.onos_aaa_load()
+ #load the ssm list for all subscriber channels
+ igmpChannel = IgmpChannel(src_list = src_list)
+ ssm_groups = map(lambda sub: sub.channels, subscriber_tx_rx_ports)
+ ssm_list = reduce(lambda ssm1, ssm2: ssm1+ssm2, ssm_groups)
+ igmpChannel.igmp_load_ssm_config(ssm_list, src_list= src_list)
+
self.thread_pool = ThreadPool(min(100, subscribers_count), queue_size=1, wait_timeout=1)
chan_leave = False #for single channel, multiple subscribers
@@ -885,7 +1038,6 @@
def generate_port_list(self, subscribers, channels):
return self.port_list[:subscribers]
-
@classmethod
def ovs_cleanup(cls):
##For every test case, delete all the OVS groups
@@ -897,7 +1049,6 @@
finally:
return
-
def test_olt_enable_disable(self):
log_test.info('Enabling OLT type %s, MAC %s' %(self.OLT_TYPE, self.OLT_MAC))
device_id, status = self.voltha.enable_device(self.OLT_TYPE, self.OLT_MAC)
@@ -1484,7 +1635,6 @@
reactor.callLater(0, tls_flow_check_with_disable_olt_device_scenario, df)
return df
-
@deferred(TESTCASE_TIMEOUT)
def test_subscriber_with_voltha_for_eap_tls_authentication_restarting_onu(self):
"""
@@ -1666,7 +1816,6 @@
reactor.callLater(0, tls_flow_check_on_two_subscribers_same_olt_device, df)
return df
-
@deferred(TESTCASE_TIMEOUT)
def test_two_subscribers_with_voltha_for_eap_tls_authentication_using_same_certificates(self):
"""
@@ -1960,7 +2109,6 @@
num_subscribers = num_subscribers,
num_channels = num_channels)
-
@deferred(TESTCASE_TIMEOUT)
def test_subscriber_with_voltha_for_dhcp_request(self):
"""
@@ -2322,8 +2470,6 @@
reactor.callLater(0, dhcp_flow_check_scenario, df)
return df
-
-
@deferred(TESTCASE_TIMEOUT)
def test_subscriber_with_voltha_for_dhcp_sending_multiple_request(self):
"""
@@ -2370,7 +2516,6 @@
reactor.callLater(0, dhcp_flow_check_scenario, df)
return df
-
@deferred(TESTCASE_TIMEOUT)
def test_subscriber_with_voltha_for_dhcp_requesting_desired_ip_address(self):
"""
@@ -2457,7 +2602,6 @@
reactor.callLater(0, dhcp_flow_check_scenario, df)
return df
-
@deferred(TESTCASE_TIMEOUT)
def test_subscriber_with_voltha_deactivating_dhcp_app_in_onos(self):
"""
@@ -2659,8 +2803,6 @@
reactor.callLater(0, dhcp_flow_check_scenario, df)
return df
-
-
@deferred(TESTCASE_TIMEOUT)
def test_subscriber_with_voltha_for_dhcp_with_multiple_times_disabling_of_olt(self):
"""
@@ -2721,7 +2863,6 @@
reactor.callLater(0, dhcp_flow_check_scenario, df)
return df
-
@deferred(TESTCASE_TIMEOUT)
def test_subscriber_with_voltha_for_dhcp_toggling_olt(self):
"""
@@ -2837,8 +2978,6 @@
reactor.callLater(0, dhcp_flow_check_scenario, df)
return df
-
-
@deferred(TESTCASE_TIMEOUT)
def test_subscriber_with_voltha_for_dhcp_disabling_onu_port(self):
"""
@@ -3071,7 +3210,6 @@
reactor.callLater(0, dhcp_flow_check_scenario, df)
return df
-
@deferred(TESTCASE_TIMEOUT)
def test_two_subscribers_with_voltha_for_dhcp_discover(self):
"""
@@ -3336,9 +3474,7 @@
thread2.join()
dhcp_flow_status = self.success
try:
-# if self.success is not True:
assert_equal(dhcp_flow_status, True)
- #assert_equal(status, True)
time.sleep(10)
finally:
self.voltha.disable_device(device_id, delete = True)
@@ -3392,9 +3528,7 @@
thread2.join()
dhcp_flow_status = self.success
try:
-# if self.success is not True:
assert_equal(dhcp_flow_status, True)
- #assert_equal(status, True)
time.sleep(10)
finally:
self.voltha.disable_device(device_id, delete = True)
@@ -3454,9 +3588,7 @@
thread3.join()
dhcp_flow_status = self.success
try:
-# if self.success is not True:
assert_equal(dhcp_flow_status, True)
- #assert_equal(status, True)
time.sleep(10)
finally:
self.voltha.disable_device(device_id, delete = True)
@@ -3466,7 +3598,6 @@
reactor.callLater(0, dhcp_flow_check_scenario, df)
return df
-
@deferred(TESTCASE_TIMEOUT)
def test_two_subscribers_with_voltha_for_dhcp_disabling_olt(self):
"""
@@ -3515,9 +3646,7 @@
thread3.join()
dhcp_flow_status = self.success
try:
-# if self.success is not True:
assert_equal(dhcp_flow_status, True)
- #assert_equal(status, True)
time.sleep(10)
finally:
self.voltha.disable_device(device_id, delete = True)
@@ -3577,9 +3706,7 @@
thread3.join()
dhcp_flow_status = self.success
try:
-# if self.success is not True:
assert_equal(dhcp_flow_status, True)
- #assert_equal(status, True)
time.sleep(10)
finally:
self.voltha.disable_device(device_id, delete = True)
@@ -3636,9 +3763,7 @@
thread3.join()
dhcp_flow_status = self.success
try:
-# if self.success is not True:
assert_equal(dhcp_flow_status, True)
- #assert_equal(status, True)
time.sleep(10)
finally:
self.voltha.disable_device(device_id, delete = True)
@@ -4109,6 +4234,15 @@
6. Verify that multicast data packets are being recieved on join sent uni port on ONU to cord-tester.
"""
+ """Test subscriber join next for channel surfing with 3 subscribers browsing 3 channels each"""
+ num_subscribers = 1
+ num_channels = 1
+ services = ('IGMP')
+ cbs = (self.igmp_flow_check, None, None)
+ self.voltha_subscribers(services, cbs = cbs,
+ num_subscribers = num_subscribers,
+ num_channels = num_channels)
+
def test_subscriber_with_voltha_for_igmp_leave_verify_traffic(self):
"""
Test Method:
@@ -4122,6 +4256,16 @@
7. Send igmp leave for a multicast group address multi-group-addressA.
8. Verify that multicast data packets are not being recieved on leave sent uni port on ONU to cord-tester.
"""
+ """Test subscriber join next for channel surfing with 3 subscribers browsing 3 channels each"""
+ num_subscribers = 1
+ num_channels = 1
+ services = ('IGMP')
+ cbs = (self.igmp_flow_check, None, None)
+ self.voltha_subscribers(services, cbs = cbs,
+ num_subscribers = num_subscribers,
+ num_channels = num_channels)
+
+
def test_subscriber_with_voltha_for_igmp_leave_and_again_join_verify_traffic(self):
"""
Test Method:
@@ -4136,6 +4280,15 @@
8. Verify that multicast data packets are not being recieved on leave sent uni port on ONU to cord-tester.
9. Repeat steps 4 to 6.
"""
+ """Test subscriber join next for channel surfing with 3 subscribers browsing 3 channels each"""
+ num_subscribers = 1
+ num_channels = 1
+ services = ('IGMP')
+ cbs = (self.igmp_flow_check, None, None)
+ self.voltha_subscribers(services, cbs = cbs,
+ num_subscribers = num_subscribers,
+ num_channels = num_channels)
+
def test_subscriber_with_voltha_for_igmp_2_groups_joins_verify_traffic(self):
"""
@@ -4149,6 +4302,16 @@
6. Verify that 2 groups multicast data packets are being recieved on join sent uni port on ONU to cord-tester.
"""
+ """Test subscriber join next for channel surfing with 3 subscribers browsing 3 channels each"""
+ num_subscribers = 1
+ num_channels = 2
+ services = ('IGMP')
+ cbs = (self.igmp_flow_check, None, None)
+ self.voltha_subscribers(services, cbs = cbs,
+ num_subscribers = num_subscribers,
+ num_channels = num_channels)
+
+
def test_subscriber_with_voltha_for_igmp_2_groups_joins_and_leave_for_one_group_verify_traffic(self):
"""
Test Method:
@@ -4163,6 +4326,15 @@
8. Verify that multicast data packets of group(multi-group-addressA) are not being recieved on leave sent uni port on ONU to cord-tester.
9. Verify that multicast data packets of group (multi-group-addressB) are being recieved on join sent uni port on ONU to cord-tester.
"""
+ """Test subscriber join next for channel surfing with 3 subscribers browsing 3 channels each"""
+ num_subscribers = 1
+ num_channels = 2
+ services = ('IGMP')
+ cbs = (self.igmp_flow_check, None, None)
+ self.voltha_subscribers(services, cbs = cbs,
+ num_subscribers = num_subscribers,
+ num_channels = num_channels)
+
def test_subscriber_with_voltha_for_igmp_join_different_group_src_list_verify_traffic(self):
"""
@@ -4177,8 +4349,16 @@
7. Send multicast data traffic for a group (multi-group-addressA) from other uni port with source ip as src_listB on ONU.
8. Verify that multicast data packets are not being recieved on join sent uni port on ONU from other source list to cord-tester.
"""
+ num_subscribers = 1
+ num_channels = 1
+ services = ('IGMP')
+ cbs = (self.igmp_flow_check, None, None)
+ self.voltha_subscribers(services, cbs = cbs, src_list = ['2.3.4.5','3.4.5.6'],
+ num_subscribers = num_subscribers,
+ num_channels = num_channels)
- def test_subscriber_with_voltha_for_igmp_change_to_exclude_src_list_verify_traffic(self):
+
+ def test_subscriber_with_voltha_for_igmp_change_to_exclude_mcast_group_verify_traffic(self):
"""
Test Method:
0. Make sure that voltha is up and running on CORD-POD setup.
@@ -4193,7 +4373,17 @@
9. Verify that multicast data packets are not being recieved on join sent uni port on ONU from other source list to cord-tester.
"""
- def test_subscriber_with_voltha_for_igmp_change_to_allow_src_list_verify_traffic(self):
+ num_subscribers = 1
+ num_channels = 2
+ services = ('IGMP')
+ cbs = (self.igmp_flow_check_join_change_to_exclude, None, None)
+ self.voltha_subscribers(services, cbs = cbs, src_list = ['2.3.4.5','3.4.5.6'],
+ num_subscribers = num_subscribers,
+ num_channels = num_channels)
+
+
+
+ def test_subscriber_with_voltha_for_igmp_change_to_include_back_from_exclude_mcast_group_verify_traffic(self):
"""
Test Method:
0. Make sure that voltha is up and running on CORD-POD setup.
@@ -4207,6 +4397,14 @@
8. Send multicast data traffic for a group (multi-group-addressA) from other uni port with source ip as src_listA on ONU.
9. Verify that multicast data packets are being recieved on join sent uni port on ONU from other source list to cord-tester.
"""
+ num_subscribers = 1
+ num_channels = 1
+ services = ('IGMP')
+ cbs = (self.igmp_flow_check_join_change_to_exclude_again_include_back, None, None)
+ self.voltha_subscribers(services, cbs = cbs, src_list = ['2.3.4.5','3.4.5.6'],
+ num_subscribers = num_subscribers,
+ num_channels = num_channels)
+
def test_subscriber_with_voltha_for_igmp_change_to_block_src_list_verify_traffic(self):
"""
@@ -4222,6 +4420,18 @@
8. Send multicast data traffic for a group (multi-group-addressA) from other uni port with source ip as src_listA on ONU.
9. Verify that multicast data packets are not being recieved on join sent uni port on ONU from other source list to cord-tester.
"""
+
+ num_subscribers = 1
+ num_channels = 1
+ services = ('IGMP')
+ cbs = (self.igmp_flow_check_join_change_to_block, None, None)
+ self.voltha_subscribers(services, cbs = cbs, src_list = ['2.3.4.5','3.4.5.6'],
+ num_subscribers = num_subscribers,
+ num_channels = num_channels)
+
+
+
+
def test_subscriber_with_voltha_for_igmp_allow_new_src_list_verify_traffic(self):
"""
Test Method:
@@ -4236,6 +4446,16 @@
8. Send multicast data traffic for a group (multi-group-addressA) from other uni port with source ip as src_listB on ONU.
9. Verify that multicast data packets are being recieved on join sent uni port on ONU from other source list to cord-tester.
"""
+
+ num_subscribers = 1
+ num_channels = 1
+ services = ('IGMP')
+ cbs = (self.igmp_flow_check_join_change_to_block_again_allow_back, None, None)
+ self.voltha_subscribers(services, cbs = cbs, src_list = ['2.3.4.5','3.4.5.6'],
+ num_subscribers = num_subscribers,
+ num_channels = num_channels)
+
+
def test_subscriber_with_voltha_for_igmp_include_empty_src_list_verify_traffic(self):
"""
Test Method: