Test: Subscriber multitable changes for single-channel, N-subscriber tests.
Changes to the subscriber ONOS app to remove metabuilder constraints from next flow objectives.
Change-Id: I6fb91e9706df122c42fce7eb4d6598f3a56b4d71
diff --git a/src/test/apps/ciena-cordigmp-multitable-2.0-SNAPSHOT.oar b/src/test/apps/ciena-cordigmp-multitable-2.0-SNAPSHOT.oar
index 892d066..d22de1b 100644
--- a/src/test/apps/ciena-cordigmp-multitable-2.0-SNAPSHOT.oar
+++ b/src/test/apps/ciena-cordigmp-multitable-2.0-SNAPSHOT.oar
Binary files differ
diff --git a/src/test/apps/ciena-cordigmp.multi-table/src/main/java/org/ciena/cordigmp/CordIgmp.java b/src/test/apps/ciena-cordigmp.multi-table/src/main/java/org/ciena/cordigmp/CordIgmp.java
index 441a0a4..a2ed133 100644
--- a/src/test/apps/ciena-cordigmp.multi-table/src/main/java/org/ciena/cordigmp/CordIgmp.java
+++ b/src/test/apps/ciena-cordigmp.multi-table/src/main/java/org/ciena/cordigmp/CordIgmp.java
@@ -233,23 +233,18 @@
return;
}
ConnectPoint loc = info.sink().get();
- TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder()
- .matchEthType(Ethernet.TYPE_IPV4)
- .matchIPDst(info.route().group().toIpPrefix());
- if (vlanEnabled) {
- metabuilder.matchVlanId(VlanId.vlanId((short) mcastVlan));
- }
+ log.info("Removing flow for subscriber port: {}, group {}",
+ loc.port(), info.route().group());
NextObjective next = DefaultNextObjective.builder()
.fromApp(appId)
.addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
.withType(NextObjective.Type.BROADCAST)
- .withMeta(metabuilder.build())
.withId(groups.get(info.route().group()))
.removeFromExisting(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
//TODO: change to debug
- log.info("Next Objective {} installed", objective.id());
+ log.info("Next Objective {} removed", objective.id());
}
@Override
@@ -281,17 +276,10 @@
Integer nextId = groups.computeIfAbsent(route.group(), (g) -> {
Integer id = flowObjectiveService.allocateNextId();
- TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder()
- .matchEthType(Ethernet.TYPE_IPV4)
- .matchIPDst(g.toIpPrefix());
- if (vlanEnabled) {
- metabuilder.matchVlanId(VlanId.vlanId((short) mcastVlan));
- }
NextObjective next = DefaultNextObjective.builder()
.fromApp(appId)
.addTreatment(DefaultTrafficTreatment.builder().setOutput(sink.port()).build())
.withType(NextObjective.Type.BROADCAST)
- .withMeta(metabuilder.build())
.withId(id)
.add(new ObjectiveContext() {
@Override
@@ -303,7 +291,7 @@
@Override
public void onError(Objective objective, ObjectiveError error) {
//TODO: change to debug
- log.info("Next Objective {} failed, because {}",
+ log.info("Next Objective {} failed to add, because {}",
objective.id(),
error);
}
@@ -312,9 +300,9 @@
flowObjectiveService.next(sink.deviceId(), next);
TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
- .matchInPort(oltInfo.uplink())
- .matchEthType(Ethernet.TYPE_IPV4)
- .matchIPDst(g.toIpPrefix());
+ .matchInPort(oltInfo.uplink())
+ .matchEthType(Ethernet.TYPE_IPV4)
+ .matchIPDst(g.toIpPrefix());
if (vlanEnabled) {
mcast.matchVlanId(VlanId.vlanId((short) mcastVlan));
@@ -344,31 +332,28 @@
flowObjectiveService.forward(sink.deviceId(), fwd);
sync.set(true);
- log.info("Installed flows for device: {}, id {}, ip {}", sink.deviceId(), id, g.toIpPrefix());
+ log.info("Installed flows for device: {}, id {}, ip {}, port {}",
+ sink.deviceId(), id, g.toIpPrefix(), sink.port());
return id;
});
if (!sync.get()) {
- TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder()
- .matchEthType(Ethernet.TYPE_IPV4)
- .matchIPDst(route.group().toIpPrefix());
NextObjective next = DefaultNextObjective.builder()
.fromApp(appId)
.addTreatment(DefaultTrafficTreatment.builder().setOutput(sink.port()).build())
.withType(NextObjective.Type.BROADCAST)
.withId(nextId)
- .withMeta(metabuilder.build())
.addToExisting(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
//TODO: change to debug
- log.info("Next Objective {} installed", objective.id());
+ log.info("Next Objective {} installed to existing", objective.id());
}
@Override
public void onError(Objective objective, ObjectiveError error) {
//TODO: change to debug
- log.info("Next Objective {} failed, because {}",
+ log.info("Next Objective {} failed to install to existing, because {}",
objective.id(),
error);
}
@@ -376,8 +361,8 @@
flowObjectiveService.next(sink.deviceId(), next);
- log.info("Append flows for device {}, id {}, ip {}", sink.deviceId(), nextId,
- route.group().toIpPrefix());
+ log.info("Append flows for device {}, id {}, ip {}, port {}", sink.deviceId(), nextId,
+ route.group().toIpPrefix(), sink.port());
}
}
diff --git a/src/test/setup/olt_config.json b/src/test/setup/olt_config.json
index 2840af6..88380e6 100644
--- a/src/test/setup/olt_config.json
+++ b/src/test/setup/olt_config.json
@@ -1,2 +1,2 @@
-{ "olt" : false , "port_map" : { "ports" : [ "veth0", "veth2" ], "tx" : "veth2", "rx" : "veth0", "host" : "enp0s8", "start_vlan" : 1000 }, "uplink" : 2, "vlan" : 0 }
-
+{ "olt" : false , "port_map" : { "ports" : [ "veth0", "veth2" ], "tx" : "veth2", "rx" : "veth0", "host" : "eth0", "start_vlan" : 1000 }, "uplink" : 2, "vlan" : 0 }
+
diff --git a/src/test/subscriberMultiTable/subscriberMultiTableTest.py b/src/test/subscriberMultiTable/subscriberMultiTableTest.py
index c245bd0..10f61bd 100644
--- a/src/test/subscriberMultiTable/subscriberMultiTableTest.py
+++ b/src/test/subscriberMultiTable/subscriberMultiTableTest.py
@@ -48,6 +48,7 @@
self.tx_intf = self.port_map[self.PORT_TX_DEFAULT]
self.rx_intf = self.port_map[self.PORT_RX_DEFAULT]
+ log.info('Subscriber %s, rx interface %s, uplink interface %s' %(name, self.rx_intf, self.tx_intf))
Channels.__init__(self, num, channel_start = channel_start,
iface = self.rx_intf, iface_mcast = self.tx_intf, mcast_cb = mcast_cb)
self.name = name
@@ -61,6 +62,7 @@
self.join_map = {}
##accumulated join recv stats
self.join_rx_stats = Stats()
+ self.recv_timeout = False
def has_service(self, service):
if self.service_map.has_key(service):
@@ -116,13 +118,20 @@
if self.join_map.has_key(c):
self.join_map[c][stats_type].update(packets = packets, t = t)
- def channel_receive(self, chan, cb = None, count = 1):
- log.info('Subscriber %s receiving from group %s, channel %d' %(self.name, self.gaddr(chan), chan))
- self.recv(chan, cb = cb, count = count)
+ def channel_receive(self, chan, cb = None, count = 1, timeout = 5):
+ log.info('Subscriber %s on port %s receiving from group %s, channel %d' %
+ (self.name, self.rx_intf, self.gaddr(chan), chan))
+ r = self.recv(chan, cb = cb, count = count, timeout = timeout)
+ if self.recv_timeout:
+ ##Negative test case is disabled for now
+ assert_equal(len(r), 0)
def recv_channel_cb(self, pkt):
##First verify that we have received the packet for the joined instance
- log.debug('Packet received for group %s, subscriber %s' %(pkt[IP].dst, self.name))
+ log.info('Packet received for group %s, subscriber %s, port %s' %
+ (pkt[IP].dst, self.name, self.rx_intf))
+ if self.recv_timeout:
+ return
chan = self.caddr(pkt[IP].dst)
assert_equal(chan in self.join_map.keys(), True)
recv_time = monotonic.monotonic() * 1000000
@@ -179,6 +188,10 @@
},
}
test_services = ('IGMP',)
+ num_joins = 0
+ num_subscribers = 0
+ num_channels = 0
+ recv_timeout = False
@classmethod
def setUpClass(cls):
@@ -250,14 +263,7 @@
else:
config = network_cfg
log.info('Restarting ONOS with new network configuration')
- cfg = json.dumps(config)
- with open('{}/network-cfg.json'.format(cls.onos_config_path), 'w') as f:
- f.write(cfg)
-
- try:
- return cord_test_onos_restart()
- except:
- return False
+ return cord_test_onos_restart(config = config)
@classmethod
def remove_onos_config(cls):
@@ -379,14 +385,28 @@
def igmp_verify(self, subscriber):
chan = 0
if subscriber.has_service('IGMP'):
- for i in range(5):
- log.info('Joining channel %d for subscriber %s' %(chan, subscriber.name))
- subscriber.channel_join(chan, delay = 0)
- subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 1)
+ ##We wait for all the subscribers to join before triggering leaves
+ if subscriber.rx_port > 1:
+ time.sleep(5)
+ subscriber.channel_join(chan, delay = 0)
+ self.num_joins += 1
+ while self.num_joins < self.num_subscribers:
+ time.sleep(5)
+ log.info('All subscribers have joined the channel')
+ for i in range(10):
+ subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10)
log.info('Leaving channel %d for subscriber %s' %(chan, subscriber.name))
subscriber.channel_leave(chan)
- time.sleep(3)
+ time.sleep(5)
log.info('Interface %s Join RX stats for subscriber %s, %s' %(subscriber.iface, subscriber.name,subscriber.join_rx_stats))
+ #Should not receive packets for this subscriber
+ self.recv_timeout = True
+ subscriber.recv_timeout = True
+ subscriber.channel_receive(chan, cb = subscriber.recv_channel_cb, count = 10)
+ subscriber.recv_timeout = False
+ self.recv_timeout = False
+ log.info('Joining channel %d for subscriber %s' %(chan, subscriber.name))
+ subscriber.channel_join(chan, delay = 0)
self.test_status = True
def igmp_jump_verify(self, subscriber):
@@ -454,8 +474,10 @@
num_channels = num_channels, channel_start = channel_start, port_list = port_list)
self.onos_aaa_load()
self.thread_pool = ThreadPool(min(100, self.num_subscribers), queue_size=1, wait_timeout=1)
+ chan_leave = False #for single channel, multiple subscribers
if cbs is None:
cbs = (self.tls_verify, self.dhcp_verify, self.igmp_verify)
+ chan_leave = True
for subscriber in self.subscriber_list:
subscriber.start()
pool_object = subscriber_pool(subscriber, cbs)
@@ -463,33 +485,39 @@
self.thread_pool.cleanUpThreads()
for subscriber in self.subscriber_list:
subscriber.stop()
+ if chan_leave is True:
+ subscriber.channel_leave(0)
+ self.num_subscribers = 0
return self.test_status
def test_subscriber_join_recv(self):
"""Test subscriber join and receive"""
- num_subscribers = 5
- num_channels = 1
- test_status = self.subscriber_join_verify(num_subscribers = num_subscribers,
- num_channels = num_channels,
- port_list = self.generate_port_list(num_subscribers, num_channels))
+ self.num_subscribers = 5
+ self.num_channels = 1
+ test_status = self.subscriber_join_verify(num_subscribers = self.num_subscribers,
+ num_channels = self.num_channels,
+ port_list = self.generate_port_list(self.num_subscribers,
+ self.num_channels))
assert_equal(test_status, True)
def test_subscriber_join_jump(self):
"""Test subscriber join and receive for channel surfing"""
- num_subscribers = 5
- num_channels = 5
- test_status = self.subscriber_join_verify(num_subscribers = num_subscribers,
- num_channels = num_channels,
+ self.num_subscribers = 5
+ self.num_channels = 10
+ test_status = self.subscriber_join_verify(num_subscribers = self.num_subscribers,
+ num_channels = self.num_channels,
cbs = (self.tls_verify, self.dhcp_jump_verify, self.igmp_jump_verify),
- port_list = self.generate_port_list(num_subscribers, num_channels))
+ port_list = self.generate_port_list(self.num_subscribers,
+ self.num_channels))
assert_equal(test_status, True)
def test_subscriber_join_next(self):
"""Test subscriber join next for channels"""
- num_subscribers = 5
- num_channels = 5
- test_status = self.subscriber_join_verify(num_subscribers = num_subscribers,
- num_channels = num_channels,
+ self.num_subscribers = 5
+ self.num_channels = 10
+ test_status = self.subscriber_join_verify(num_subscribers = self.num_subscribers,
+ num_channels = self.num_channels,
cbs = (self.tls_verify, self.dhcp_next_verify, self.igmp_next_verify),
- port_list = self.generate_port_list(num_subscribers, num_channels))
+ port_list = self.generate_port_list(self.num_subscribers,
+ self.num_channels))
assert_equal(test_status, True)
diff --git a/src/test/utils/Channels.py b/src/test/utils/Channels.py
index ccfebb5..f0f5960 100644
--- a/src/test/utils/Channels.py
+++ b/src/test/utils/Channels.py
@@ -1,12 +1,12 @@
-#
+#
# Copyright 2016-present Ciena Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -36,7 +36,7 @@
IP_DST = '224.0.1.1'
igmp_eth = Ether(dst = IGMP_DST_MAC, src = IGMP_SRC_MAC, type = ETH_P_IP)
igmp_ip = IP(dst = IP_DST, src = IP_SRC)
- ssm_list = []
+ ssm_list = []
def __init__(self, iface = 'veth0', ssm_list = [], src_list = ['1.2.3.4'], delay = 2):
self.iface = iface
@@ -45,7 +45,7 @@
self.delay = delay
self.onos_ctrl = OnosCtrl('org.onosproject.igmp')
self.onos_ctrl.activate()
-
+
def igmp_load_ssm_config(self, ssm_list = []):
if not ssm_list:
ssm_list = self.ssm_list
@@ -124,13 +124,12 @@
self.streams = None
self.channel_states = {}
self.last_chan = None
- self.recv_sock = L2Socket(iface = iface, type = ETH_P_IP)
self.iface_mcast = iface_mcast
self.mcast_cb = mcast_cb
for c in range(self.num):
self.channel_states[c] = [self.Idle]
IgmpChannel.__init__(self, ssm_list = self.channels, iface=iface)
-
+
def generate(self, num, channel_start = 0):
start = (225 << 24) | ( ( (channel_start >> 16) & 0xff) << 16 ) | \
( ( (channel_start >> 8) & 0xff ) << 8 ) | (channel_start) & 0xff
@@ -187,7 +186,7 @@
if chan == self.last_chan:
self.last_chan = None
return True
-
+
def join_next(self, chan = None):
if chan is None:
chan = self.last_chan
@@ -198,7 +197,7 @@
else:
leave = chan - 1
join = chan
-
+
if join >= self.num:
join = 0
@@ -238,7 +237,7 @@
recv_time = monotonic.monotonic()
log.debug('Packet received in %.3f usecs' %(recv_time - send_time))
- def recv(self, chan, cb = None, count = 1):
+ def recv(self, chan, cb = None, count = 1, timeout = 5):
if chan is None:
return None
if type(chan) == type([]) or type(chan) == type(()):
@@ -248,7 +247,8 @@
groups = (self.gaddr(chan),)
if cb is None:
cb = self.recv_cb
- sniff(prn = cb, count=count, lfilter = lambda p: IP in p and p[IP].dst in groups, opened_socket = self.recv_sock)
+ return sniff(prn = cb, count=count, timeout = timeout,
+ lfilter = lambda p: IP in p and p[IP].dst in groups, iface = self.iface)
def stop(self):
if self.streams: