Implementation of many modify tests
Use --test-params option instead of old --param option.
Implemented pkt_action_setup that does the core work of
generating in/out pkts and action lists. Use this for a bunch
of modify test cases.
diff --git a/tests/pktact.py b/tests/pktact.py
index be7d7ed..7c3fcbb 100644
--- a/tests/pktact.py
+++ b/tests/pktact.py
@@ -624,9 +624,7 @@
"""
def runTest(self):
- vid = TEST_VID_DEFAULT
- if pa_config["param"] is not None:
- vid = pa_config["param"]
+ vid = test_param_get(self.config, 'vid', default=TEST_VID_DEFAULT)
flow_match_test(self, pa_port_map, dl_vlan=vid)
class ExactMatchTaggedMany(BaseMatchCase):
@@ -665,9 +663,7 @@
SingleWildcardMatch with tagged packets
"""
def runTest(self):
- vid = TEST_VID_DEFAULT
- if pa_config["param"] is not None:
- vid = pa_config["param"]
+ vid = test_param_get(self.config, 'vid', default=TEST_VID_DEFAULT)
for wc in WILDCARD_VALUES:
flow_match_test(self, pa_port_map, wildcards=wc, dl_vlan=vid,
max_test=10)
@@ -693,9 +689,7 @@
Match one field with tagged packets
"""
def runTest(self):
- vid = TEST_VID_DEFAULT
- if pa_config["param"] is not None:
- vid = pa_config["param"]
+ vid = test_param_get(self.config, 'vid', default=TEST_VID_DEFAULT)
for wc in WILDCARD_VALUES:
all_exp_one_wildcard = ofp.OFPFW_ALL ^ wc
flow_match_test(self, pa_port_map, wildcards=all_exp_one_wildcard,
@@ -720,9 +714,7 @@
AllWildcardMatch with tagged packets
"""
def runTest(self):
- vid = TEST_VID_DEFAULT
- if pa_config["param"] is not None:
- vid = pa_config["param"]
+ vid = test_param_get(self.config, 'vid', default=TEST_VID_DEFAULT)
flow_match_test(self, pa_port_map, wildcards=ofp.OFPFW_ALL,
dl_vlan=vid)
@@ -766,10 +758,7 @@
Just send a packet thru the switch
"""
def runTest(self):
- vid = TEST_VID_DEFAULT
- if pa_config["param"] is not None:
- vid = pa_config["param"]
- print "Param is " + str(pa_config["param"])
+ vid = test_param_get(self.config, 'vid', default=TEST_VID_DEFAULT)
pkt = simple_tcp_packet(dl_vlan_enable=True, dl_vlan=vid)
of_ports = pa_port_map.keys()
of_ports.sort()
@@ -782,11 +771,14 @@
test_prio["PacketOnlyTagged"] = -1
class ModifyVID(BaseMatchCase):
+ """
+ Modify the VLAN ID in the VLAN tag of a tagged packet
+ """
def runTest(self):
old_vid = 2
new_vid = 3
sup_acts = supported_actions_get(self)
- if not(sup_acts & 1<<ofp.OFPAT_SET_VLAN_VID):
+ if not (sup_acts & 1 << ofp.OFPAT_SET_VLAN_VID):
pa_logger.info("Skipping modify VLAN tag test")
return
@@ -799,10 +791,13 @@
action_list=[vid_act])
class StripVLANTag(BaseMatchCase):
+ """
+ Strip the VLAN tag from a tagged packet
+ """
def runTest(self):
old_vid = 2
sup_acts = supported_actions_get(self)
- if not(sup_acts & 1<<ofp.OFPAT_STRIP_VLAN):
+ if not (sup_acts & 1 << ofp.OFPAT_STRIP_VLAN):
pa_logger.info("Skipping strip VLAN tag test")
return
@@ -816,42 +811,159 @@
flow_match_test(self, pa_port_map, pkt=pkt, exp_pkt=exp_pkt,
action_list=[vid_act])
+def init_pkt_args():
+ """
+ Pass back a dictionary with default packet arguments
+ """
+ args = {}
+ args["dl_src"] = '00:23:45:67:89:AB'
+
+ dl_vlan_enable=False
+ dl_vlan=-1
+ if pa_config["test-params"]["vid"]:
+ dl_vlan_enable=True
+ dl_vlan = pa_config["test-params"]["vid"]
+
+# Unpack operator is ** on a dictionary
+
+ return args
+
class ModifyL2Src(BaseMatchCase):
+ """
+ Modify the source MAC address (TP1)
+ """
def runTest(self):
- pa_logger("To be implemented")
+ sup_acts = supported_actions_get(self)
+ if not (sup_acts & 1 << ofp.OFPAT_SET_DL_SRC):
+ pa_logger.info("Skipping ModifyL2Src test")
+ return
+
+ (pkt, exp_pkt, acts) = pkt_action_setup(self, mod_fields=['dl_src'],
+ check_test_params=True)
+ flow_match_test(self, pa_port_map, pkt=pkt, exp_pkt=exp_pkt,
+ action_list=acts, max_test=2)
class ModifyL2Dst(BaseMatchCase):
+ """
+ Modify the dest MAC address (TP1)
+ """
def runTest(self):
- pa_logger("To be implemented")
+ sup_acts = supported_actions_get(self)
+ if not (sup_acts & 1 << ofp.OFPAT_SET_DL_DST):
+ pa_logger.info("Skipping ModifyL2Dst test")
+ return
+
+ (pkt, exp_pkt, acts) = pkt_action_setup(self, mod_fields=['dl_dst'],
+ check_test_params=True)
+ flow_match_test(self, pa_port_map, pkt=pkt, exp_pkt=exp_pkt,
+ action_list=acts, max_test=2)
class ModifyL3Src(BaseMatchCase):
+ """
+ Modify the source IP address of an IP packet (TP1)
+ """
def runTest(self):
- pa_logger("To be implemented")
+ sup_acts = supported_actions_get(self)
+ if not (sup_acts & 1 << ofp.OFPAT_SET_NW_SRC):
+ pa_logger.info("Skipping ModifyL3Src test")
+ return
+
+ (pkt, exp_pkt, acts) = pkt_action_setup(self, mod_fields=['ip_src'],
+ check_test_params=True)
+ flow_match_test(self, pa_port_map, pkt=pkt, exp_pkt=exp_pkt,
+ action_list=acts, max_test=2)
class ModifyL3Dst(BaseMatchCase):
+ """
+ Modify the dest IP address of an IP packet (TP1)
+ """
def runTest(self):
- pa_logger("To be implemented")
+ sup_acts = supported_actions_get(self)
+ if not (sup_acts & 1 << ofp.OFPAT_SET_NW_DST):
+ pa_logger.info("Skipping ModifyL3Dst test")
+ return
+
+ (pkt, exp_pkt, acts) = pkt_action_setup(self, mod_fields=['ip_dst'],
+ check_test_params=True)
+ flow_match_test(self, pa_port_map, pkt=pkt, exp_pkt=exp_pkt,
+ action_list=acts, max_test=2)
class ModifyL4Src(BaseMatchCase):
+ """
+ Modify the source TCP port of a TCP packet (TP1)
+ """
def runTest(self):
- pa_logger("To be implemented")
+ sup_acts = supported_actions_get(self)
+ if not (sup_acts & 1 << ofp.OFPAT_SET_TP_SRC):
+ pa_logger.info("Skipping ModifyL4Src test")
+ return
+
+ (pkt, exp_pkt, acts) = pkt_action_setup(self, mod_fields=['tcp_sport'],
+ check_test_params=True)
+ flow_match_test(self, pa_port_map, pkt=pkt, exp_pkt=exp_pkt,
+ action_list=acts, max_test=2)
class ModifyL4Dst(BaseMatchCase):
+ """
+ Modify the dest TCP port of a TCP packet (TP1)
+ """
def runTest(self):
- pa_logger("To be implemented")
+ sup_acts = supported_actions_get(self)
+ if not (sup_acts & 1 << ofp.OFPAT_SET_TP_DST):
+ pa_logger.info("Skipping ModifyL4Dst test")
+ return
+
+ (pkt, exp_pkt, acts) = pkt_action_setup(self, mod_fields=['tcp_dport'],
+ check_test_params=True)
+ flow_match_test(self, pa_port_map, pkt=pkt, exp_pkt=exp_pkt,
+ action_list=acts, max_test=2)
class ModifyTOS(BaseMatchCase):
+ """
+ Modify the IP type of service of an IP packet (TP1)
+ """
def runTest(self):
- pa_logger("To be implemented")
+ sup_acts = supported_actions_get(self)
+ if not (sup_acts & 1 << ofp.OFPAT_SET_NW_TOS):
+ pa_logger.info("Skipping ModifyTOS test")
+ return
-test_prio["ModifyL2Src"] = -1
-test_prio["ModifyL2Dst"] = -1
-test_prio["ModifyL3Src"] = -1
-test_prio["ModifyL3Dst"] = -1
-test_prio["ModifyL4Src"] = -1
-test_prio["ModifyL4Dst"] = -1
-test_prio["ModifyTOS"] = -1
+ (pkt, exp_pkt, acts) = pkt_action_setup(self, mod_fields=['ip_tos'],
+ check_test_params=True)
+ flow_match_test(self, pa_port_map, pkt=pkt, exp_pkt=exp_pkt,
+ action_list=acts, max_test=2)
+#@todo Need to implement tagged versions of the above tests
+#
+#@todo Implement a test case that strips tag 2, adds tag 3
+# and modifies tag 4 to tag 5. Then verify (in addition) that
+# tag 6 does not get modified.
+
+class MixedVLAN(BaseMatchCase):
+ """
+ Test mixture of VLAN tag actions
+
+ Strip tag 2 on port 1, send to port 2
+ Add tag 3 on port 1, send to port 2
+ Modify tag 4 to 5 on port 1, send to port 2
+ All other traffic from port 1, send to port 3
+ All traffic from port 2 sent to port 4
+ Use exact matches with different packets for all mods
+ Verify the following: (port, vid)
+ (port 1, vid 2) => VLAN tag stripped, out port 2
+ (port 1, no tag) => tagged packet w/ vid 2 out port 2
+ (port 1, vid 4) => tagged packet w/ vid 5 out port 2
+ (port 1, vid 5) => tagged packet w/ vid 5 out port 2
+ (port 1, vid 6) => tagged packet w/ vid 6 out port 2
+ (port 2, no tag) => untagged packet out port 4
+ (port 2, vid 2-6) => unmodified packet out port 4
+
+ Variation: Might try sending VID 5 to port 3 and check.
+ If only VID 5 distinguishes pkt, this will fail on some platforms
+ """
+
+test_prio["MixedVLAN"] = -1
+
def supported_actions_get(parent, use_cache=True):
"""
Get the bitmap of supported actions from the switch
diff --git a/tests/testutils.py b/tests/testutils.py
index 90d30f4..5e57aba 100644
--- a/tests/testutils.py
+++ b/tests/testutils.py
@@ -17,6 +17,7 @@
import oftest.action as action
import oftest.parse as parse
import logging
+import types
# Some useful defines
IP_ETHERTYPE = 0x800
@@ -496,3 +497,171 @@
parent.logger.info("Ran " + str(test_count) + " tests; exiting")
return
+def test_param_get(config, key, default=None):
+ """
+ Return value passed via test-params if present
+
+ @param config The configuration structure for OFTest
+ @param key The lookup key
+ @param default Default value to use if not found
+
+ If the pair 'key=val' appeared in the string passed to --test-params
+ on the command line, return val (as interpreted by exec). Otherwise
+ return default value.
+ """
+ try:
+ exec config["test_params"]
+ except:
+ return default
+
+ s = "val = " + str(key)
+ try:
+ exec s
+ return val
+ except:
+ return default
+
+def action_generate(parent, field_to_mod, mod_field_vals):
+ """
+ Create an action to modify the field indicated in field_to_mod
+
+ @param parent Must implement, assertTrue
+ @param field_to_mod The field to modify as a string name
+ @param mod_field_vals Hash of values to use for modified values
+ """
+
+ act = None
+
+ if field_to_mod in ['pktlen']:
+ return None
+
+ if field_to_mod == 'dl_dst':
+ act = action.action_set_dl_dst()
+ act.dl_addr = parse.parse_mac(mod_field_vals['dl_dst'])
+ elif field_to_mod == 'dl_src':
+ act = action.action_set_dl_src()
+ act.dl_addr = parse.parse_mac(mod_field_vals['dl_src'])
+ elif field_to_mod == 'dl_vlan_enable':
+ if not mod_field_vals['dl_vlan_enable']: # Strip VLAN tag
+ act = action.action_strip_vlan()
+ # Add VLAN tag is handled by dl_vlan field
+ # Will return None in this case
+ elif field_to_mod == 'dl_vlan':
+ act = action.action_set_vlan_vid()
+ act.vlan_vid = mod_field_vals['dl_vlan']
+ elif field_to_mod == 'dl_vlan_pcp':
+ act = action.action_set_vlan_pcp()
+ act.vlan_pcp = mod_field_vals['dl_vlan_pcp']
+ elif field_to_mod == 'ip_src':
+ act = action.action_set_nw_src()
+ act.nw_addr = parse.parse_ip(mod_field_vals['ip_src'])
+ elif field_to_mod == 'ip_dst':
+ act = action.action_set_nw_dst()
+ act.nw_addr = parse.parse_ip(mod_field_vals['ip_dst'])
+ elif field_to_mod == 'ip_tos':
+ act = action.action_set_nw_tos()
+ act.nw_tos = mod_field_vals['ip_tos']
+ elif field_to_mod == 'tcp_sport':
+ act = action.action_set_tp_src()
+ act.tp_port = mod_field_vals['tcp_sport']
+ elif field_to_mod == 'tcp_dport':
+ act = action.action_set_tp_dst()
+ act.tp_port = mod_field_vals['tcp_dport']
+ else:
+ parent.assertTrue(0, "Unknown field to modify: " + str(field_to_mod))
+
+ return act
+
+def pkt_action_setup(parent, start_field_vals={}, mod_field_vals={},
+ mod_fields={}, check_test_params=False):
+ """
+ Set up the ingress and expected packet and action list for a test
+
+ @param parent Must implement, assertTrue, config hash and logger
+ @param start_field_values Field values to use for ingress packet (optional)
+ @param mod_field_values Field values to use for modified packet (optional)
+ @param mod_fields The list of fields to be modified by the switch in the test.
+ @params check_test_params If True, will check the parameters vid, add_vlan
+ and strip_vlan from the command line.
+
+ Returns a triple: pkt-to-send, expected-pkt, action-list
+ """
+
+ new_actions = []
+
+
+ base_pkt_params = {}
+ base_pkt_params['pktlen'] = 100
+ base_pkt_params['dl_dst'] = '00:DE:F0:12:34:56'
+ base_pkt_params['dl_src'] = '00:23:45:67:89:AB'
+ base_pkt_params['dl_vlan_enable'] = False
+ base_pkt_params['dl_vlan'] = 2
+ base_pkt_params['dl_vlan_pcp'] = 0
+ base_pkt_params['ip_src'] = '192.168.0.1'
+ base_pkt_params['ip_dst'] = '192.168.0.2'
+ base_pkt_params['ip_tos'] = 0
+ base_pkt_params['tcp_sport'] = 1234
+ base_pkt_params['tcp_dport'] = 80
+ for keyname in start_field_vals.keys():
+ base_pkt_params[keyname] = start_field_vals[keyname]
+
+ mod_pkt_params = {}
+ mod_pkt_params['pktlen'] = 100
+ mod_pkt_params['dl_dst'] = '00:21:0F:ED:CB:A9'
+ mod_pkt_params['dl_src'] = '00:ED:CB:A9:87:65'
+ mod_pkt_params['dl_vlan_enable'] = False
+ mod_pkt_params['dl_vlan'] = 3
+ mod_pkt_params['dl_vlan_pcp'] = 7
+ mod_pkt_params['ip_src'] = '10.20.30.40'
+ mod_pkt_params['ip_dst'] = '50.60.70.80'
+ mod_pkt_params['ip_tos'] = 0xf0
+ mod_pkt_params['tcp_sport'] = 4321
+ mod_pkt_params['tcp_dport'] = 8765
+ for keyname in mod_field_vals.keys():
+ mod_pkt_params[keyname] = mod_field_vals[keyname]
+
+ # Check for test param modifications
+ strip = False
+ if check_test_params:
+ add_vlan = test_param_get(parent.config, 'add_vlan')
+ strip_vlan = test_param_get(parent.config, 'strip_vlan')
+ vid = test_param_get(parent.config, 'vid')
+
+ if add_vlan and strip_vlan:
+ parent.assertTrue(0, "Add and strip VLAN both specified")
+
+ if vid:
+ base_pkt_params['dl_vlan_enable'] = True
+ base_pkt_params['dl_vlan'] = vid
+ if 'dl_vlan' in mod_fields:
+ mod_pkt_params['dl_vlan'] = vid + 1
+
+ if add_vlan:
+ base_pkt_params['dl_vlan_enable'] = False
+ mod_pkt_params['dl_vlan_enable'] = True
+ mod_pkt_params['pktlen'] = base_pkt_params['pktlen'] + 4
+ mod_fields.append('pktlen')
+ mod_fields.append('dl_vlan_enable')
+ if 'dl_vlan' not in mod_fields:
+ mod_fields.append('dl_vlan')
+ elif strip_vlan:
+ base_pkt_params['dl_vlan_enable'] = True
+ mod_pkt_params['dl_vlan_enable'] = False
+ mod_pkt_params['pktlen'] = base_pkt_params['pktlen'] - 4
+ mod_fields.append('dl_vlan_enable')
+ mod_fields.append('pktlen')
+
+ # Build the ingress packet
+ ingress_pkt = simple_tcp_packet(**base_pkt_params)
+
+ # Build the expected packet, modifying the indicated fields
+ for item in mod_fields:
+ base_pkt_params[item] = mod_pkt_params[item]
+ act = action_generate(parent, item, mod_pkt_params)
+ if act:
+ new_actions.append(act)
+
+ expected_pkt = simple_tcp_packet(**base_pkt_params)
+
+ return (ingress_pkt, expected_pkt, new_actions)
+