blob: bdbfea7c70f992a02a46b34c7a05eb85efc6916f [file] [log] [blame]
"""
OpenFlow Test Framework
DataPlane and DataPlanePort classes
Provide the interface to the control the set of ports being used
to stimulate the switch under test.
See the class dataplaneport for more details. This class wraps
a set of those objects allowing general calls and parsing
configuration.
@todo Add "filters" for matching packets. Actions supported
for filters should include a callback or a counter
"""
import sys
import os
import socket
import time
import netutils
from threading import Thread
from threading import Lock
from threading import Condition
from oft_config import *
import select
#@todo Move these identifiers into config
ETH_P_ALL = 0x03
RCV_TIMEOUT = 10000
RCV_SIZE = 4096
class DataPlanePort(Thread):
"""
Class defining a port monitoring object.
Control a dataplane port connected to the switch under test.
Creates a promiscuous socket on a physical interface.
Queues the packets received on that interface with time stamps.
Inherits from Thread class as meant to run in background. Also
supports polling.
Use accessors to dequeue packets for proper synchronization.
Currently assumes a controlling 'parent' which maintains a
common Lock object and a total packet-pending count. May want
to decouple that some day.
"""
def __init__(self, interface_name, port_number, parent, max_pkts=1024):
"""
Set up a port monitor object
@param interface_name The name of the physical interface like eth1
@param parent The controlling dataplane object; for pkt wait CV
@param max_pkts Maximum number of pkts to keep in queue
"""
Thread.__init__(self)
self.interface_name = interface_name
self.debug_level = debug_level_default
self.max_pkts = max_pkts
self.packets_total = 0
self.packets = []
self.packets_discarded = 0
self.port_number = port_number
self.socket = self.interface_open(interface_name)
self.dbg(DEBUG_INFO, "Openned port monitor socket")
self.parent = parent
self.pkt_sync = self.parent.pkt_sync
def dbg(self, level, string):
debug_log("DPLANE", self.debug_level, level,
self.interface_name + ": " + string)
def interface_open(self, interface_name):
"""
Open a socket in a promiscuous mode for a data connection.
@param interface_name port name as a string such as 'eth1'
@retval s socket
"""
s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
socket.htons(ETH_P_ALL))
s.bind((interface_name, 0))
netutils.set_promisc(s, interface_name)
s.settimeout(RCV_TIMEOUT)
return s
def run(self):
"""
Activity function for class
"""
self.running = True
self.socs = [self.socket]
error_warned = False # Have we warned about error?
while self.running:
try:
sel_in, sel_out, sel_err = \
select.select(self.socs, [], [], 1)
except:
print sys.exc_info()
self.dbg(DEBUG_ERROR, "Select error, exiting")
sys.exit(1)
#if not sel_err is None:
# self.dbg(DEBUG_VERBOSE, "Socket error from select set")
if not self.running:
break
if sel_in is None:
continue
try:
rcvmsg = self.socket.recv(RCV_SIZE)
except socket.error:
if not error_warned:
self.dbg(DEBUG_INFO, "Socket error on recv")
error_warned = True
continue
if len(rcvmsg) == 0:
self.dbg(DEBUG_INFO, "Zero len pkt rcvd")
self.kill()
break
rcvtime = time.clock()
self.dbg(DEBUG_VERBOSE, "Pkt len " + str(len(rcvmsg)) +
" in at " + str(rcvtime))
# Enqueue packet
self.pkt_sync.acquire()
if len(self.packets) >= self.max_pkts:
# Queue full, throw away oldest
self.packets.pop(0)
self.packets_discarded += 1
else:
self.parent.packets_pending += 1
# Check if parent is waiting on this (or any) port
if self.parent.want_pkt:
if (not self.parent.want_pkt_port or
self.parent.want_pkt_port == self.port_number):
self.parent.got_pkt_port = self.port_number
self.parent.want_pkt = False
self.parent.want_pkt.notify()
self.packets.append((rcvmsg, rcvtime))
self.packets_total += 1
self.pkt_sync.release()
self.dbg(DEBUG_INFO, "Thread exit ")
def kill(self):
"""
Terminate the running thread
"""
self.running = False
try:
self.socket.close()
except:
self.dbg(DEBUG_INFO, "Ignoring dataplane soc shutdown error")
self.dbg(DEBUG_INFO, "Port monitor exiting")
def dequeue(self, use_lock=True):
"""
Get the oldest packet in the queue
@param use_lock If True, acquires the packet sync lock (which is
really the parent's lock)
@return The pair packet, packet time-stamp
"""
if use_lock:
self.pkt_sync.acquire()
if len(self.packets) > 0:
pkt, pkt_time = self.packets.pop(0)
self.parent.packets_pending -= 1
else:
pkt = pkt_time = None
if use_lock:
self.pkt_sync.release()
return pkt, pkt_time
def timestamp_head(self):
"""
Return the timestamp of the head of queue or None if empty
"""
rv = None
try:
rv = self.packets[0][1]
except:
rv = None
return rv
def flush(self):
"""
Clear the packet queue
"""
self.pkt_sync.acquire()
self.packets_discarded += len(self.packets)
self.parent.packets_pending -= len(self.packets)
self.packets = []
self.packet_times = []
self.pkt_sync.release()
def send(self, packet):
"""
Send a packet to the dataplane port
@param packet The packet data to send to the port
@retval The number of bytes sent
"""
self.dbg(DEBUG_VERBOSE,
"port sending " + str(len(packet)) + " bytes")
return self.socket.send(packet)
def register(self, handler):
"""
Register a callback function to receive packets from this
port. The callback will be passed the packet, the
interface name and the port number (if set) on which the
packet was received.
To be implemented
"""
pass
def show(self, prefix=''):
print prefix + "Name: " + self.interface_name
print prefix + "Pkts pending: " + str(len(self.packets))
print prefix + "Pkts total: " + str(self.packets_total)
print prefix + "socket: " + str(self.socket)
class DataPlane:
"""
Class defining access primitives to the data plane
Controls a list of DataPlanePort objects
"""
def __init__(self):
self.port_list = {}
self.debug_level = debug_level_default
# pkt_sync serves double duty as a regular top level lock and
# as a condition variable
self.pkt_sync = Condition()
# These are used to signal async pkt arrival for polling
self.want_pkt = False
self.want_pkt_port = None # What port required (or None)
self.got_pkt_port = None # On what port received?
self.packets_pending = 0 # Total pkts in all port queues
def dbg(self, level, string):
debug_log("DPORT", self.debug_level, level, string)
def port_add(self, interface_name, port_number):
"""
Add a port to the dataplane
TBD: Max packets for queue?
@param interface_name The name of the physical interface like eth1
@param port_number The port number used to refer to the port
"""
self.port_list[port_number] = DataPlanePort(interface_name,
port_number, self)
self.port_list[port_number].start()
def send(self, port_number, packet):
"""
Send a packet to the given port
@param port_number The port to send the data to
@param packet Raw packet data to send to port
"""
self.dbg(DEBUG_VERBOSE,
"Sending %d bytes to port %d" % (len(packet), port_number))
bytes = self.port_list[port_number].send(packet)
if bytes != len(packet):
self.dbg(DEBUG_ERROR,"Unhandled send error, " +
"length mismatch %d != %d" %
(bytes, len(packet)))
return bytes
def flood(self, packet):
"""
Send a packet to all ports
@param packet Raw packet data to send to port
"""
for port_number in self.port_list.keys():
bytes = self.port_list[port_number].send(packet)
if bytes != len(packet):
self.dbg(DEBUG_ERROR, "Unhandled send error" +
", port %d, length mismatch %d != %d" %
(port_number, bytes, len(packet)))
def _oldest_packet_find(self):
# Find port with oldest packet
min_time = 0
min_port = -1
for port_number in self.port_list.keys():
ptime = self.port_list[port_number].timestamp_head()
if ptime:
if (min_port == -1) or (ptime < min_time):
min_time = ptime
min_port = port_number
oft_assert(min_port != -1, "Could not find port when pkts pending")
return min_port
def poll(self, port_number=None, timeout=None):
"""
Poll one or all dataplane ports for a packet
If port_number is given, get the oldest packet from that port.
Otherwise, find the port with the oldest packet and return
that packet.
@param port_number If set, get packet from this port
@param timeout If positive and no packet is available, block
until a packet is received or for this many seconds
@return The triple port_number, packet, pkt_time where packet
is received from port_number at time pkt_time. If a timeout
occurs, return None, None, None
"""
self.pkt_sync.acquire()
# Check if requested specific port and it has a packet
if port_number and len(self.port_list[port_number].packets) != 0:
pkt, time = self.port_list[port_number].dequeue(use_lock=False)
self.pkt_sync.release()
oft_assert(pkt, "Poll: packet not found on port " +
str(port_number))
return port_number, pkt, time
# Check if requested any port and some packet pending
if not port_number and self.packets_pending != 0:
port = self._oldest_packet_find()
pkt, time = self.port_list[port].dequeue(use_lock=False)
self.pkt_sync.release()
oft_assert(pkt, "Poll: oldest packet not found")
return port, pkt, time
# No packet pending; blocking call requested?
if not timeout:
self.pkt_sync.release()
return None, None, None
# Desired packet isn't available and timeout is specified
# Already holding pkt_sync; wait on pkt_sync variable
self.want_pkt = True
self.want_pkt_port = port_number
self.got_pkt_port = None
self.pkt_sync.wait(timeout)
self.want_pkt = False
if self.got_pkt_port:
pkt, time = \
self.port_list[self.got_pkt_port].dequeue(use_lock=False)
self.pkt_sync.release()
oft_assert(pkt, "Poll: pkt reported, but not found at " +
self.got_pkt_port)
return self.got_pkt_port, pkt, time
self.pkt_sync.release()
self.dbg(DEBUG_VERBOSE, "Poll time out, no packet")
return None, None, None
def kill(self, join_threads=False):
"""
Close all sockets for dataplane
@param join_threads If True call join on each thread
"""
for port_number in self.port_list.keys():
self.port_list[port_number].kill()
if join_threads:
self.dbg(DEBUG_INFO, "Joining " + str(port_number))
self.port_list[port_number].join()
self.dbg(DEBUG_INFO, "DataPlane shutdown")
def show(self, prefix=''):
print prefix + "Dataplane Controller"
print prefix + "Packets pending" + str(self.packets_pending)
for pnum, port in self.port_list.items():
print prefix + "OpenFlow Port Number " + str(pnum)
port.show(prefix + ' ')