Major overhaul of oftest command interface
Add tests/oft as the top-level executable
Support command-line options for many configuration parameters
Use logging module for output
Remove oft_config.py; consolidate configuration in
oft (the top-level script) and pass it around as a dictionary
Add oft_assert.py (the one useful piece of oft_config that
remained).
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 2b47d2a..479a35b 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -16,7 +16,6 @@
Callbacks and polling support specifying the message type
@todo Support transaction semantics via xid
-@todo Set up reasonable logging facility
@todo Support select and listen on an administrative socket (or
use a timeout to support clean shutdown).
@@ -27,7 +26,6 @@
"""
-from oft_config import *
import os
import socket
import time
@@ -40,6 +38,11 @@
# For some reason, it seems select to be last (or later).
# Otherwise get an attribute error when calling select.select
import select
+import logging
+
+##@todo Find a better home for these identifiers (controller)
+RCV_SIZE_DEFAULT = 4096
+LISTEN_QUEUE_SIZE = 1
class Controller(Thread):
"""
@@ -71,7 +74,7 @@
@var dbg_state Debug indication of state
"""
- def __init__(self, max_pkts=1024):
+ def __init__(self, host='127.0.0.1', port=6633, max_pkts=1024):
Thread.__init__(self)
# Socket related
self.rcv_size = RCV_SIZE_DEFAULT
@@ -101,11 +104,10 @@
# Settings
self.max_pkts = max_pkts
self.passive = True
- self.host = controller_host
- self.port = controller_port
+ self.host = host
+ self.port = port
self.dbg_state = "init"
- self.debug_level = debug_level_default
- # self.debug_level = DEBUG_VERBOSE
+ self.logger = logging.getLogger("controller")
# Transaction and message type waiting variables
# xid_cv: Condition variable (semaphore) for packet waiters
@@ -125,9 +127,6 @@
self.expect_msg_type = None
self.expect_msg_response = None
- def dbg(self, level, string):
- debug_log("CTRL", self.debug_level, level, string)
-
def _socket_ready_handle(self, s):
"""
Handle an input-ready socket
@@ -137,12 +136,12 @@
if s == self.listen_socket:
if self.switch_socket:
- self.dbg(DEBUG_ERROR, "Multiple switch cxns not supported")
+ self.logger.error("Multiple switch cxns not supported")
sys.exit(1)
(self.switch_socket, self.switch_addr) = \
self.listen_socket.accept()
- self.dbg(DEBUG_INFO, "Got cxn to " + str(self.switch_addr))
+ self.logger.info("Got cxn to " + str(self.switch_addr))
# Notify anyone waiting
self.connect_cv.acquire()
self.connect_cv.notify()
@@ -154,14 +153,14 @@
try:
pkt = self.switch_socket.recv(self.rcv_size)
except:
- self.dbg(DEBUG_INFO, "error on switch read")
+ self.logger.info("Error on switch read")
return True
if not self.active:
return False
if len(pkt) == 0:
- self.dbg(DEBUG_INFO, "zero-len pkt in")
+ self.logger.info("zero-len pkt in")
return True
(handled, msg) = self._pkt_handler_check(pkt)
@@ -178,7 +177,7 @@
self.packets_total += 1
self.sync.release()
else:
- self.dbg(DEBUG_ERROR, "Unknown socket ready: " + str(s))
+ self.logger.error("Unknown socket ready: " + str(s))
return True
return False
@@ -201,7 +200,7 @@
self.dbg_state = "starting"
# Create listen socket
- self.dbg(DEBUG_INFO, "Create/listen at " + self.host + ":" +
+ self.logger.info("Create/listen at " + self.host + ":" +
str(self.port))
self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.listen_socket.setsockopt(socket.SOL_SOCKET,
@@ -210,7 +209,7 @@
self.dbg_state = "listening"
self.listen_socket.listen(LISTEN_QUEUE_SIZE)
- self.dbg(DEBUG_INFO, "Waiting for switch connection")
+ self.logger.info("Waiting for switch connection")
self.socs = [self.listen_socket]
self.dbg_state = "running"
while self.active:
@@ -220,7 +219,7 @@
select.select(self.socs, [], self.socs, 1)
except:
print sys.exc_info()
- self.dbg(DEBUG_ERROR, "Select error, exiting")
+ self.logger.error("Select error, exiting")
sys.exit(1)
if not self.active:
@@ -230,16 +229,16 @@
reset_switch_cxn = self._socket_ready_handle(s)
for s in sel_err:
- self.dbg(DEBUG_ERROR, "Got socket error on: " + str(s))
+ self.logger.error("Got socket error on: " + str(s))
if s == self.switch_socket:
reset_switch_cxn = True
else:
- self.dbg(DEBUG_ERROR, "Socket error; exiting")
+ self.logger.error("Socket error; exiting")
self.active = False
break
if self.active and reset_switch_cxn:
- self.dbg(DEBUG_INFO, "Closing switch cxn")
+ self.logger.info("Closing switch cxn")
try:
self.switch_socket.close()
except:
@@ -249,7 +248,7 @@
# End of main loop
self.dbg_state = "closing"
- self.dbg(DEBUG_INFO, "Exiting controller thread")
+ self.logger.info("Exiting controller thread")
self.shutdown()
def connect(self, timeout=None):
@@ -290,13 +289,13 @@
try:
self.switch_socket.shutdown(socket.SHUT_RDWR)
except:
- self.dbg(DEBUG_INFO, "Ignoring switch soc shutdown error")
+ self.logger.info("Ignoring switch soc shutdown error")
self.switch_socket = None
try:
self.listen_socket.shutdown(socket.SHUT_RDWR)
except:
- self.dbg(DEBUG_INFO, "Ignoring listen soc shutdown error")
+ self.logger.info("Ignoring listen soc shutdown error")
self.listen_socket = None
self.dbg_state = "down"
@@ -314,18 +313,16 @@
# Parse the header to get type
hdr = of_header_parse(pkt)
if not hdr:
- self.dbg(DEBUG_INFO, "Could not parse header, pkt len", len(pkt))
+ self.logger.info("Could not parse header, pkt len", len(pkt))
self.parse_errors += 1
return (True, None)
- # if self.debug_level <= DEBUG_VERBOSE:
- # hdr.show()
- self.dbg(DEBUG_VERBOSE, "message: len %d. type %s. hdr.len %d" %
+ self.logger.debug("message: len %d. type %s. hdr.len %d" %
(len(pkt), ofp_type_map[hdr.type], hdr.length))
msg = of_message_parse(pkt)
if not msg:
self.parse_errors += 1
- self.dbg(DEBUG_WARN, "Could not parse message")
+ self.logger.warn("Could not parse message")
return (True, None)
# Check if transaction is waiting
@@ -342,7 +339,7 @@
# Check if anyone waiting on this type of message
self.expect_msg_cv.acquire()
if self.expect_msg:
- if not self.expect_msg_type or self.expect_msg_type == hdr.type:
+ if not self.expect_msg_type or (self.expect_msg_type == hdr.type):
self.expect_msg_response = (msg, pkt)
self.expect_msg = False
self.expect_msg_cv.notify()
@@ -391,20 +388,24 @@
Wait for the next OF message received from the switch.
@param exp_msg If set, return only when this type of message
- is received.
+ is received (unless timeout occurs).
@param timeout If None, do not block. Otherwise, sleep in
- intervals of 1 second until
+ intervals of 1 second until message is received.
@retval A pair (msg, pkt) where msg is a message object and pkt
the string representing the packet as received from the socket.
This allows additional parsing by the receiver if necessary.
The data members in the message are in host endian order.
- If an error occurs, None is returned
+ If an error occurs, (None, None) is returned
+
+ The current queue is searched for a message of the desired type
+ before sleeping on message in events.
"""
msg = pkt = None
+ self.logger.debug("Poll for " + ofp_type_map[exp_msg])
# First check the current queue
self.sync.acquire()
if len(self.packets) > 0:
@@ -425,20 +426,24 @@
self.sync.release()
return (None, None)
+ msg = pkt = None
+ self.logger.debug("Entering timeout")
# Careful of race condition releasing sync before message cv
# Also, this style is ripe for a lockup.
self.expect_msg_cv.acquire()
self.sync.release()
+ self.expect_msg_response = None
self.expect_msg = True
self.expect_msg_type = exp_msg
self.expect_msg_cv.wait(timeout)
if self.expect_msg_response is not None:
(msg, pkt) = self.expect_msg_response
- self.expect_msg_response = None
self.expect_msg_cv.release()
if msg is None:
- self.dbg(DEBUG_VERBOSE, "poll time out")
+ self.logger.debug("Poll time out")
+ else:
+ self.logger.debug("Got msg " + str(msg))
return (msg, pkt)
@@ -464,8 +469,7 @@
self.xid_cv.acquire()
if self.xid:
self.xid_cv.release()
- self.dbg(DEBUG_ERROR,
- "Can only run one transaction at a time")
+ self.logger.error("Can only run one transaction at a time")
return None
self.xid = msg.header.xid
@@ -494,7 +498,7 @@
if not self.switch_socket:
# Sending a string indicates the message is ready to go
- self.dbg(DEBUG_INFO, "message_send: no socket")
+ self.logger.info("message_send: no socket")
return -1
#@todo If not string, try to pack
if type(msg) != type(""):
@@ -503,17 +507,17 @@
msg.header.xid = gen_xid()
outpkt = msg.pack()
except:
- self.dbg(DEBUG_INFO,
+ self.logger.error(
"message_send: not an OF message or string?")
return -1
else:
outpkt = msg
- self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(outpkt)))
+ self.logger.debug("Sending pkt of len " + str(len(outpkt)))
if self.switch_socket.sendall(outpkt) is None:
return 0
- self.dbg(DEBUG_ERROR, "Unknown error on sendall")
+ self.logger.error("Unknown error on sendall")
return -1
def __str__(self):
diff --git a/src/python/oftest/dataplane.py b/src/python/oftest/dataplane.py
index 29b7fad..9389d51 100644
--- a/src/python/oftest/dataplane.py
+++ b/src/python/oftest/dataplane.py
@@ -22,13 +22,14 @@
from threading import Thread
from threading import Lock
from threading import Condition
-from oft_config import *
import select
+import logging
+from oft_assert import oft_assert
-#@todo Move these identifiers into config
+##@todo Find a better home for these identifiers (dataplane)
+RCV_SIZE_DEFAULT = 4096
ETH_P_ALL = 0x03
RCV_TIMEOUT = 10000
-RCV_SIZE = 4096
class DataPlanePort(Thread):
"""
@@ -56,21 +57,18 @@
"""
Thread.__init__(self)
self.interface_name = interface_name
- self.debug_level = debug_level_default
self.max_pkts = max_pkts
self.packets_total = 0
self.packets = []
self.packets_discarded = 0
self.port_number = port_number
self.socket = self.interface_open(interface_name)
- self.dbg(DEBUG_INFO, "Openned port monitor socket")
+ logname = "dp-" + interface_name
+ self.logger = logging.getLogger(logname)
+ self.logger.info("Openned port monitor socket")
self.parent = parent
self.pkt_sync = self.parent.pkt_sync
- def dbg(self, level, string):
- debug_log("DPLANE", self.debug_level, level,
- self.interface_name + ": " + string)
-
def interface_open(self, interface_name):
"""
Open a socket in a promiscuous mode for a data connection.
@@ -97,33 +95,30 @@
select.select(self.socs, [], [], 1)
except:
print sys.exc_info()
- self.dbg(DEBUG_ERROR, "Select error, exiting")
- sys.exit(1)
-
- #if not sel_err is None:
- # self.dbg(DEBUG_VERBOSE, "Socket error from select set")
+ self.logger.error("Select error, exiting")
+ break
if not self.running:
break
- if sel_in is None:
+ if (sel_in is None) or (len(sel_in) == 0):
continue
try:
- rcvmsg = self.socket.recv(RCV_SIZE)
+ rcvmsg = self.socket.recv(RCV_SIZE_DEFAULT)
except socket.error:
if not error_warned:
- self.dbg(DEBUG_INFO, "Socket error on recv")
+ self.logger.info("Socket error on recv")
error_warned = True
continue
if len(rcvmsg) == 0:
- self.dbg(DEBUG_INFO, "Zero len pkt rcvd")
+ self.logger.info("Zero len pkt rcvd")
self.kill()
break
rcvtime = time.clock()
- self.dbg(DEBUG_VERBOSE, "Pkt len " + str(len(rcvmsg)) +
+ self.logger.debug("Pkt len " + str(len(rcvmsg)) +
" in at " + str(rcvtime))
# Enqueue packet
@@ -134,29 +129,29 @@
self.packets_discarded += 1
else:
self.parent.packets_pending += 1
- # Check if parent is waiting on this (or any) port
- if self.parent.want_pkt:
- if (not self.parent.want_pkt_port or
+ # Check if parent is waiting on this (or any) port
+ if self.parent.want_pkt:
+ if (not self.parent.want_pkt_port or
self.parent.want_pkt_port == self.port_number):
- self.parent.got_pkt_port = self.port_number
- self.parent.want_pkt = False
- self.parent.want_pkt.notify()
+ self.parent.got_pkt_port = self.port_number
+ self.parent.want_pkt = False
+ self.parent.pkt_sync.notify()
self.packets.append((rcvmsg, rcvtime))
self.packets_total += 1
self.pkt_sync.release()
- self.dbg(DEBUG_INFO, "Thread exit ")
+ self.logger.info("Thread exit ")
def kill(self):
"""
Terminate the running thread
"""
+ self.logger.debug("Port monitor kill")
self.running = False
try:
self.socket.close()
except:
- self.dbg(DEBUG_INFO, "Ignoring dataplane soc shutdown error")
- self.dbg(DEBUG_INFO, "Port monitor exiting")
+ self.logger.info("Ignoring dataplane soc shutdown error")
def dequeue(self, use_lock=True):
"""
@@ -205,8 +200,6 @@
@param packet The packet data to send to the port
@retval The number of bytes sent
"""
- self.dbg(DEBUG_VERBOSE,
- "port sending " + str(len(packet)) + " bytes")
return self.socket.send(packet)
@@ -235,7 +228,6 @@
"""
def __init__(self):
self.port_list = {}
- self.debug_level = debug_level_default
# pkt_sync serves double duty as a regular top level lock and
# as a condition variable
self.pkt_sync = Condition()
@@ -245,9 +237,7 @@
self.want_pkt_port = None # What port required (or None)
self.got_pkt_port = None # On what port received?
self.packets_pending = 0 # Total pkts in all port queues
-
- def dbg(self, level, string):
- debug_log("DPORT", self.debug_level, level, string)
+ self.logger = logging.getLogger("dataplane")
def port_add(self, interface_name, port_number):
"""
@@ -267,12 +257,11 @@
@param port_number The port to send the data to
@param packet Raw packet data to send to port
"""
- self.dbg(DEBUG_VERBOSE,
- "Sending %d bytes to port %d" % (len(packet), port_number))
+ self.logger.debug("Sending %d bytes to port %d" %
+ (len(packet), port_number))
bytes = self.port_list[port_number].send(packet)
if bytes != len(packet):
- self.dbg(DEBUG_ERROR,"Unhandled send error, " +
- "length mismatch %d != %d" %
+ self.logger.error("Unhandled send error, length mismatch %d != %d" %
(bytes, len(packet)))
return bytes
@@ -284,7 +273,7 @@
for port_number in self.port_list.keys():
bytes = self.port_list[port_number].send(packet)
if bytes != len(packet):
- self.dbg(DEBUG_ERROR, "Unhandled send error" +
+ self.logger.error("Unhandled send error" +
", port %d, length mismatch %d != %d" %
(port_number, bytes, len(packet)))
@@ -352,15 +341,15 @@
self.port_list[self.got_pkt_port].dequeue(use_lock=False)
self.pkt_sync.release()
oft_assert(pkt, "Poll: pkt reported, but not found at " +
- self.got_pkt_port)
+ str(self.got_pkt_port))
return self.got_pkt_port, pkt, time
self.pkt_sync.release()
- self.dbg(DEBUG_VERBOSE, "Poll time out, no packet")
+ self.logger.debug("Poll time out, no packet")
return None, None, None
- def kill(self, join_threads=False):
+ def kill(self, join_threads=True):
"""
Close all sockets for dataplane
@param join_threads If True call join on each thread
@@ -368,10 +357,10 @@
for port_number in self.port_list.keys():
self.port_list[port_number].kill()
if join_threads:
- self.dbg(DEBUG_INFO, "Joining " + str(port_number))
+ self.logger.debug("Joining " + str(port_number))
self.port_list[port_number].join()
- self.dbg(DEBUG_INFO, "DataPlane shutdown")
+ self.logger.info("DataPlane shutdown")
def show(self, prefix=''):
print prefix + "Dataplane Controller"
diff --git a/src/python/oftest/oft_assert.py b/src/python/oftest/oft_assert.py
new file mode 100644
index 0000000..d773c84
--- /dev/null
+++ b/src/python/oftest/oft_assert.py
@@ -0,0 +1,28 @@
+"""
+OpenFlow Test Framework
+
+Framework assert definition
+"""
+
+import sys
+import logging
+
+def oft_assert(condition, string):
+ """
+ Test framework assertion check
+
+ @param condition The boolean condition to check
+ @param string String to print if error
+
+ If condition is not true, it is considered a test framework
+ failure and exit is called.
+
+ This assert is meant to represent a violation in the
+ assumptions of how the test framework is supposed to work
+ (for example, an inconsistent packet queue state) rather than
+ a test failure.
+ """
+ if not condition:
+ logging.critical("Internal error: " + string)
+ sys.exit(1)
+
diff --git a/src/python/oftest/oft_config.py b/src/python/oftest/oft_config.py
deleted file mode 100644
index f278846..0000000
--- a/src/python/oftest/oft_config.py
+++ /dev/null
@@ -1,149 +0,0 @@
-"""
-OpenFlow Test Framework
-
-Configuration fragment
-
-This file contains Python code to specify the configuration
-of the system under test.
-
-This is a work in progress. The code below is for illustration only.
-
-The configuration information is extensible in that any
-Python code may be added to this file (or its imports) and will
-be available to test cases.
-
-A platform identifier is given at the top of the file and most
-other information is determined by testing this value. Additional
-files may also be imported based on the platform.
-
-The configuration must specify a mapping from system interfaces
-available to the test framework to OpenFlow port numbers on the
-switch under test. This is done in the interface_ofport_map
-dictionary. Future extensions may include specifying a driver
-for the port (so as to allow remote packet generation) and to
-specify a switch instance (to allow multiple switches to be
-tested together).
-
-Currently, the assumption is that ports are bidirectional, so
-specifying "OF port 1 is connnected to eth2" implies this is so
-for both TX and RX.
-
-"""
-
-import sys
-
-##@var platform
-# A string representing the platform under test. Tested below
-# for determining other variables.
-
-##@var controller_host
-# Gives the controller host address to use
-
-##@var controller_port
-# Gives the controller port to use
-
-# platform = "sw_userspace"
-# platform = "sw_kernelspace"
-platform = "bcm_indigo"
-# platform = "stanford_lb4g"
-
-
-if platform == "sw_userspace":
- interface_ofport_map = {
- 1 : "eth2",
- 2 : "eth3",
- 3 : "eth4",
- 4 : "eth5"
- }
- controller_host = "172.27.74.158"
- controller_port = 7000
-
-elif platform == "bcm_indigo":
- interface_ofport_map = {
- 23 : "eth2",
- 24 : "eth3",
- 25 : "eth4",
- 26 : "eth5"
- }
- # For SSH connections to switch
- controller_host = "192.168.2.2"
-# controller_host = "172.27.74.26"
- controller_port = 6634
-
-
-
-# Debug levels
-DEBUG_ALL = 0
-DEBUG_VERBOSE = 1
-DEBUG_INFO = 2
-DEBUG_WARN = 3
-DEBUG_ERROR = 4
-DEBUG_CRITICAL = 5
-DEBUG_NONE = 6 # For current setting only; not for string level
-
-debug_level_default = DEBUG_WARN
-
-dbg_string = [
- "DBG ALL ",
- "VERBOSE ",
- "INFO ",
- "WARN ",
- "ERROR ",
- "CRITICAL "
- ]
-
-
-# These can be moved into platform specific code if needed
-
-RCV_TIMEOUT_DEFAULT = 10
-RCV_SIZE_DEFAULT = 4096
-CONTROLLER_HOST_DEFAULT = ''
-CONTROLLER_PORT_DEFAULT = 6633
-
-# Timeout in seconds for initial connection
-INIT_CONNECT_TIMEOUT = 4
-
-# Number of switch connection requests to queue
-LISTEN_QUEUE_SIZE = 1
-
-def debug_log(module, cur_level, level, string):
- """
- Log a debug message
-
- Compare the debug level to the current level and display
- the string if appropriate.
- @param module String representing the module reporting the info/error
- @param cur_level The module's current debug level
- @param level The level of the error message
- @param string String to report
-
- @todo Allow file logging options, etc
- @todo Add timestamps
- @todo Consider using the native Python logging module
- """
-
- if level >= cur_level:
- #@todo Support output redirection based on debug level
- print module + ":" + dbg_string[level] + ":" + string
-
-
-def oft_assert(condition, string):
- """
- Test framework assertion check
-
- @param condition The boolean condition to check
- @param string String to print if error
-
- If condition is not true, it is considered a test framework
- failure and exit is called.
-
- This assert is meant to represent a violation in the
- assumptions of how the test framework is supposed to work
- (for example, an inconsistent packet queue state) rather than
- a test failure.
- """
- if not condition:
- debug_log("OFT", debug_level_default, DEBUG_CRITICAL,
- "Internal error: " + string)
- sys.exit(1)
-
diff --git a/src/python/oftest/parse.py b/src/python/oftest/parse.py
index af1faed..018b70b 100644
--- a/src/python/oftest/parse.py
+++ b/src/python/oftest/parse.py
@@ -12,7 +12,6 @@
from scapy.all import *
except:
sys.exit("Need to install scapy for packet parsing")
-import oft_config
"""
of_message.py
@@ -21,10 +20,8 @@
function information into the of_message namespace
"""
-parse_debug_level = oft_config.DEBUG_VERBOSE
-
-def dbg(self, level, string):
- debug_log("PARSE", parse_debug_level, level, string)
+parse_logger = logging.getLogger("parse")
+#parse_logger.setLevel(logging.DEBUG)
# These message types are subclassed
msg_type_subclassed = [
@@ -119,7 +116,7 @@
sub_hdr.unpack(binary_string[OFP_HEADER_BYTES:])
return error_to_class_map[sub_hdr.type]()
else:
- dbg(oft_config.DEBUG_ERROR, "Cannot parse pkt to message")
+ parse_logger.error("Cannot parse pkt to message")
return None
def of_message_parse(binary_string, raw=False):
@@ -138,7 +135,7 @@
"""
if raw:
- dbg(oft_config.DEBUG_ERROR, "raw packet message parsing not supported")
+ parse_logger.error("raw packet message parsing not supported")
return None
obj = _of_message_to_object(binary_string)
@@ -162,7 +159,7 @@
"""
if raw:
- dbg(oft_config.DEBUG_ERROR, "raw packet message parsing not supported")
+ parse_logger.error("raw packet message parsing not supported")
return None
hdr = ofp_header()
@@ -261,7 +258,7 @@
#@todo check min length of packet
if pkt_format.upper() != "L2":
- dbg(oft_config.DEBUG_ERROR, "Only L2 supported for packet_to_flow")
+ parse_logger.error("Only L2 supported for packet_to_flow")
return None
ether = scapy.all.Ether(packet)