Merge branch 'master' of github.com:floodlight/oftest
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 70b3aad..9e3354d 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -40,6 +40,26 @@
import select
import logging
+
+FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.'
+ for x in range(256)])
+
+def hex_dump_buffer(src, length=16):
+ """
+ Convert src to a hex dump string and return the string
+ @param src The source buffer
+ @param length The number of bytes shown in each line
+ @returns A string showing the hex dump
+ """
+ result = ["\n"]
+ for i in xrange(0, len(src), length):
+ chars = src[i:i+length]
+ hex = ' '.join(["%02x" % ord(x) for x in chars])
+ printable = ''.join(["%s" % ((ord(x) <= 127 and
+ FILTER[ord(x)]) or '.') for x in chars])
+ result.append("%04x %-*s %s\n" % (i, length*3, hex, printable))
+ return ''.join(result)
+
##@todo Find a better home for these identifiers (controller)
RCV_SIZE_DEFAULT = 32768
LISTEN_QUEUE_SIZE = 1
@@ -63,11 +83,9 @@
@var rcv_size The receive size to use for receive calls
@var max_pkts The max size of the receive queue
@var keep_alive If true, listen for echo requests and respond w/
- @var keep_alive If true, listen for echo requests and respond w/
echo replies
@var initial_hello If true, will send a hello message immediately
upon connecting to the switch
- @var exit_on_reset If true, terminate controller on connection reset
@var host The host to use for connect
@var port The port to connect on
@var packets_total Total number of packets received
@@ -102,7 +120,6 @@
self.keep_alive = False
self.active = True
self.initial_hello = True
- self.exit_on_reset = True
# Settings
self.max_pkts = max_pkts
@@ -111,7 +128,11 @@
self.port = port
self.dbg_state = "init"
self.logger = logging.getLogger("controller")
- self.barrier_to = 15 # Barrier timeout default value; add to config
+ self.filter_packet_in = False # Drop "excessive" packet ins
+ self.pkt_in_run = 0 # Count on run of packet ins
+ self.pkt_in_filter_limit = 50 # Count on run of packet ins
+ self.pkt_in_dropped = 0 # Total dropped packet ins
+ self.transact_to = 15 # Transact timeout default value; add to config
# Transaction and message type waiting variables
# xid_cv: Condition variable (semaphore) for packet waiters
@@ -132,6 +153,37 @@
self.expect_msg_response = None
self.buffered_input = ""
+ def filter_packet(self, rawmsg, hdr):
+ """
+ Check if packet should be filtered
+
+ Currently filters packet in messages
+ @return Boolean, True if packet should be dropped
+ """
+ # Add check for packet in and rate limit
+ if self.filter_packet_in:
+ # If this is a packet in and not expecting a packet in
+ if ((not self.expect_msg_type or
+ (self.expect_msg_type != hdr.type)) and
+ hdr.type == OFPT_PACKET_IN):
+
+ self.pkt_in_run += 1
+ # Check if limit exceeded
+ if self.pkt_in_run > self.pkt_in_filter_limit:
+ self.pkt_in_dropped += 1
+ return True
+
+ else: # Not in filtering mode
+ # If we were dropping packets, report number dropped
+ if self.pkt_in_run > self.pkt_in_filter_limit:
+ self.logger.debug("Dropped %d packet ins (%d total)"
+ % ((self.pkt_in_run -
+ self.pkt_in_filter_limit),
+ self.pkt_in_dropped))
+ self.pkt_in_run = 0
+
+ return False
+
def _pkt_handle(self, pkt):
"""
Check for all packet handling conditions
@@ -157,23 +209,26 @@
while offset < len(pkt):
# Parse the header to get type
hdr = of_header_parse(pkt[offset:])
- if not hdr:
- self.logger.info("Could not parse header, pkt len", len(pkt))
- self.parse_errors += 1
- return
- if hdr.length == 0:
- self.logger.error("Header length is zero; out of sync")
- self.parse_errors += 1
+ if not hdr or hdr.length == 0:
+ self.logger.error("Could not parse header")
+ self.logger.error("pkt len %d." % len(pkt))
+ if hdr:
+ self.logger.error("hdr len %d." % hdr.length)
+ self.logger.error("%s" % hex_dump_buffer(pkt[:200]))
+ self.kill()
return
# Extract the raw message bytes
if (offset + hdr.length) > len(pkt):
break
rawmsg = pkt[offset : offset + hdr.length]
-
- self.logger.debug("Msg in: len %d. offset %d. type %s. hdr.len %d" %
- (len(pkt), offset, ofp_type_map[hdr.type], hdr.length))
offset += hdr.length
+
+ if self.filter_packet(rawmsg, hdr):
+ continue
+
+ self.logger.debug("Msg in: buf len %d. hdr.type %s. hdr.len %d" %
+ (len(pkt), ofp_type_map[hdr.type], hdr.length))
if hdr.version != OFP_VERSION:
self.logger.error("Version %d does not match OFTest version %d"
% (hdr.version, OFP_VERSION))
@@ -181,7 +236,7 @@
(hdr.version, OFP_VERSION)
self.active = False
self.switch_socket = None
- self.kill()
+ return
msg = of_message_parse(rawmsg)
if not msg:
@@ -228,7 +283,8 @@
rep = echo_reply()
rep.header.xid = hdr.xid
# Ignoring additional data
- self.message_send(rep.pack(), zero_xid=True)
+ if self.message_send(rep.pack(), zero_xid=True) < 0:
+ self.logger.error("Error sending echo reply")
continue
# Now check for message handlers; preference is given to
@@ -259,44 +315,53 @@
def _socket_ready_handle(self, s):
"""
Handle an input-ready socket
+
@param s The socket object that is ready
- @retval True, reset the switch connection
+ @returns 0 on success, -1 on error
"""
- if s == self.listen_socket:
+ if s and s == self.listen_socket:
if self.switch_socket:
- return False
+ return 0 # Ignore listen socket while connected to switch
(self.switch_socket, self.switch_addr) = \
self.listen_socket.accept()
self.logger.info("Got cxn to " + str(self.switch_addr))
+ self.socs.append(self.switch_socket)
# Notify anyone waiting
self.connect_cv.acquire()
self.connect_cv.notify()
self.connect_cv.release()
- self.socs.append(self.switch_socket)
if self.initial_hello:
self.message_send(hello())
- elif s == self.switch_socket:
- try:
- pkt = self.switch_socket.recv(self.rcv_size)
- except:
- self.logger.warning("Error on switch read")
- return True
+ ## @fixme Check return code
+ elif s and s == self.switch_socket:
+ for idx in range(3): # debug: try a couple of times
+ try:
+ pkt = self.switch_socket.recv(self.rcv_size)
+ except:
+ self.logger.warning("Error on switch read")
+ return -1
+
+ if not self.active:
+ return 0
+
+ if len(pkt) == 0:
+ self.logger.warning("Zero-length switch read, %d" % idx)
+ else:
+ break
- if not self.active:
- return False
-
- if len(pkt) == 0:
+ if len(pkt) == 0: # Still no packet
self.logger.warning("Zero-length switch read; closing cxn")
- return True
+ self.logger.info(str(self))
+ return -1
self._pkt_handle(pkt)
else:
self.logger.error("Unknown socket ready: " + str(s))
- return True
+ return -1
- return False
+ return 0
def run(self):
"""
@@ -329,42 +394,25 @@
self.socs = [self.listen_socket]
self.dbg_state = "running"
while self.active:
- reset_switch_cxn = False
try:
sel_in, sel_out, sel_err = \
select.select(self.socs, [], self.socs, 1)
except:
print sys.exc_info()
self.logger.error("Select error, exiting")
- sys.exit(1)
-
- if not self.active:
+ self.active = False
break
- for s in sel_in:
- reset_switch_cxn = self._socket_ready_handle(s)
-
for s in sel_err:
self.logger.error("Got socket error on: " + str(s))
- if s == self.switch_socket:
- reset_switch_cxn = True
- else:
- self.logger.error("Socket error; exiting")
+ self.active = False
+ break
+
+ for s in sel_in:
+ if self._socket_ready_handle(s) == -1:
self.active = False
break
- if self.active and reset_switch_cxn:
- if self.exit_on_reset:
- self.kill()
- else:
- self.logger.warning("Closing switch cxn")
- try:
- self.switch_socket.close()
- except:
- pass
- self.switch_socket = None
- self.socs = self.socs[0:1]
-
# End of main loop
self.dbg_state = "closing"
self.logger.info("Exiting controller thread")
@@ -404,6 +452,7 @@
@todo Might want to synchronize shutdown with self.sync...
"""
+
self.active = False
try:
self.switch_socket.shutdown(socket.SHUT_RDWR)
@@ -416,6 +465,16 @@
except:
self.logger.info("Ignoring listen soc shutdown error")
self.listen_socket = None
+
+ # Release condition variables on which controller may be wait
+ self.xid_cv.acquire()
+ self.xid_cv.notifyAll()
+ self.xid_cv.release()
+
+ self.connect_cv.acquire()
+ self.connect_cv.notifyAll()
+ self.connect_cv.release()
+
self.dbg_state = "down"
def register(self, msg_type, handler):
@@ -512,7 +571,8 @@
received message handling.
@param msg The message object to send; must not be a string
- @param timeout The timeout in seconds (?)
+ @param timeout The timeout in seconds; if -1 use default. if None
+ blocks without time out
@param zero_xid Normally, if the XID is 0 an XID will be generated
for the message. Set xero_xid to override this behavior
@return The matching message object or None if unsuccessful
@@ -523,7 +583,7 @@
msg.header.xid = gen_xid()
if timeout == -1:
- timeout = self.barrier_to
+ timeout = self.transact_to
self.logger.debug("Running transaction %d" % msg.header.xid)
self.xid_cv.acquire()
if self.xid:
@@ -533,7 +593,11 @@
self.xid = msg.header.xid
self.xid_response = None
- self.message_send(msg.pack())
+ if self.message_send(msg.pack()) < 0:
+ self.logger.error("Error sending pkt for transaction %d" %
+ msg.header.xid)
+ return (None, None)
+
self.logger.debug("Waiting for transaction %d" % msg.header.xid)
self.xid_cv.wait(timeout)
if self.xid_response:
@@ -600,6 +664,8 @@
string += " host " + str(self.host) + "\n"
string += " port " + str(self.port) + "\n"
string += " keep_alive " + str(self.keep_alive) + "\n"
+ string += " pkt_in_run " + str(self.pkt_in_run) + "\n"
+ string += " pkt_in_dropped " + str(self.pkt_in_dropped) + "\n"
return string
def show(self):
diff --git a/tests/load.py b/tests/load.py
new file mode 100644
index 0000000..d168a17
--- /dev/null
+++ b/tests/load.py
@@ -0,0 +1,134 @@
+"""
+Prototype test cases related to operation under load
+
+It is recommended that these definitions be kept in their own
+namespace as different groups of tests will likely define
+similar identifiers.
+
+ The function test_set_init is called with a complete configuration
+dictionary prior to the invocation of any tests from this file.
+
+ The switch is actively attempting to contact the controller at the address
+indicated in oft_config
+
+In general these test cases make some assumption about the external
+configuration of the switch under test. For now, the assumption is
+that the first two OF ports are connected by a loopback cable.
+"""
+
+import copy
+
+import logging
+
+import unittest
+
+import oftest.controller as controller
+import oftest.cstruct as ofp
+import oftest.message as message
+import oftest.dataplane as dataplane
+import oftest.action as action
+import oftest.parse as parse
+import basic
+import time
+
+from testutils import *
+
+#@var load_port_map Local copy of the configuration map from OF port
+# numbers to OS interfaces
+load_port_map = None
+#@var load_logger Local logger object
+load_logger = None
+#@var load_config Local copy of global configuration data
+load_config = None
+
+# For test priority
+#@var test_prio Set test priority for local tests
+test_prio = {}
+
+
+def test_set_init(config):
+ """
+ Set up function for packet action test classes
+
+ @param config The configuration dictionary; see oft
+ """
+
+ global load_port_map
+ global load_logger
+ global load_config
+
+ load_logger = logging.getLogger("load")
+ load_logger.info("Initializing test set")
+ load_port_map = config["port_map"]
+ load_config = config
+
+class LoadBarrier(basic.SimpleProtocol):
+ """
+ Test barrier under load with loopback
+
+ This test assumes there is a pair of ports on the switch with
+ a loopback cable connected and that spanning tree is disabled.
+ A flow is installed to cause a storm of packet-in messages
+ when a packet is sent to the loopbacked interface. After causing
+ this storm, a barrier request is sent.
+
+ The test succeeds if the barrier response is received. Otherwise
+ the test fails.
+ """
+ def runTest(self):
+ # Set up flow to send from port 1 to port 2 and copy to CPU
+ # Test parameter gives LB port base (assumes consecutive)
+ lb_port = test_param_get(self.config, 'lb_port', default=1)
+ barrier_count = test_param_get(self.config, 'barrier_count',
+ default=10)
+
+ # Set controller to filter packet ins
+ self.controller.filter_packet_in = True
+ self.controller.pkt_in_filter_limit = 10
+
+ pkt = simple_tcp_packet()
+ match = parse.packet_to_flow_match(pkt)
+ match.wildcards &= ~ofp.OFPFW_IN_PORT
+ match.in_port = lb_port
+ act = action.action_output()
+ act.port = lb_port + 1
+
+ request = message.flow_mod()
+ request.match = match
+ request.hard_timeout = 2 * barrier_count
+
+ request.buffer_id = 0xffffffff
+ self.assertTrue(request.actions.add(act), "Could not add action")
+
+ act = action.action_output()
+ act.port = ofp.OFPP_CONTROLLER
+ self.assertTrue(request.actions.add(act), "Could not add action")
+
+ rv = self.controller.message_send(request)
+ self.assertTrue(rv != -1, "Error installing flow mod")
+ self.assertEqual(do_barrier(self.controller), 0, "Barrier failed")
+
+ # Create packet out and send to port lb_port + 1
+ msg = message.packet_out()
+ msg.data = str(pkt)
+ act = action.action_output()
+ act.port = lb_port + 1
+ self.assertTrue(msg.actions.add(act), 'Could not add action to msg')
+ load_logger.info("Sleeping before starting storm")
+ time.sleep(1) # Root causing issue with fast disconnects
+ load_logger.info("Sending packet out to %d" % (lb_port + 1))
+ rv = self.controller.message_send(msg)
+ self.assertTrue(rv == 0, "Error sending out message")
+
+ for idx in range(0, barrier_count):
+ self.assertEqual(do_barrier(self.controller), 0, "Barrier failed")
+ # To do: Add some interesting functionality here
+ load_logger.info("Barrier %d completed" % idx)
+
+ # Clear the flow table when done
+ load_logger.debug("Deleting all flows from switch")
+ rc = delete_all_flows(self.controller, load_logger)
+ self.assertEqual(rc, 0, "Failed to delete all flows")
+
+# Do not run by default; still mysterious disconnects often
+test_prio["LoadBarrier"] = -1
diff --git a/tests/oft b/tests/oft
index 828eefa..c834b8e 100755
--- a/tests/oft
+++ b/tests/oft
@@ -417,11 +417,14 @@
# Check if test list is requested; display and exit if so
if config["list"]:
did_print = False
+ mod_count = 0
+ test_count = 0
print "\nTest List:"
for mod in config["all_tests"].keys():
if config["test_spec"] != "all" and \
config["test_spec"] != mod.__name__:
continue
+ mod_count += 1
did_print = True
desc = mod.__doc__.strip()
desc = desc.split('\n')[0]
@@ -440,10 +443,14 @@
if len(start_str) > 22:
desc = "\n" + _space_to(22, "") + desc
print start_str + _space_to(22, start_str) + desc
+ test_count += 1
print
if not did_print:
print "No tests found for " + config["test_spec"]
else:
+ print "%d modules shown with a total of %d tests" % \
+ (mod_count, test_count)
+ print
print "Tests preceded by * are not run by default"
print "Tests marked (TP1) after name take --test-params including:"
print " 'vid=N;strip_vlan=bool;add_vlan=bool'"
diff --git a/tests/pktact.py b/tests/pktact.py
index 4598262..a686dbd 100644
--- a/tests/pktact.py
+++ b/tests/pktact.py
@@ -874,9 +874,10 @@
- def installFlow(self, prio, inp, egp):
+ def installFlow(self, prio, inp, egp,
+ wildcards=ofp.OFPFW_DL_SRC):
request = flow_msg_create(self, self.pkt, ing_port=inp,
- wildcards=ofp.OFPFW_DL_SRC,
+ wildcards=wildcards,
egr_ports=egp)
request.priority = prio
flow_msg_install(self, request, clear_table_override=False)
@@ -895,13 +896,16 @@
raise Exception("Not initialized")
- def verifyFlow(self, inp, egp):
+ def verifyFlow(self, inp, egp, pkt=None):
+ if pkt == None:
+ pkt = self.pkt
+
self.logger.info("Pkt match test: " + str(inp) +
" to " + str(egp))
self.logger.debug("Send packet: " + str(inp) + " to "
+ str(egp))
- self.dataplane.send(inp, str(self.pkt))
- receive_pkt_verify(self, egp, self.pkt, inp)
+ self.dataplane.send(inp, str(pkt))
+ receive_pkt_verify(self, egp, pkt, inp)
@@ -930,7 +934,26 @@
+class WildcardPriority(SingleWildcardMatchPriority):
+
+ def runTest(self):
+ self._Init()
+
+ of_ports = pa_port_map.keys()
+ of_ports.sort()
+
+ self._ClearTable()
+ # Install a flow with no wildcards for our packet:
+ self.installFlow(1000, of_ports[0], of_ports[1], wildcards=0)
+ self.verifyFlow(of_ports[0], of_ports[1])
+ # Install a flow with wildcards for our packet with higher
+ # priority.
+ self.installFlow(1001, of_ports[0], of_ports[2])
+ self.verifyFlow(of_ports[0], of_ports[2])
+
+
+
class SingleWildcardMatch(BaseMatchCase):
"""
Exercise wildcard matching for all ports
diff --git a/tests/testutils.py b/tests/testutils.py
index 9c7b81f..68ee343 100644
--- a/tests/testutils.py
+++ b/tests/testutils.py
@@ -174,8 +174,8 @@
Return 0 on success, -1 on error
"""
b = message.barrier_request()
- (resp, pkt) = ctrl.transact(b, timeout=ctrl.barrier_to)
- # We'll trust the transaction processing in the controller
+ (resp, pkt) = ctrl.transact(b)
+ # We'll trust the transaction processing in the controller that xid matched
if not resp:
return -1
return 0
@@ -838,7 +838,7 @@
@param length The number of bytes shown in each line
@returns A string showing the hex dump
"""
- result = []
+ result = ["\n"]
for i in xrange(0, len(src), length):
chars = src[i:i+length]
hex = ' '.join(["%02x" % ord(x) for x in chars])
diff --git a/tools/ovs-ctl/ovs-ctl.py b/tools/ovs-ctl/ovs-ctl.py
index 5df57d9..4d4f100 100755
--- a/tools/ovs-ctl/ovs-ctl.py
+++ b/tools/ovs-ctl/ovs-ctl.py
@@ -178,8 +178,19 @@
use this option""",
action='store_true', default=False)
+gParser.add_argument('-lb', '--loopback',
+ help="Create a loopback pair. The port numbers are port_count+1 and port_count+2.",
+ default=False, action='store_true')
+gParser.add_argument("--cli",
+ help="Run the ovs-ctl cli after initialization",
+ action='store_true', default=False)
+
+gParser.add_argument("--teardown",
+ help="Kill OVS instance after CLI exits",
+ action='store_true', default=False)
+
#
# Reset defaults based on config files and override
#
@@ -374,22 +385,26 @@
print gServerPid
vsctl(["del-br", gArgs.bridge])
-# Kill existing DB/vswitchd
-killp(gSwitchPid)
-killp(gServerPid)
-killp(gLogPid)
-# Remove old logpid file, since this does not happen automagically
-if os.path.exists(gLogPid):
- os.remove(gLogPid)
+def killall():
+ # Kill existing DB/vswitchd
+ killp(gSwitchPid)
+ killp(gServerPid)
+ killp(gLogPid)
-if gArgs.keep_veths == False:
- lcall(['/sbin/rmmod', 'veth'])
- lcall(['/sbin/modprobe', 'veth'])
+ # Remove old logpid file, since this does not happen automagically
+ if os.path.exists(gLogPid):
+ os.remove(gLogPid)
-# Remove kmod
-lcall(['/sbin/rmmod', gArgs.ovs_kmod])
+ if gArgs.keep_veths == False:
+ lcall(['/sbin/rmmod', 'veth'])
+ lcall(['/sbin/modprobe', 'veth'])
+ # Remove kmod
+ lcall(['/sbin/rmmod', gArgs.ovs_kmod])
+
+
+killall()
if gArgs.kill == True:
# Don't do anything else
sys.exit()
@@ -400,8 +415,11 @@
# Insert openvswitch module
lcall(['/sbin/insmod', gArgs.ovs_kmod])
-createVeths(gArgs.port_count)
-vethsUp(gArgs.port_count)
+port_count = gArgs.port_count
+if gArgs.loopback:
+ port_count += 1
+createVeths(port_count)
+vethsUp(port_count)
if not os.path.exists(gArgs.ovs_db_file) or gArgs.keep_db == False:
print "Initializing DB @ %s" % (gArgs.ovs_db_file)
@@ -450,6 +468,11 @@
for idx in range(0, gArgs.port_count):
vsctl(["add-port", gArgs.bridge, "veth%s" % (idx*2)])
+# Check if loopback port added
+if gArgs.loopback:
+ lb_idx = gArgs.port_count * 2
+ vsctl(["add-port", gArgs.bridge, "veth%d" % (lb_idx)])
+ vsctl(["add-port", gArgs.bridge, "veth%d" % (lb_idx+1)])
# Set controller
vsctl(["set-controller", gArgs.bridge, "tcp:%s:%s" % (
@@ -463,3 +486,30 @@
ofctl(["show", gArgs.bridge])
+if gArgs.cli:
+ while True:
+ cmd = raw_input("[%s] ovs-ctl> " % gConfigSection)
+ if cmd and cmd != "":
+ args = cmd.split(" ")
+ if args[0] == "vsctl" or args[0] == "ovs-vsctl":
+ vsctl(args[1:])
+ elif args[0] == "ofctl" or args[0] == "ovs-ofctl":
+ ofctl(args[1:])
+ elif args[0] == "exit" or args[0] == "quit":
+ break;
+ elif args[0] == "kill":
+ gArgs.teardown = True
+ break
+ else:
+ print "unknown command '%s'" % args[0]
+
+
+if gArgs.teardown:
+ print "Killing OVS"
+ killall()
+
+
+
+
+
+