blob: 17da936d84bfb721a0d8d9211b8146ff03ee849f [file] [log] [blame]
Rich Lane93820592013-10-24 11:20:23 -07001# Distributed under the OpenFlow Software License (see LICENSE)
2# Copyright (c) 2012, 2013 Big Switch Networks, Inc.
3"""
4Packet-in match test cases
5
6Checks the match sent in packet-in messages.
7"""
8
9import logging
10
11from oftest import config
12import oftest.base_tests as base_tests
13import ofp
14
15from oftest.testutils import *
16
17class PktinMatchTest(base_tests.SimpleDataPlane):
18 """
19 Base class for packet-in match tests
20 """
21
22 def setUp(self):
23 base_tests.SimpleDataPlane.setUp(self)
24 delete_all_flows(self.controller)
25
26 def verify_pktin_match(self, pkt, expected_oxm, optional=False):
27 """
28 Cause a packet-in and verify that the expected OXM is present
29 (unless optional) and equal
30 """
31
32 in_port, = openflow_ports(1)
33 pktstr = str(pkt)
34
35 logging.debug("Inserting match-all flow sending packets to controller")
36 request = ofp.message.flow_add(
Wilson Ng42df57a2013-10-28 17:54:57 -070037 table_id=test_param_get_table(),
Rich Lane93820592013-10-24 11:20:23 -070038 instructions=[
39 ofp.instruction.apply_actions(
40 actions=[
41 ofp.action.output(
42 port=ofp.OFPP_CONTROLLER,
43 max_len=ofp.OFPCML_NO_BUFFER)])],
44 buffer_id=ofp.OFP_NO_BUFFER,
45 priority=0)
46 self.controller.message_send(request)
47 do_barrier(self.controller)
48
49 logging.debug("Sending packet")
50 self.dataplane.send(in_port, pktstr)
51
52 logging.debug("Expecting packet-in")
53 msg = verify_packet_in(self, pktstr, in_port, ofp.OFPR_NO_MATCH)
54 oxms = { type(oxm): oxm for oxm in msg.match.oxm_list }
55 oxm = oxms.get(type(expected_oxm))
56 if oxm:
57 self.assertEquals(oxm, expected_oxm, "Received %s != expected %s" % (oxm.show(), expected_oxm.show()))
58 elif optional:
59 logging.info("Optional OXM not received")
60 else:
61 raise AssertionError("Required OXM not received")
62
63class VlanAbsent(PktinMatchTest):
64 """
65 Absent VLAN tag
66 """
67 def runTest(self):
68 self.verify_pktin_match(
69 simple_tcp_packet(),
70 ofp.oxm.vlan_vid(0),
71 optional=True)
72
73class VlanVid(PktinMatchTest):
74 """
75 VLAN tag
76 """
77 def runTest(self):
78 self.verify_pktin_match(
79 simple_tcp_packet(dl_vlan_enable=True, vlan_vid=1),
80 ofp.oxm.vlan_vid(ofp.OFPVID_PRESENT|1),
81 optional=True)