blob: 8dd59525062cdb852c3867a7c65b144052369b28 [file] [log] [blame]
#!/usr/bin/env python
"""
@package oft
OpenFlow test framework top level script
This script is the entry point for running OpenFlow tests using the OFT
framework. For usage information, see --help or the README.
To add a new command line option, edit both the config_default dictionary and
the config_setup function. The option's result will end up in the global
oftest.config dictionary.
"""
import sys
import optparse
from subprocess import Popen,PIPE
import logging
import unittest
import time
import os
import imp
import random
import signal
import fnmatch
import copy
# Directory containing this script, with symlinks resolved
root_dir = os.path.split(os.path.realpath(__file__))[0]

# Location of the bundled python packages inside a source checkout
pydir = os.path.join(root_dir, 'src', 'python')
if os.path.exists(os.path.join(pydir, 'oftest')):
    # Running from source tree: make the in-tree oftest package win over
    # any installed copy by putting it first on the module search path
    sys.path.insert(0, pydir)
import oftest
from oftest import config
import oftest.ofutils
import oftest.help_formatter
import loxi
try:
import scapy.all as scapy
except:
try:
import scapy as scapy
except:
sys.exit("Need to install scapy for packet parsing")
##@var DEBUG_LEVELS
# Map from debug level names to logging module levels.
# 'verbose' maps to the same level as 'debug', and 'warn' to the same
# level as 'warning'.
DEBUG_LEVELS = {
    'debug': logging.DEBUG,
    'verbose': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'warn': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL,
}
##@var config_default
# The default configuration dictionary for OFT.
# Every key here is fed to the option parser as a default and is copied
# back out of the parsed options by config_setup.
config_default = {
    # Miscellaneous options
    "list": False,
    "list_test_names": False,
    "allow_user": False,

    # Test selection options
    "test_spec": "",
    "test_file": None,
    "test_dir": os.path.join(root_dir, "tests"),

    # Switch connection options
    "controller_host": "0.0.0.0",  # For passive bind
    "controller_port": 6633,
    "switch_ip": None,  # If not none, actively connect to switch
    "platform": "eth",
    "platform_args": None,
    "platform_dir": os.path.join(root_dir, "platforms"),
    "interfaces": [],
    "openflow_version": "1.0",

    # Logging options
    "log_file": "oft.log",
    "log_append": False,
    "debug": "verbose",

    # Test behavior options
    "relax": False,
    # NOTE(review): this is the string "None", not the None object --
    # confirm that the consumers of test_params expect the string form
    "test_params": "None",
    "fail_skipped": False,
    "default_timeout": 2,
    "minsize": 0,
    "random_seed": None,

    # Other configuration
    "port_map": {},
}
def config_setup():
    """
    Set up the configuration including parsing the arguments

    Builds an optparse parser whose defaults are taken from config_default,
    parses the command line, and then copies the value of every
    config_default key out of the parsed options. An option whose
    destination is not a key of config_default is therefore not present
    in the returned dictionary.

    @return A pair (config, args) where config is a plain dictionary with
    the same keys as config_default and args is any additional (positional)
    arguments from the command line
    """

    usage = "usage: %prog [options] (test|group)..."

    description = """\
OFTest is a framework and set of tests for validating OpenFlow switches.
The default configuration assumes that an OpenFlow 1.0 switch is attempting to
connect to a controller on the machine running OFTest, port 6633. Additionally,
the interfaces veth1, veth3, veth5, and veth7 should be connected to the switch's
dataplane.
If no positional arguments are given then OFTest will run all tests that
depend only on standard OpenFlow 1.0. Otherwise each positional argument
is interpreted as either a test name or a test group name. The union of
these will be executed. To see what groups each test belongs to use the
--list option. Tests and groups can be subtracted from the result by
prefixing them with the '^' character.
"""

    # Parse --interface
    def check_interface(option, opt, value):
        """
        optparse type checker for the "interface" option type

        @param value A string of the form 'ofport@interface'
        @return A pair (ofport, interface) with ofport converted to int
        """
        try:
            # Both a missing '@' (unpacking fails) and a non-numeric port
            # (int() fails) surface as ValueError
            ofport, interface = value.split('@', 1)
            ofport = int(ofport)
        except ValueError:
            raise optparse.OptionValueError("incorrect interface syntax (got %s, expected 'ofport@interface')" % repr(value))
        return (ofport, interface)

    class Option(optparse.Option):
        # Extend the stock option class with the "interface" type; the
        # checker table is copied so optparse's own table is left untouched
        TYPES = optparse.Option.TYPES + ("interface",)
        TYPE_CHECKER = copy.copy(optparse.Option.TYPE_CHECKER)
        TYPE_CHECKER["interface"] = check_interface

    parser = optparse.OptionParser(version="%prog 0.1",
                                   usage=usage,
                                   description=description,
                                   formatter=oftest.help_formatter.HelpFormatter(),
                                   option_class=Option)

    # Set up default values. The defaults are deep-copied because the
    # "append" action used by --interface appends to the default list in
    # place, which would otherwise modify config_default["interfaces"].
    parser.set_defaults(**copy.deepcopy(config_default))

    parser.add_option("--list", action="store_true",
                      help="List all tests and exit")
    parser.add_option("--list-test-names", action='store_true',
                      help="List test names matching the test spec and exit")
    parser.add_option("--allow-user", action="store_true",
                      help="Proceed even if oftest is not run as root")

    group = optparse.OptionGroup(parser, "Test selection options")
    group.add_option("-T", "--test-spec", "--test-list", help="Tests to run, separated by commas")
    group.add_option("-f", "--test-file", help="File of tests to run, one per line")
    group.add_option("--test-dir", type="string", help="Directory containing tests")
    parser.add_option_group(group)

    group = optparse.OptionGroup(parser, "Switch connection options")
    group.add_option("-H", "--host", dest="controller_host",
                     help="IP address to listen on (default %default)")
    group.add_option("-p", "--port", dest="controller_port",
                     type="int", help="Port number to listen on (default %default)")
    group.add_option("-S", "--switch-ip", dest="switch_ip",
                     help="If set, actively connect to this switch by IP")
    group.add_option("-P", "--platform", help="Platform module name (default %default)")
    group.add_option("-a", "--platform-args", help="Custom arguments for the platform")
    group.add_option("--platform-dir", type="string", help="Directory containing platform modules")
    group.add_option("--interface", "-i", type="interface", dest="interfaces", metavar="INTERFACE", action="append",
                     help="Specify a OpenFlow port number and the dataplane interface to use. May be given multiple times. Example: 1@eth1")
    # optparse requires choices to be a real list or tuple
    group.add_option("--of-version", "-V", dest="openflow_version", choices=list(loxi.version_names.values()),
                     help="OpenFlow version to use")
    parser.add_option_group(group)

    group = optparse.OptionGroup(parser, "Logging options")
    group.add_option("--log-file",
                     help="Name of log file, empty string to log to console (default %default)")
    group.add_option("--log-append", action="store_true",
                     help="Do not delete log file if specified")
    # Level names ordered from the lowest logging level to the highest
    dbg_lvl_names = sorted(DEBUG_LEVELS.keys(), key=lambda x: DEBUG_LEVELS[x])
    group.add_option("--debug", choices=dbg_lvl_names,
                     help="Debug lvl: debug, info, warning, error, critical (default %default)")
    group.add_option("-v", "--verbose", action="store_const", dest="debug",
                     const="verbose", help="Shortcut for --debug=verbose")
    group.add_option("-q", "--quiet", action="store_const", dest="debug",
                     const="warning", help="Shortcut for --debug=warning")
    parser.add_option_group(group)

    group = optparse.OptionGroup(parser, "Test behavior options")
    group.add_option("--relax", action="store_true",
                     help="Relax packet match checks allowing other packets")
    test_params_help = """Set test parameters: key=val;... (see --list)
"""
    group.add_option("-t", "--test-params", help=test_params_help)
    group.add_option("--fail-skipped", action="store_true",
                     help="Return failure if any test was skipped")
    group.add_option("--default-timeout", type="int",
                     help="Timeout in seconds for most operations")
    group.add_option("--minsize", type="int",
                     help="Minimum allowable packet size on the dataplane.")
    group.add_option("--random-seed", type="int",
                     help="Random number generator seed")
    parser.add_option_group(group)

    # Might need this if other parsers want command line
    # parser.allow_interspersed_args = False
    (options, args) = parser.parse_args()

    # Convert options from an optparse Values object to a plain dictionary,
    # keeping exactly the keys that config_default declares
    config = dict((key, getattr(options, key)) for key in config_default)

    return (config, args)
def logging_setup(config):
    """
    Set up logging based on config

    Reads the "log_file", "log_append" and "debug" entries. A log_file of
    "-" or "stderr" is replaced by None in config (the dictionary is
    modified in place) so that no file name is passed to the logging module.

    @param config The oft configuration dictionary
    """

    # Milliseconds are zero-padded to three digits so that 5 ms is written
    # as ".005" instead of the ambiguous ".5"
    _format = "%(asctime)s.%(msecs)03d  %(name)-10s: %(levelname)-8s: %(message)s"
    _datefmt = "%H:%M:%S"
    _mode = "a" if config["log_append"] else "w"
    if config['log_file'] in ("-", "stderr"):
        config['log_file'] = None
    logging.basicConfig(filename=config['log_file'],
                        filemode=_mode,
                        level=DEBUG_LEVELS[config["debug"]],
                        format=_format, datefmt=_datefmt)
def load_test_modules(config):
    """
    Load tests from the test_dir directory.

    Test cases are subclasses of unittest.TestCase

    Walks config["test_dir"] recursively and imports every '*.py' file whose
    name does not start with a dot. A module name that is already present
    in sys.modules is reused rather than loaded again. Import failures are
    logged and then re-raised.

    Also updates the _groups member to include "standard" and
    module test groups if appropriate.

    @param config The oft configuration dictionary
    @returns A dictionary from test module names to tuples of
    (module, dictionary from test names to test classes). Modules that
    define no test cases are left out.
    """

    result = {}

    for root, dirs, filenames in os.walk(config["test_dir"]):
        # Iterate over each python file
        for filename in fnmatch.filter(filenames, '[!.]*.py'):
            modname = os.path.splitext(os.path.basename(filename))[0]

            try:
                if modname in sys.modules:
                    mod = sys.modules[modname]
                else:
                    fp, pathname, description = imp.find_module(modname, [root])
                    try:
                        mod = imp.load_module(modname, fp, pathname, description)
                    finally:
                        # imp.find_module hands back an open file that the
                        # caller must close, also when loading fails
                        if fp is not None:
                            fp.close()
            except:
                # Log which file was at fault, then let the error propagate
                logging.warning("Could not import file " + filename)
                raise

            # Find all testcases defined in the module
            # NOTE(review): the exact type check skips classes that use a
            # custom metaclass -- confirm whether that is intended
            tests = dict((k, v) for (k, v) in mod.__dict__.items()
                         if type(v) == type and issubclass(v, unittest.TestCase))
            if tests:
                for (testname, test) in tests.items():
                    # Set default annotation values
                    # NOTE(review): hasattr also finds a _groups list that is
                    # inherited from a base class, in which case the appends
                    # below modify that shared list -- confirm intended
                    if not hasattr(test, "_groups"):
                        test._groups = []
                    if not hasattr(test, "_nonstandard"):
                        test._nonstandard = False
                    if not hasattr(test, "_disabled"):
                        test._disabled = False

                    # Put test in its module's test group
                    if not test._disabled:
                        test._groups.append(modname)

                    # Put test in the standard test group
                    if not test._disabled and not test._nonstandard:
                        test._groups.append("standard")
                        test._groups.append("all")  # backwards compatibility

                result[modname] = (mod, tests)

    return result
def prune_tests(test_specs, test_modules, version):
    """
    Return tests matching the given test-specs and OpenFlow version

    The specs are applied in order. A plain spec adds every test that is in
    the named group, or whose "module.test" name equals the spec, provided
    the test either has no _versions attribute or lists the given version.
    A spec prefixed with '^' removes the matching tests selected so far.
    A spec that matches no test at all aborts the program through die().

    Modules for which no test ends up selected do not appear in the result.

    @param test_specs A list of group names or test names.
    @param version An OpenFlow version (e.g. "1.0")
    @param test_modules Same format as the output of load_test_modules.
    @returns Same format as the output of load_test_modules.
    """
    result = {}
    for spec in test_specs:
        negated = spec.startswith('^')
        e = spec[1:] if negated else spec
        matched = False

        for (modname, (mod, tests)) in test_modules.items():
            for (testname, test) in tests.items():
                if e not in test._groups and e != "%s.%s" % (modname, testname):
                    continue

                # A spec counts as matched even if the version filter
                # below ends up rejecting the test
                matched = True

                if negated:
                    if modname in result:
                        result[modname][1].pop(testname, None)
                        if not result[modname][1]:
                            del result[modname]
                elif not hasattr(test, "_versions") or version in test._versions:
                    # Create the module entry only when a test is really
                    # added, so no empty entries are left behind
                    result.setdefault(modname, (mod, {}))[1][testname] = test

        if not matched:
            die("test-spec element %s did not match any tests" % e)

    return result
def die(msg, exit_val=1):
    """
    Report a fatal error and terminate the program

    The message is printed to standard output and logged at CRITICAL level
    before sys.exit is called.

    @param msg The message to report
    @param exit_val The process exit status (default 1)
    """
    # The call form with a single argument prints the same text under
    # Python 2 and also parses under Python 3
    print(msg)
    logging.critical(msg)
    sys.exit(exit_val)
def _space_to(n, str):
"""
Generate a string of spaces to achieve width n given string str
If length of str >= n, return one space
"""
spaces = n - len(str)
if spaces > 0:
return " " * spaces
return " "
#
# Main script
#
# Setup global configuration
# oftest.config is updated in place; the module-level name "config" was
# imported from oftest above and is used for all later lookups
(new_config, args) = config_setup()
oftest.config.update(new_config)

logging_setup(config)
logging.info("++++++++ " + time.asctime() + " ++++++++")

# Pick an OpenFlow protocol module based on the configured version
name_to_version = dict((v, k) for k, v in loxi.version_names.items())
sys.modules["ofp"] = loxi.protocol(name_to_version[config["openflow_version"]])

# HACK: testutils.py imports controller.py, which needs the ofp module
import oftest.testutils

# Allow tests to import each other
sys.path.append(config["test_dir"])

# Collect test specs from the positional arguments, the deprecated
# --test-spec option and the --test-file file, in that order
test_specs = args
if config["test_spec"] != "":
    sys.stderr.write("WARNING: The --test-spec option is deprecated\n")
    test_specs += config["test_spec"].split(',')
if config["test_file"] is not None:
    with open(config["test_file"], 'r') as f:
        for line in f:
            line, _, _ = line.partition('#')  # remove comments
            line = line.strip()
            if line:
                test_specs.append(line)
if not test_specs:
    # Nothing was requested: fall back to the "standard" group
    test_specs = ["standard"]

test_modules = load_test_modules(config)
# Check if test list is requested; display and exit if so
if config["list"]:
    mod_count = 0
    test_count = 0
    all_groups = set()

    print("""\
Tests are shown grouped by module. If a test is in any groups beyond "standard"
and its module's group then they are shown in parentheses.""")
    print("")
    print("""\
Tests marked with '*' are non-standard and may require vendor extensions or
special switch configuration. These are not part of the "standard" test group.""")
    print("")
    print("""\
Tests marked with '!' are disabled because they are experimental, special-purpose,
or are too long to be run normally. These are not part of the "standard" test
group or their module's test group.""")
    print("")
    print("Tests marked (TP1) after name take --test-params including:")
    print(" 'vid=N;strip_vlan=bool;add_vlan=bool'")
    print("")
    print("Test List:")

    for (modname, (mod, tests)) in test_modules.items():
        mod_count += 1
        # Only the first line of a docstring is used as the description
        desc = (mod.__doc__ or "No description").strip().split('\n')[0]
        start_str = " Module " + mod.__name__ + ": "
        print(start_str + _space_to(22, start_str) + desc)

        for (testname, test) in tests.items():
            try:
                desc = (test.__doc__ or "").strip()
                desc = desc.split('\n')[0]
            except Exception:
                desc = "No description"

            # Groups beyond the implicit ones, listed in sorted order so
            # that the output does not depend on set iteration order
            groups = set(test._groups) - set(["all", "standard", modname])
            all_groups.update(test._groups)
            if groups:
                desc = "(%s) %s" % (",".join(sorted(groups)), desc)
            if hasattr(test, "_versions"):
                desc = "(%s) %s" % (",".join(sorted(test._versions)), desc)

            start_str = " %s%s %s:" % ("*" if test._nonstandard else " ",
                                       "!" if test._disabled else " ",
                                       testname)
            if len(start_str) > 22:
                # Name overflows the first column: move the description
                # onto its own line, indented to the description column
                desc = "\n" + _space_to(22, "") + desc
            print(start_str + _space_to(22, start_str) + desc)
            test_count += 1
        print("")

    print("%d modules shown with a total of %d tests" %
          (mod_count, test_count))
    print("")
    print("Test groups: %s" % (', '.join(sorted(all_groups))))

    sys.exit(0)
test_modules = prune_tests(test_specs, test_modules, config["openflow_version"])

# Check if the list of selected test names is requested; display and exit if so
if config["list_test_names"]:
    for (modname, (mod, tests)) in test_modules.items():
        for (testname, test) in tests.items():
            print("%s.%s" % (modname, testname))
    sys.exit(0)

# Generate the test suite
#@todo Decide if multiple suites are ever needed
suite = unittest.TestSuite()

for (modname, (mod, tests)) in test_modules.items():
    for (testname, test) in tests.items():
        suite.addTest(test())

# Allow platforms to import each other
sys.path.append(config["platform_dir"])

# Load the platform module
platform_name = config["platform"]
logging.info("Importing platform: " + platform_name)
platform_mod = None
try:
    _fp, _pathname, _description = imp.find_module(platform_name, [config["platform_dir"]])
    try:
        platform_mod = imp.load_module(platform_name, _fp, _pathname, _description)
    finally:
        # imp.find_module hands back an open file that the caller must
        # close, also when loading fails
        if _fp is not None:
            _fp.close()
except:
    logging.warning("Failed to import " + platform_name + " platform module")
    raise

# Let the platform module fill in its part of the configuration
try:
    platform_mod.platform_config_update(config)
except:
    logging.warning("Could not run platform host configuration")
    raise

if not config["port_map"]:
    die("Interface port map was not defined by the platform.  Exiting.")

logging.debug("Configuration: " + str(config))
logging.info("OF port map: " + str(config["port_map"]))

oftest.ofutils.default_timeout = config["default_timeout"]
oftest.testutils.MINSIZE = config['minsize']

if os.getuid() != 0 and not config["allow_user"]:
    print("ERROR: Super-user privileges required. Please re-run with "
          "sudo or as root.")
    sys.exit(1)

if config["random_seed"] is not None:
    logging.info("Random seed: %d" % config["random_seed"])
    random.seed(config["random_seed"])
else:
    # Generate random seed and report to log file
    seed = random.randrange(100000000)
    logging.info("Autogen random seed: %d" % seed)
    random.seed(seed)

# Remove python's signal handler which raises KeyboardInterrupt. Exiting from
# an exception waits for all threads to terminate which might not happen.
signal.signal(signal.SIGINT, signal.SIG_DFL)
if __name__ == "__main__":
    # Set up the dataplane and register every port from the port map
    oftest.dataplane_instance = oftest.dataplane.DataPlane(config)
    for of_port, ifname in config["port_map"].items():
        oftest.dataplane_instance.port_add(ifname, of_port)

    logging.info("*** TEST RUN START: " + time.asctime())
    result = unittest.TextTestRunner(verbosity=2).run(suite)

    # Report skipped tests both in the log and on standard output
    if oftest.testutils.skipped_test_count > 0:
        ts = " test" if oftest.testutils.skipped_test_count == 1 else " tests"
        skip_msg = "Skipped " + str(oftest.testutils.skipped_test_count) + ts
        logging.info(skip_msg)
        print(skip_msg)
    logging.info("*** TEST RUN END  : " + time.asctime())

    # Shutdown the dataplane
    oftest.dataplane_instance.kill()
    oftest.dataplane_instance = None

    # The run counts as failed if any test failed or errored, or if tests
    # were skipped while --fail-skipped was given.
    # os._exit is used because exit(1) hangs sometimes.
    run_failed = bool(result.failures or result.errors)
    skip_failed = oftest.testutils.skipped_test_count > 0 and config["fail_skipped"]
    if run_failed or skip_failed:
        os._exit(1)