# blob: 44ec997a1068ac1e70745b5da0bed74cc582c56e [file] [log] [blame]
"""
OpenFlow Test Framework
DataPlane and DataPlanePort classes
Provide the interface to control the set of ports being used
to stimulate the switch under test.
See the class DataPlanePort for more details. The DataPlane class wraps
a set of those objects, allowing general calls and parsing
configuration.
@todo Add "filters" for matching packets. Actions supported
for filters should include a callback or a counter
"""
import sys
import os
import socket
import time
import netutils
from threading import Thread
from threading import Lock
from oft_config import *
import select
#@todo Move these identifiers into config

# Protocol number handed (through socket.htons) to the AF_PACKET raw socket
# opened by DataPlanePort.interface_open.
ETH_P_ALL = 0x0003

# Value passed to socket.settimeout() on every port socket.
# NOTE(review): settimeout() takes seconds, so this is close to three
# hours -- confirm that milliseconds were not intended.
RCV_TIMEOUT = 10000

# Maximum number of bytes requested from each recv() call.
RCV_SIZE = 4 * 1024
# class packet_queue:
# """
# Class defining a packet queue across multiple ports
# Items in the queue are stored as a triple (port number, pkt, pkt in time)
# """
# def __init__(self, max_pkts=1024):
# self.sync = Lock()
# self.debug_level = debug_level_default
# self.packets = []
# self.max_pkts = max_pkts
# self.packets_total = 0
# self.packets_discarded = 0
class DataPlanePort(Thread):
    """
    Class defining a port monitoring object.

    Control a dataplane port connected to the switch under test.
    Creates a promiscuous socket on a physical interface.
    Queues the packets received on that interface with time stamps.

    Inherits from Thread class as meant to run in background.  Also
    supports polling.

    Use accessors to dequeue packets for proper synchronization: the
    queue (self.packets) and its counters are guarded by self.sync.
    """

    def __init__(self, interface_name, max_pkts=1024):
        """
        Set up a port monitor object
        @param interface_name The name of the physical interface like eth1
        @param max_pkts Maximum number of pkts to keep in queue
        """
        Thread.__init__(self)
        self.interface_name = interface_name
        self.debug_level = debug_level_default
        self.max_pkts = max_pkts
        self.packets_total = 0
        # Queue of (packet data, receive time) pairs, oldest first
        self.packets = []
        self.packets_discarded = 0
        self.sync = Lock()
        # Set True by run() and False by kill()
        self.running = False
        self.socket = self.interface_open(interface_name)
        self.dbg(DEBUG_INFO, "Openned port monitor socket")

    def dbg(self, level, string):
        """
        Log a debug message prefixed with this port's interface name
        @param level The debug level of the message
        @param string The message text
        """
        debug_log("DPLANE", self.debug_level, level,
                  self.interface_name + ": " + string)

    def interface_open(self, interface_name):
        """
        Open a socket in a promiscuous mode for a data connection.
        @param interface_name port name as a string such as 'eth1'
        @retval s socket
        """
        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                          socket.htons(ETH_P_ALL))
        s.bind((interface_name, 0))
        netutils.set_promisc(s, interface_name)
        s.settimeout(RCV_TIMEOUT)
        return s

    def run(self):
        """
        Activity function for class

        Waits on the port socket with a one second select() timeout so
        that a kill() request is noticed promptly.  Each received packet
        is appended to the queue with its receive time; when the queue
        already holds max_pkts entries the oldest one is discarded.
        """
        self.running = True
        self.socs = [self.socket]
        error_warned = False  # Have we warned about error?

        while self.running:
            try:
                sel_in = select.select(self.socs, [], [], 1)[0]
            except Exception:
                if not self.running:
                    # kill() closed the socket underneath select();
                    # this is a normal shutdown, not an error.
                    break
                print(sys.exc_info())
                self.dbg(DEBUG_ERROR, "Select error, exiting")
                # Raises SystemExit in this thread
                sys.exit(1)

            if not self.running:
                break

            # select() hands back empty lists (never None) on timeout.
            # Skip recv() in that case, otherwise it would block for up
            # to RCV_TIMEOUT with nothing to read.
            if not sel_in:
                continue

            try:
                rcvmsg = self.socket.recv(RCV_SIZE)
            except socket.error:
                if not error_warned:
                    self.dbg(DEBUG_INFO, "Socket error on recv")
                    error_warned = True
                continue

            if len(rcvmsg) == 0:
                self.dbg(DEBUG_INFO, "Zero len pkt rcvd")
                self.kill()
                break

            # NOTE(review): time.clock() is CPU time on Unix and was
            # removed in Python 3.8; time.time() is probably what is
            # wanted here -- confirm no caller compares against clock().
            rcvtime = time.clock()
            self.dbg(DEBUG_VERBOSE, "Pkt len " + str(len(rcvmsg)) +
                     " in at " + str(rcvtime))

            # "with" guarantees the lock is released even if the queue
            # update raises.
            with self.sync:
                if len(self.packets) >= self.max_pkts:
                    self.packets.pop(0)
                    self.packets_discarded += 1
                self.packets.append((rcvmsg, rcvtime))
                self.packets_total += 1

        self.dbg(DEBUG_INFO, "Thread exit ")

    def kill(self):
        """
        Terminate the running thread

        Clears the running flag and closes the socket; errors raised
        while closing are logged and otherwise ignored.
        """
        self.running = False
        try:
            self.socket.close()
        except Exception:
            self.dbg(DEBUG_INFO, "Ignoring dataplane soc shutdown error")
        self.dbg(DEBUG_INFO,
                 "Port monitor exiting")

    def dequeue(self):
        """
        Get the oldest packet in the queue
        @retval The pair (pkt, pkt_time)

        Raises IndexError if the queue is empty.  The queue lock is
        released in that case too, so later calls do not deadlock.
        """
        with self.sync:
            pkt, pkt_time = self.packets.pop(0)
        return pkt, pkt_time

    def timestamp_head(self):
        """
        Return the timestamp of the head of queue or None if empty
        """
        with self.sync:
            if len(self.packets) > 0:
                return self.packets[0][1]
        return None

    def flush(self):
        """
        Clear the packet queue

        Every packet dropped this way is added to packets_discarded.
        """
        with self.sync:
            self.packets_discarded += len(self.packets)
            self.packets = []

    def send(self, packet):
        """
        Send a packet to the dataplane port
        @param packet The packet data to send to the port
        @retval The number of bytes sent
        """
        self.dbg(DEBUG_VERBOSE,
                 "port sending " + str(len(packet)) + " bytes")
        return self.socket.send(packet)

    def register(self, handler):
        """
        Register a callback function to receive packets from this
        port.  The callback will be passed the packet, the
        interface name and the port number (if set) on which the
        packet was received.

        To be implemented
        """
        pass

    def show(self, prefix=''):
        """
        Print a summary of this port to stdout
        @param prefix String prepended to every printed line
        """
        print(prefix + "Name:          " + self.interface_name)
        print(prefix + "Pkts pending:  " + str(len(self.packets)))
        print(prefix + "Pkts total:    " + str(self.packets_total))
        print(prefix + "socket:        " + str(self.socket))
class DataPlane:
    """
    Class defining access primitives to the data plane
    Controls a list of DataPlanePort objects

    Ports are kept in self.port_list, a dictionary mapping the port
    number to its DataPlanePort object.
    """

    def __init__(self):
        self.port_list = {}
        self.debug_level = debug_level_default

    def dbg(self, level, string):
        """
        Log a debug message for the dataplane
        @param level The debug level of the message
        @param string The message text
        """
        debug_log("DPORT", self.debug_level, level, string)

    def port_add(self, interface_name, port_number):
        """
        Add a port to the dataplane and start its monitor thread
        TBD:  Max packets for queue?
        @param interface_name The name of the physical interface like eth1
        @param port_number The port number used to refer to the port
        """
        port = DataPlanePort(interface_name)
        self.port_list[port_number] = port
        port.start()

    def send(self, port_number, packet):
        """
        Send a packet to the given port
        @param port_number The port to send the data to
        @param packet Raw packet data to send to port
        @retval The number of bytes sent

        A short send is logged but not retried.
        """
        self.dbg(DEBUG_VERBOSE,
                 "Sending %d bytes to port %d" % (len(packet), port_number))
        sent = self.port_list[port_number].send(packet)
        if sent != len(packet):
            self.dbg(DEBUG_ERROR, "Unhandled send error, " +
                     "length mismatch %d != %d" %
                     (sent, len(packet)))
        return sent

    def flood(self, packet):
        """
        Send a packet to all ports
        @param packet Raw packet data to send to port

        A short send on any port is logged but not retried.
        """
        for port_number, port in self.port_list.items():
            sent = port.send(packet)
            if sent != len(packet):
                self.dbg(DEBUG_ERROR, "Unhandled send error" +
                         ", port %d, length mismatch %d != %d" %
                         (port_number, sent, len(packet)))

    def packet_get(self, port_number=None):
        """
        Get a packet from the data plane

        If port_number is given, get the oldest packet from that port.
        Otherwise, find the port with the oldest packet and return
        that packet.
        @param port_number If set, get packet from this port
        @retval The triple port_number, packet, pkt_time where packet
        is received from port_number at time pkt_time.  The triple
        None, None, None is returned when no packet is available.
        """
        # Compare against None explicitly so that port number 0 is
        # treated as a real port rather than as "not given".
        if port_number is not None:
            port = self.port_list[port_number]
            if len(port.packets) != 0:
                pkt, pkt_time = port.dequeue()
                return port_number, pkt, pkt_time
            return None, None, None

        # Find port with oldest packet
        #@todo Consider using a single queue for all ports
        oldest_port = None
        oldest_time = None
        for number, port in self.port_list.items():
            head_time = port.timestamp_head()
            # None means the queue is empty; a timestamp of 0.0 is valid
            if head_time is None:
                continue
            if oldest_time is None or head_time < oldest_time:
                oldest_time = head_time
                oldest_port = number

        if oldest_time is None:
            return None, None, None

        pkt, pkt_time = self.port_list[oldest_port].dequeue()
        return oldest_port, pkt, pkt_time

    def kill(self, join_threads=False):
        """
        Close all sockets for dataplane
        @param join_threads If True call join on each thread
        """
        # Ask every port to stop before joining any of them, so the
        # waits for the individual threads overlap.
        for port in self.port_list.values():
            port.kill()
        if join_threads:
            for port_number, port in self.port_list.items():
                self.dbg(DEBUG_INFO, "Joining " + str(port_number))
                port.join()

        self.dbg(DEBUG_INFO, "DataPlane shutdown")

    def show(self, prefix=''):
        """
        Print a summary of the dataplane and each of its ports to stdout
        @param prefix String prepended to every printed line
        """
        print(prefix + "Dataplane Controller")
        for pnum, port in self.port_list.items():
            print(prefix + "OpenFlow Port Number " + str(pnum))
            port.show(prefix + '  ')