blob: cae63b6af7bf6eaf6ea8380301cd5b41a268fdc7 [file] [log] [blame]
#!/usr/bin/env python
"""
@package oft
OpenFlow test framework top level script
This script is the entry point for running OpenFlow tests using the OFT
framework. For usage information, see --help or the README.
To add a new command line option, edit both the CONFIG_DEFAULT dictionary and
the config_setup function. The option's result will end up in the global
oftest.config dictionary.
"""
from __future__ import print_function
import sys
import optparse
import logging
import unittest
import time
import os
import imp
import random
import signal
import fnmatch
import copy
# Repository root: the parent of the directory that contains this script.
ROOT_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)

# Log directory for this run: <root>/Log/<YYYY-MM>/<DDHHMMSS>
LOG_DIR = os.path.join(ROOT_DIR, "Log") + time.strftime("/%Y-%m/%d%H%M%S")
#LOG_FILE = time.strftime("%H%M%S")+"oft.log"

_utilities_dir = os.path.join(ROOT_DIR, 'Utilities')

PY_SRC_DIR = os.path.join(_utilities_dir, 'src', 'python')
if os.path.exists(os.path.join(PY_SRC_DIR, 'oftest')):
    # Running from source tree
    sys.path.insert(0, PY_SRC_DIR)

# PY_ACCTON_DIR is only defined when the accton utilities directory exists.
_accton_dir = os.path.join(_utilities_dir, 'accton')
if os.path.exists(_accton_dir):
    PY_ACCTON_DIR = _accton_dir
    sys.path.insert(0, PY_ACCTON_DIR)
import oftest
from oftest import config
import oftest.ofutils
import oftest.help_formatter
import loxi
##@var DEBUG_LEVELS
# Map from strings to debugging levels
# "verbose" maps to the same level as "debug", and "warn" to the same level
# as "warning".
DEBUG_LEVELS = {
    'debug':    logging.DEBUG,
    'verbose':  logging.DEBUG,
    'info':     logging.INFO,
    'warning':  logging.WARNING,
    'warn':     logging.WARNING,
    'error':    logging.ERROR,
    'critical': logging.CRITICAL,
}
##@var CONFIG_DEFAULT
# The default configuration dictionary for OFT
# config_setup() installs these values as the parser defaults and reads every
# key back from the parsed options, so each key is also an option destination.
CONFIG_DEFAULT = {
    # Miscellaneous options
    "list": False,
    "list_test_names": False,
    "allow_user": False,

    # Test selection options
    "test_spec": "",
    "test_file": None,
    "test_dir": os.path.join(ROOT_DIR, "Tests"),

    # Switch connection options
    "controller_host": "0.0.0.0",  # For passive bind
    "controller_port": 6653,
    "switch_ip": None,  # If not none, actively connect to switch
    "switch_type": None,  # If not none, adapt flows to pipeline differences for switch type
    "platform": "eth",
    "platform_args": None,
    "platform_dir": os.path.join(ROOT_DIR, "Utilities", "platforms"),
    "interfaces": [],
    "openflow_version": "1.0",

    # Logging options
    "log_file": time.strftime("%H%M%S") + "oft.log",
    "log_dir": LOG_DIR,
    "debug": "verbose",
    "profile": False,
    "profile_file": "profile.out",
    "xunit": False,
    "xunit_dir": "xunit",

    # Test behavior options
    "relax": False,
    "test_params": "None",
    "fail_skipped": False,
    "default_timeout": 2.0,
    "default_negative_timeout": 0.01,
    "minsize": 0,
    "random_seed": None,
    "disable_ipv6": False,
    "random_order": False,
    "dump_packet": True,
    "cicada_poject": False,
    "force_ofdpa_restart": False,

    # Other configuration
    "port_map": {},
}
def config_setup():
    """
    Set up the configuration including parsing the arguments

    Builds an optparse parser whose defaults are CONFIG_DEFAULT, parses the
    command line, and copies the value of every CONFIG_DEFAULT key from the
    parsed options into a plain dictionary.

    @return A pair (config, args) where config is a configuration dictionary
    and args is any additional arguments from the command line
    """
    usage = "usage: %prog [options] (test|group)..."

    description = """\
OFTest is a framework and set of tests for validating OpenFlow switches.
The default configuration assumes that an OpenFlow 1.0 switch is attempting to
connect to a controller on the machine running OFTest, port 6653. Additionally,
the interfaces veth1, veth3, veth5, and veth7 should be connected to the switch's
dataplane.
If no positional arguments are given then OFTest will run all tests that
depend only on standard OpenFlow 1.0. Otherwise each positional argument
is interpreted as either a test name or a test group name. The union of
these will be executed. To see what groups each test belongs to use the
--list option. Tests and groups can be subtracted from the result by
prefixing them with the '^' character.
"""

    # Parse --interface
    def check_interface(option, opt, value):
        # The value must look like "<ofport>@<interface>" with an integer
        # ofport; both a missing '@' and a non-integer port raise ValueError.
        try:
            ofport, interface = value.split('@', 1)
            ofport = int(ofport)
        except ValueError:
            raise optparse.OptionValueError("incorrect interface syntax (got %s, expected 'ofport@interface')" % repr(value))
        return (ofport, interface)

    class Option(optparse.Option):
        # Extend the stock option class with the custom "interface" type.
        TYPES = optparse.Option.TYPES + ("interface",)
        TYPE_CHECKER = copy.copy(optparse.Option.TYPE_CHECKER)
        TYPE_CHECKER["interface"] = check_interface

    parser = optparse.OptionParser(version="%prog 0.1",
                                   usage=usage,
                                   description=description,
                                   formatter=oftest.help_formatter.HelpFormatter(),
                                   option_class=Option)

    # Set up default values
    parser.set_defaults(**CONFIG_DEFAULT)

    parser.add_option("--list", action="store_true",
                      help="List all tests and exit")
    parser.add_option("--list-test-names", action='store_true',
                      help="List test names matching the test spec and exit")
    parser.add_option("--allow-user", action="store_true",
                      help="Proceed even if oftest is not run as root")

    group = optparse.OptionGroup(parser, "Test selection options")
    group.add_option("-T", "--test-spec", "--test-list", help="Tests to run, separated by commas")
    group.add_option("-f", "--test-file", help="File of tests to run, one per line")
    group.add_option("--test-dir", type="string", help="Directory containing tests")
    parser.add_option_group(group)

    group = optparse.OptionGroup(parser, "Switch connection options")
    group.add_option("-H", "--host", dest="controller_host",
                     help="IP address to listen on (default %default)")
    group.add_option("-p", "--port", dest="controller_port",
                     type="int", help="Port number to listen on (default %default)")
    group.add_option("-S", "--switch-ip", dest="switch_ip",
                     help="If set, actively connect to this switch by IP")
    group.add_option("-Y", "--switch-type", dest="switch_type",
                     help="If set to qmx, flows will adapt to pipline differences")
    group.add_option("-P", "--platform", help="Platform module name (default %default)")
    group.add_option("-a", "--platform-args", help="Custom arguments for the platform")
    group.add_option("--platform-dir", type="string", help="Directory containing platform modules")
    group.add_option("--interface", "-i", type="interface", dest="interfaces", metavar="INTERFACE", action="append",
                     help="Specify a OpenFlow port number and the dataplane interface to use. May be given multiple times. Example: 1@eth1")
    # optparse only accepts a list or tuple for `choices`; a dict view (what
    # .values() returns on Python 3) is rejected, so materialize it.
    group.add_option("--of-version", "-V", dest="openflow_version", choices=list(loxi.version_names.values()),
                     help="OpenFlow version to use")
    parser.add_option_group(group)

    group = optparse.OptionGroup(parser, "Logging options")
    group.add_option("--log-file", help="Name of log file (default %default)")
    group.add_option("--log-dir", help="Name of log directory")
    # Offer the level names ordered from most to least verbose.
    dbg_lvl_names = sorted(DEBUG_LEVELS.keys(), key=lambda x: DEBUG_LEVELS[x])
    group.add_option("--debug", choices=dbg_lvl_names,
                     help="Debug lvl: debug, info, warning, error, critical (default %default)")
    group.add_option("-v", "--verbose", action="store_const", dest="debug",
                     const="verbose", help="Shortcut for --debug=verbose")
    group.add_option("-q", "--quiet", action="store_const", dest="debug",
                     const="warning", help="Shortcut for --debug=warning")
    group.add_option("--profile", action="store_true", help="Enable Python profiling")
    group.add_option("--profile-file", help="Output file for Python profiler")
    group.add_option("--xunit", action="store_true", help="Enable xUnit-formatted results")
    group.add_option("--xunit-dir", help="Output directory for xUnit-formatted results")
    parser.add_option_group(group)

    group = optparse.OptionGroup(parser, "Test behavior options")
    group.add_option("--relax", action="store_true",
                     help="Relax packet match checks allowing other packets")
    test_params_help = """Set test parameters: key=val;... (see --list)
"""
    group.add_option("-t", "--test-params", help=test_params_help)
    group.add_option("--fail-skipped", action="store_true",
                     help="Return failure if any test was skipped")
    group.add_option("--default-timeout", type=float,
                     help="Timeout in seconds for most operations")
    group.add_option("--default-negative-timeout", type=float,
                     help="Timeout in seconds for negative checks")
    group.add_option("--minsize", type="int",
                     help="Minimum allowable packet size on the dataplane.")
    group.add_option("--random-seed", type="int",
                     help="Random number generator seed")
    group.add_option("--force-ofdpa-restart",
                     help="If set force ofdpa restart on user@switchIP")
    group.add_option("--disable-ipv6", action="store_true",
                     help="Disable IPv6 tests")
    group.add_option("--random-order", action="store_true",
                     help="Randomize order of tests")
    group.add_option("--dump_packet", action="store_true",
                     help="Dump packet content on log when verify packet fail")
    group.add_option("--cicada_poject", action="store_true",
                     help="True verify Cicada behavior, False verify AOS behaviro")
    parser.add_option_group(group)

    # Might need this if other parsers want command line
    # parser.allow_interspersed_args = False
    (options, args) = parser.parse_args()

    # If --test-dir wasn't given, pick one based on the OpenFlow version
    # if options.test_dir == None:
    #     if options.openflow_version == "1.3":
    #         options.test_dir = os.path.join(ROOT_DIR, "Test", "ofdpa")

    # Convert options from a Namespace to a plain dictionary holding exactly
    # the keys of CONFIG_DEFAULT.
    config = dict((key, getattr(options, key)) for key in CONFIG_DEFAULT)

    return (config, args)
# Disabled helper, kept for reference: it would create a specific log
# directory based on the day of execution.
#def logging_directory()
#    log_dir=os.path.join(ROOT_DIR, "Log")+time.strftime("/%Y/%m/%d")
#    return(log_dir)
def logging_setup(config):
    """
    Set up logging based on config

    Sets the root logger level from config["debug"]. When config["log_dir"]
    is set, the directory is created if it does not exist yet; otherwise any
    existing file named config["log_file"] is removed. Finally the 'main'
    log file is opened through oftest.open_logfile.

    @param config The oft configuration dictionary
    """
    logging.getLogger().setLevel(DEBUG_LEVELS[config["debug"]])

    log_dir = config["log_dir"]
    if log_dir is not None:
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)
    elif os.path.exists(config["log_file"]):
        # Single-file logging: remove the log file left by a previous run.
        os.remove(config["log_file"])

    oftest.open_logfile('main')
def xunit_setup(config):
    """
    Set up xUnit output based on config

    Does nothing unless config["xunit"] is set. Otherwise an existing
    config["xunit_dir"] tree is deleted and the directory is created again.

    @param config The oft configuration dictionary
    """
    if config["xunit"]:
        report_dir = config["xunit_dir"]
        if os.path.exists(report_dir):
            import shutil
            shutil.rmtree(report_dir)
        os.makedirs(report_dir)
def pcap_setup(config):
    """
    Set up dataplane packet capturing based on config

    When no log directory is configured, a single capture is started on the
    dataplane instance, named after config["log_file"] with its extension
    replaced by '.pcap'.

    @param config The oft configuration dictionary
    """
    if config["log_dir"] is not None:
        # start_pcap is called per-test in base_tests
        return

    filename = os.path.splitext(config["log_file"])[0] + '.pcap'
    oftest.dataplane_instance.start_pcap(filename)
def profiler_setup(config):
    """
    Set up profiler based on config

    @param config The oft configuration dictionary
    @return An enabled cProfile.Profile when config["profile"] is set,
    otherwise None
    """
    if config["profile"]:
        import cProfile
        active_profiler = cProfile.Profile()
        active_profiler.enable()
        return active_profiler
    return None
def profiler_teardown(profiler):
    """
    Tear down profiler based on config

    Reads the module-level config (it is not a parameter of this function).
    When profiling is enabled, the profiler is stopped and its statistics
    are written to config["profile_file"].

    @param profiler The object returned by profiler_setup
    """
    if config["profile"]:
        profiler.disable()
        profiler.dump_stats(config["profile_file"])
def load_test_modules(config):
    """
    Load tests from the test_dir directory.

    Test cases are subclasses of unittest.TestCase that have a runTest
    attribute. Modules that are already present in sys.modules are reused
    rather than imported again.

    Also updates the _groups member to include "standard" and
    module test groups if appropriate.

    @param config The oft configuration dictionary
    @returns A dictionary from test module names to tuples of
    (module, dictionary from test names to test classes).
    """
    result = {}

    for root, dirs, filenames in os.walk(config["test_dir"]):
        # Iterate over each python file
        for filename in fnmatch.filter(filenames, '[!.]*.py'):
            modname = os.path.splitext(os.path.basename(filename))[0]

            try:
                # `in` works on every Python version; dict.has_key() does not
                # exist on Python 3.
                if modname in sys.modules:
                    mod = sys.modules[modname]
                else:
                    fp, pathname, description = imp.find_module(modname, [root])
                    try:
                        mod = imp.load_module(modname, fp, pathname, description)
                    finally:
                        # find_module hands back an open file that the caller
                        # is responsible for closing.
                        if fp:
                            fp.close()
            except:
                logging.warning("Could not import file " + filename)
                raise

            # Find all testcases defined in the module
            tests = dict((k, v) for (k, v) in mod.__dict__.items()
                         if type(v) == type and
                         issubclass(v, unittest.TestCase) and
                         hasattr(v, "runTest"))
            if not tests:
                # Modules without test cases are left out of the result.
                continue

            for test in tests.values():
                # Set default annotation values
                if not hasattr(test, "_groups"):
                    test._groups = []
                if not hasattr(test, "_nonstandard"):
                    test._nonstandard = False
                if not hasattr(test, "_disabled"):
                    test._disabled = False

                # Disabled tests join neither group.
                if test._disabled:
                    continue

                # Put test in its module's test group
                test._groups.append(modname)

                # Put test in the standard test group
                if not test._nonstandard:
                    test._groups.append("standard")
                    test._groups.append("all") # backwards compatibility

            result[modname] = (mod, tests)

    return result
def prune_tests(test_specs, test_modules, version):
    """
    Return tests matching the given test-specs and OpenFlow version

    Specs are applied in order. A plain spec adds every test whose _groups
    contains it, or whose "module.test" name equals it, provided the test
    either has no _versions attribute or lists the requested version. A spec
    prefixed with '^' removes the matching tests selected so far. Modules
    that end up with no selected tests are not present in the result.

    A spec that matches no test at all (before version filtering) is fatal
    and is reported through die().

    @param test_specs A list of group names or test names.
    @param version An OpenFlow version (e.g. "1.0")
    @param test_modules Same format as the output of load_test_modules.
    @returns Same format as the output of load_test_modules.
    """
    result = {}
    for spec in test_specs:
        negated = spec.startswith('^')
        name = spec[1:] if negated else spec
        matched = False

        for (modname, (mod, tests)) in test_modules.items():
            for (testname, test) in tests.items():
                if name not in test._groups and name != "%s.%s" % (modname, testname):
                    continue
                matched = True

                if negated:
                    selected = result.get(modname)
                    if selected is not None:
                        selected[1].pop(testname, None)
                        # Drop the module once its last test is removed.
                        if not selected[1]:
                            del result[modname]
                elif not hasattr(test, "_versions") or version in test._versions:
                    # Create the module entry only when a test is actually
                    # added, so no empty (mod, {}) entries are left behind.
                    result.setdefault(modname, (mod, {}))[1][testname] = test

        if not matched:
            die("test-spec element %s did not match any tests" % name)

    return result
def die(msg, exit_val=1):
    """
    Log msg at critical level and terminate the script

    @param msg The message to log
    @param exit_val The process exit status (default 1)
    """
    logging.critical(msg)
    raise SystemExit(exit_val)
#
# Main script
#

# Setup global configuration
(new_config, args) = config_setup()
oftest.config.update(new_config)

# NOTE: `config` below is the dictionary imported from oftest, which was just
# updated in place with the parsed command-line values.
logging_setup(config)
xunit_setup(config)
logging.info("++++++++ " + time.asctime() + " ++++++++")

# Pick an OpenFlow protocol module based on the configured version
# (.items() rather than the Python 2 only .iteritems())
name_to_version = dict((v, k) for k, v in loxi.version_names.items())
sys.modules["ofp"] = loxi.protocol(name_to_version[config["openflow_version"]])

# HACK: testutils.py imports controller.py, which needs the ofp module
import oftest.testutils

# Allow tests to import each other
sys.path.append(config["test_dir"])

# Test specs come from the positional arguments, then --test-spec, then the
# --test-file contents; when none are given the "standard" group is run.
test_specs = args
if config["test_spec"] != "":
    logging.warning("The '--test-spec' option is deprecated.")
    test_specs += config["test_spec"].split(',')
if config["test_file"] is not None:
    with open(config["test_file"], 'r') as f:
        for line in f:
            line, _, _ = line.partition('#') # remove comments
            line = line.strip()
            if line:
                test_specs.append(line)
if not test_specs:
    test_specs = ["standard"]

test_modules = load_test_modules(config)
# Check if test list is requested; display and exit if so
if config["list"]:
    mod_count = 0
    test_count = 0
    all_groups = set()

    print("""
Tests are shown grouped by module. If a test is in any groups beyond
"standard" and its module's group then they are shown in parentheses.
Tests marked with '*' are non-standard and may require vendor extensions or
special switch configuration. These are not part of the "standard" test group.
Tests marked with '!' are disabled because they are experimental,
special-purpose, or are too long to be run normally. These are not part of
the "standard" test group or their module's test group.
Tests marked (TP1) after name take --test-params including:
'vid=N;strip_vlan=bool;add_vlan=bool'
Test List:
""")
    for (modname, (mod, tests)) in test_modules.items():
        mod_count += 1
        # Only the first line of a docstring is shown.
        desc = (mod.__doc__ or "No description").strip().split('\n')[0]
        print(" Module %13s: %s" % (mod.__name__, desc))

        for (testname, test) in tests.items():
            desc = (test.__doc__ or "No description").strip().split('\n')[0]

            # Groups beyond the implicit ones, sorted for stable output.
            groups = set(test._groups) - set(["all", "standard", modname])
            all_groups.update(test._groups)
            if groups:
                desc = "(%s) %s" % (",".join(sorted(groups)), desc)
            if hasattr(test, "_versions"):
                desc = "(%s) %s" % (",".join(sorted(test._versions)), desc)

            start_str = " %s%s %s:" % ("*" if test._nonstandard else " ",
                                       "!" if test._disabled else " ",
                                       testname)
            print(" %22s : %s" % (start_str, desc))
            test_count += 1

        # Blank line between modules. With print_function in effect a bare
        # `print` is only a name reference and prints nothing.
        print()

    print("'%d' modules shown with a total of '%d' tests\n" %
          (mod_count, test_count))
    print("Test groups: %s" % (', '.join(sorted(all_groups))))

    sys.exit(0)
# Keep only the tests selected by the test specs and the OpenFlow version
test_modules = prune_tests(test_specs, test_modules, config["openflow_version"])

# With --list-test-names, print "module.test" for each selected test and exit
if config["list_test_names"]:
    for (modname, (mod, tests)) in test_modules.items():
        for testname in tests:
            print("%s.%s" % (modname, testname))
    sys.exit(0)
# Generate the test suite
#@todo Decide if multiple suites are ever needed
suite = unittest.TestSuite()

# Collect the test classes ordered by module name, then by test name
sorted_tests = []
for (modname, (mod, tests)) in sorted(test_modules.items()):
    sorted_tests.extend(cls for (_name, cls) in sorted(tests.items()))

if config["random_order"]:
    random.shuffle(sorted_tests)

# One instance of every selected test class goes into the suite
for test in sorted_tests:
    suite.addTest(test())
# Allow platforms to import each other
sys.path.append(config["platform_dir"])

# Load the platform module
platform_name = config["platform"]
logging.info("Importing platform: " + platform_name)
platform_mod = None
try:
    platform_file, platform_path, platform_desc = imp.find_module(
        platform_name, [config["platform_dir"]])
    try:
        platform_mod = imp.load_module(platform_name, platform_file,
                                       platform_path, platform_desc)
    finally:
        # find_module hands back an open file that the caller must close.
        if platform_file:
            platform_file.close()
except:
    logging.warning("Failed to import " + platform_name + " platform module")
    raise

# Let the platform module fill in its part of the configuration
try:
    platform_mod.platform_config_update(config)
except:
    logging.warning("Could not run platform host configuration")
    raise

if not config["port_map"]:
    die("Interface port map was not defined by the platform. Exiting.")

logging.debug("Configuration: " + str(config))
logging.info("OF port map: " + str(config["port_map"]))

oftest.ofutils.default_timeout = config["default_timeout"]
oftest.ofutils.default_negative_timeout = config["default_negative_timeout"]
oftest.testutils.MINSIZE = config['minsize']

# die() exits the process, so nothing needs to follow it.
if os.getuid() != 0 and not config["allow_user"]:
    die("Super-user privileges required. Please re-run with sudo or as root.")

if config["random_seed"] is not None:
    logging.info("Random seed: %d" % config["random_seed"])
    random.seed(config["random_seed"])
else:
    # Generate random seed and report to log file
    seed = random.randrange(100000000)
    logging.info("Autogen random seed: %d" % seed)
    random.seed(seed)

# Remove python's signal handler which raises KeyboardInterrupt. Exiting from
# an exception waits for all threads to terminate which might not happen.
signal.signal(signal.SIGINT, signal.SIG_DFL)
if __name__ == "__main__":
    profiler = profiler_setup(config)

    # Set up the dataplane
    oftest.dataplane_instance = oftest.dataplane.DataPlane(config)
    pcap_setup(config)
    for of_port, ifname in config["port_map"].items():
        oftest.dataplane_instance.port_add(ifname, of_port)

    logging.info("*** TEST RUN START: " + time.asctime())

    if config["xunit"]:
        try:
            import xmlrunner # fail-fast if module missing
        except ImportError:
            # Undo the setup done above, then re-raise with the original
            # traceback intact.
            oftest.dataplane_instance.kill()
            profiler_teardown(profiler)
            raise
        runner = xmlrunner.XMLTestRunner(output=config["xunit_dir"],
                                         outsuffix="",
                                         verbosity=2)
        result = runner.run(suite)
    else:
        result = unittest.TextTestRunner(verbosity=2).run(suite)

    # Reopen the 'main' log file for the end-of-run messages
    oftest.open_logfile('main')
    if oftest.testutils.skipped_test_count > 0:
        message = "Skipped %d test(s)" % oftest.testutils.skipped_test_count
        logging.info(message)
    logging.info("*** TEST RUN END : %s", time.asctime())

    # Shutdown the dataplane
    oftest.dataplane_instance.kill()
    oftest.dataplane_instance = None

    profiler_teardown(profiler)

    # Exit status 1 on any failure or error, and also on skipped tests when
    # --fail-skipped is set.
    if result.failures or result.errors:
        # exit(1) hangs sometimes
        os._exit(1)
    if oftest.testutils.skipped_test_count > 0 and config["fail_skipped"]:
        os._exit(1)