dataplane: single-threaded rewrite
There's now just one thread for the dataplane instead of one per port. This
greatly reduces the amount of code needed for each port implementation.
diff --git a/src/python/oftest/base_tests.py b/src/python/oftest/base_tests.py
index 7f29db5..db9e4a9 100644
--- a/src/python/oftest/base_tests.py
+++ b/src/python/oftest/base_tests.py
@@ -112,7 +112,7 @@
logging.info("Teardown for simple dataplane test")
SimpleProtocol.tearDown(self)
if hasattr(self, 'dataplane'):
- self.dataplane.kill(join_threads=self.clean_shutdown)
+ self.dataplane.kill()
del self.dataplane
logging.info("Teardown done")
@@ -136,7 +136,7 @@
def tearDown(self):
logging.info("Teardown for simple dataplane test")
- self.dataplane.kill(join_threads=self.clean_shutdown)
+ self.dataplane.kill()
del self.dataplane
logging.info("Teardown done")
diff --git a/src/python/oftest/dataplane.py b/src/python/oftest/dataplane.py
index 6da8979..d53e223 100644
--- a/src/python/oftest/dataplane.py
+++ b/src/python/oftest/dataplane.py
@@ -36,11 +36,6 @@
except:
pass
-##@todo Find a better home for these identifiers (dataplane)
-RCV_SIZE_DEFAULT = 4096
-ETH_P_ALL = 0x03
-RCV_TIMEOUT = 10000
-
def match_exp_pkt(exp_pkt, pkt):
"""
Compare the string value of pkt with the string value of exp_pkt,
@@ -55,204 +50,66 @@
return e == p
-class DataPlanePort(Thread):
+class DataPlanePort:
"""
- Class defining a port monitoring object.
-
- Control a dataplane port connected to the switch under test.
- Creates a promiscuous socket on a physical interface.
- Queues the packets received on that interface with time stamps.
- Inherits from Thread class as meant to run in background. Also
- supports polling.
-
- Currently assumes a controlling 'parent' which maintains a
- common Lock object and a total packet-pending count. May want
- to decouple that some day.
+ Uses raw sockets to capture and send packets on a network interface.
"""
- def __init__(self, interface_name, port_number, parent, max_pkts=1024):
+ RCV_SIZE_DEFAULT = 4096
+ ETH_P_ALL = 0x03
+ RCV_TIMEOUT = 10000
+
+ def __init__(self, interface_name, port_number):
"""
- Set up a port monitor object
@param interface_name The name of the physical interface like eth1
- @param port_number The port number associated with this port
- @param parent The controlling dataplane object; for pkt wait CV
- @param max_pkts Maximum number of pkts to keep in queue
"""
- Thread.__init__(self)
self.interface_name = interface_name
- self.max_pkts = max_pkts
- self.packets_total = 0
- self.packets = []
- self.packets_discarded = 0
- self.port_number = port_number
- logname = "dp-" + interface_name
- self.logger = logging.getLogger(logname)
- try:
- self.socket = self.interface_open(interface_name)
- except:
- self.logger.info("Could not open socket")
- raise
- self.logger.info("Opened port monitor (class %s)", type(self).__name__)
- self.parent = parent
- self.killed = False
+ self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
+ socket.htons(self.ETH_P_ALL))
+ self.socket.bind((interface_name, 0))
+ netutils.set_promisc(self.socket, interface_name)
+ self.socket.settimeout(self.RCV_TIMEOUT)
- # Used to wake up the event loop in kill()
- self.waker = EventDescriptor()
-
- def interface_open(self, interface_name):
- """
- Open a socket in a promiscuous mode for a data connection.
- @param interface_name port name as a string such as 'eth1'
- @retval s socket
- """
- s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
- socket.htons(ETH_P_ALL))
- s.bind((interface_name, 0))
- netutils.set_promisc(s, interface_name)
- s.settimeout(RCV_TIMEOUT)
- return s
-
- def run(self):
- """
- Activity function for class
- """
- self.socs = [self.socket, self.waker]
- error_warned = False # Have we warned about error?
- while not self.killed:
- try:
- sel_in, sel_out, sel_err = \
- select.select(self.socs, [], [], 1)
- except:
- print sys.exc_info()
- self.logger.error("Select error, exiting")
- break
-
- if (sel_in is None) or (len(sel_in) == 0):
- continue
-
- if self.waker in sel_in:
- self.waker.wait()
- continue
-
- try:
- rcvmsg = self.socket.recv(RCV_SIZE_DEFAULT)
- except socket.error:
- if not error_warned:
- self.logger.info("Socket error on recv")
- error_warned = True
- continue
-
- if len(rcvmsg) == 0:
- self.logger.info("Zero len pkt rcvd")
- self.kill()
- break
-
- rcvtime = time.time()
- self.logger.debug("Pkt len " + str(len(rcvmsg)) +
- " in at " + str(rcvtime) + " on port " +
- str(self.port_number))
-
- # Enqueue packet
- with self.parent.pkt_sync:
- if len(self.packets) >= self.max_pkts:
- # Queue full, throw away oldest
- self.packets.pop(0)
- self.packets_discarded += 1
- self.logger.debug("Discarding oldest packet to make room")
- self.packets.append((rcvmsg, rcvtime))
- self.packets_total += 1
- self.parent.pkt_sync.notify_all()
-
- self.logger.info("Thread exit")
-
- def kill(self):
- """
- Terminate the running thread
- """
- self.logger.debug("Port monitor kill")
- self.killed = True
- self.waker.notify()
- try:
+ def __del__(self):
+ if self.socket:
self.socket.close()
- except:
- self.logger.info("Ignoring dataplane soc shutdown error")
- def timestamp_head(self):
+ def fileno(self):
"""
- Return the timestamp of the head of queue or None if empty
+ Return an integer file descriptor that can be passed to select(2).
"""
- rv = None
- try:
- rv = self.packets[0][1]
- except:
- rv = None
- return rv
+ return self.socket.fileno()
- def flush(self):
+ def recv(self):
"""
- Clear the packet queue
+ Receive a packet from this port.
+ @retval (packet data, timestamp)
"""
- with self.parent.pkt_sync:
- self.packets_discarded += len(self.packets)
- self.packets = []
+ pkt = self.socket.recv(self.RCV_SIZE_DEFAULT)
+ return (pkt, time.time())
def send(self, packet):
"""
- Send a packet to the dataplane port
+ Send a packet out this port.
@param packet The packet data to send to the port
@retval The number of bytes sent
"""
return self.socket.send(packet)
- def register(self, handler):
+ def down(self):
"""
- Register a callback function to receive packets from this
- port. The callback will be passed the packet, the
- interface name and the port number (if set) on which the
- packet was received.
-
- To be implemented
+ Bring the physical link down.
"""
- pass
+ os.system("ifconfig down %s" % self.interface_name)
- def show(self, prefix=''):
-
- print prefix + "Name: " + self.interface_name
- print prefix + "Pkts pending: " + str(len(self.packets))
- print prefix + "Pkts total: " + str(self.packets_total)
- print prefix + "socket: " + str(self.socket)
-
-
- def port_down(self,port_number,config):
-
+ def up(self):
"""
- Grabs a port from the dataplane ports and brings it down by
- shutting the corresponding interface
- @port_number The port number which has brought to be down
- @interface_name The interface corresponding to the port that needs to
- be brought down
-
+ Bring the physical link up.
"""
- interface_name = config["port_map"].get(port_number)
- cmd = 'ifdown '+ interface_name
- os.system(cmd)
-
- def port_up(self,port_number,config):
-
- """
- Grabs a port from the dataplane ports and brings it up by
- starting up the corresponding interface
- @port_number The port number which has to brought up
- @interface_name The interface corresponding to the port that has to
- be brought up
-
- """
- interface_name = config["port_map"].get(port_number)
- cmd = 'ifup '+ interface_name
- os.system(cmd)
+ os.system("ifconfig up %s" % self.interface_name)
-class DataPlanePortPcap(DataPlanePort):
+class DataPlanePortPcap:
"""
Alternate port implementation using libpcap. This is required for recent
versions of Linux (such as Linux 3.2 included in Ubuntu 12.04) which
@@ -260,81 +117,53 @@
socket. libpcap understands how to read the VLAN tag from the kernel.
"""
- def __init__(self, interface_name, port_number, parent, max_pkts=1024):
- DataPlanePort.__init__(self, interface_name, port_number, parent, max_pkts)
-
- def interface_open(self, interface_name):
- """
- Open a PCAP interface.
- """
+ def __init__(self, interface_name, port_number):
self.pcap = pcap.pcap(interface_name)
self.pcap.setnonblock()
+
+ def fileno(self):
return self.pcap.fileno()
- def run(self):
- """
- Activity function for class
- """
- while not self.killed:
- try:
- sel_in, sel_out, sel_err = select.select([self.socket, self.waker], [], [], 1)
- except:
- print sys.exc_info()
- self.logger.error("Select error, exiting")
- break
-
- if (sel_in is None) or (len(sel_in) == 0):
- continue
-
- if self.waker in sel_in:
- self.waker.wait()
- continue
-
- # Enqueue packet
- with self.parent.pkt_sync:
- for (timestamp, rcvmsg) in self.pcap.readpkts():
- self.logger.debug("Pkt len " + str(len(rcvmsg)) +
- " in at " + str(timestamp) + " on port " +
- str(self.port_number))
-
- if len(self.packets) >= self.max_pkts:
- # Queue full, throw away oldest
- self.packets.pop(0)
- self.packets_discarded += 1
- self.logger.debug("Discarding oldest packet to make room")
- self.packets.append((rcvmsg, timestamp))
- self.packets_total += 1
- self.parent.pkt_sync.notify_all()
-
- self.logger.info("Thread exit")
-
- def kill(self):
- """
- Terminate the running thread
- """
- self.logger.debug("Port monitor kill")
- self.killed = True
- self.waker.notify()
- # pcap object is closed on GC.
+ def recv(self):
+ (timestamp, pkt) = next(self.pcap)
+ return (pkt, timestamp)
def send(self, packet):
- """
- Send a packet to the dataplane port
- @param packet The packet data to send to the port
- @retval The number of bytes sent
- """
return self.pcap.inject(packet, len(packet))
-class DataPlane:
+ def down(self):
+ pass
+
+ def up(self):
+ pass
+
+class DataPlane(Thread):
"""
- Class defining access primitives to the data plane
- Controls a list of DataPlanePort objects
+ This class provides methods to send and receive packets on the dataplane.
+ It uses the DataPlanePort class, or an alternative implementation of that
+ interface, to do IO on a particular port. A background thread is used to
+ read packets from the dataplane ports and enqueue them to be read by the
+ test. The kill() method must be called to shutdown this thread.
"""
+
+ MAX_QUEUE_LEN = 100
+
def __init__(self, config=None):
- self.port_list = {}
- # pkt_sync serves double duty as a regular top level lock and
+ Thread.__init__(self)
+
+ # dict from port number to port object
+ self.ports = {}
+
+ # dict from port number to list of (timestamp, packet)
+ self.packet_queues = {}
+
+ # cvar serves double duty as a regular top level lock and
# as a condition variable
- self.pkt_sync = Condition()
+ self.cvar = Condition()
+
+ # Used to wake up the event loop from another thread
+ self.waker = EventDescriptor()
+ self.killed = False
self.logger = logging.getLogger("dataplane")
@@ -361,18 +190,54 @@
self.logger.warning("Missing pypcap, VLAN tests may fail. See README for installation instructions.")
self.dppclass = DataPlanePort
+ self.start()
+
+ def run(self):
+ """
+ Activity function for class
+ """
+ while not self.killed:
+ sockets = [self.waker] + self.ports.values()
+ try:
+ sel_in, sel_out, sel_err = select.select(sockets, [], [], 1)
+ except:
+ print sys.exc_info()
+ self.logger.error("Select error, exiting")
+ break
+
+ with self.cvar:
+ for port in sel_in:
+ if port == self.waker:
+ self.waker.wait()
+ continue
+ else:
+ # Enqueue packet
+ pkt, timestamp = port.recv()
+ port_number = port._port_number
+ self.logger.debug("Pkt len %d in on port %d",
+ len(pkt), port_number)
+ queue = self.packet_queues[port_number]
+ if len(queue) >= self.MAX_QUEUE_LEN:
+ # Queue full, throw away oldest
+ queue.pop(0)
+ self.logger.debug("Discarding oldest packet to make room")
+ queue.append((pkt, timestamp))
+ self.cvar.notify_all()
+
+ self.logger.info("Thread exit")
+
def port_add(self, interface_name, port_number):
"""
Add a port to the dataplane
- TBD: Max packets for queue?
@param interface_name The name of the physical interface like eth1
@param port_number The port number used to refer to the port
+ Stashes the port number on the created port object.
"""
-
- self.port_list[port_number] = self.dppclass(interface_name,
- port_number, self);
-
- self.port_list[port_number].start()
+ self.ports[port_number] = self.dppclass(interface_name, port_number)
+ self.ports[port_number]._port_number = port_number
+ self.packet_queues[port_number] = []
+ # Need to wake up event loop to change the sockets being selected on.
+ self.waker.notify()
def send(self, port_number, packet):
"""
@@ -382,40 +247,44 @@
"""
self.logger.debug("Sending %d bytes to port %d" %
(len(packet), port_number))
- bytes = self.port_list[port_number].send(packet)
+ bytes = self.ports[port_number].send(packet)
if bytes != len(packet):
self.logger.error("Unhandled send error, length mismatch %d != %d" %
(bytes, len(packet)))
return bytes
- # Returns the port with the oldest packet, or None if no packets are queued.
- def oldest_port(self):
- min_port = None
+ def oldest_port_number(self):
+ """
+ Returns the port number with the oldest packet, or
+ None if no packets are queued.
+ """
+ min_port_number = None
min_time = float('inf')
- for port in self.port_list.values():
- ptime = port.timestamp_head()
- if ptime and ptime < min_time:
- min_time = ptime
- min_port = port
- return min_port
+ for (port_number, queue) in self.packet_queues.items():
+ if queue and queue[0][1] < min_time:
+ min_time = queue[0][1]
+ min_port_number = port_number
+ return min_port_number
# Dequeues and yields packets in the order they were received.
- # Yields (port, packet, received time).
+ # Yields (port number, packet, received time).
# If port_number is not specified yields packets from all ports.
def packets(self, port_number=None):
while True:
- if port_number == None:
- port = self.oldest_port()
- else:
- port = self.port_list[port_number]
+ rcv_port_number = port_number or self.oldest_port_number()
- if port == None or len(port.packets) == 0:
- self.logger.debug("Out of packets for port %s" % str(port_number))
- # Out of packets
+ if rcv_port_number == None:
+ self.logger.debug("Out of packets on all ports")
break
- pkt, time = port.packets.pop(0)
- yield (port, pkt, time)
+ queue = self.packet_queues[rcv_port_number]
+
+ if len(queue) == 0:
+ self.logger.debug("Out of packets on port %d", rcv_port_number)
+ break
+
+ pkt, time = queue.pop(0)
+ yield (rcv_port_number, pkt, time)
def poll(self, port_number=None, timeout=-1, exp_pkt=None):
"""
@@ -444,50 +313,37 @@
# Retrieve the packet. Returns (port number, packet, time).
def grab():
self.logger.debug("Grabbing packet")
- for (port, pkt, time) in self.packets(port_number):
- self.logger.debug("Checking packet from port %d" % port.port_number)
+ for (rcv_port_number, pkt, time) in self.packets(port_number):
+ self.logger.debug("Checking packet from port %d", rcv_port_number)
if not exp_pkt or match_exp_pkt(exp_pkt, pkt):
- return (port, pkt, time)
+ return (rcv_port_number, pkt, time)
self.logger.debug("Did not find packet")
return None
- with self.pkt_sync:
- ret = timed_wait(self.pkt_sync, grab, timeout=timeout)
+ with self.cvar:
+ ret = timed_wait(self.cvar, grab, timeout=timeout)
if ret != None:
- (port, pkt, time) = ret
- return (port.port_number, pkt, time)
+ return ret
else:
self.logger.debug("Poll time out, no packet from " + str(port_number))
return (None, None, None)
- def kill(self, join_threads=True):
+ def kill(self):
"""
- Close all sockets for dataplane
- @param join_threads If True call join on each thread
+ Stop the dataplane thread.
"""
- for port_number in self.port_list.keys():
- self.port_list[port_number].kill()
+ self.killed = True
+ self.waker.notify()
+ self.join()
+ # Explicitly release ports to ensure we don't run out of sockets
+ # even if someone keeps holding a reference to the dataplane.
+ del self.ports
- if join_threads:
- for port_number in self.port_list.keys():
- self.logger.debug("Joining " + str(port_number))
- self.port_list[port_number].join()
-
- self.port_list = None
-
- self.logger.info("DataPlane shutdown")
-
- def show(self, prefix=''):
- print prefix + "Dataplane Controller"
- for pnum, port in self.port_list.items():
- print prefix + "OpenFlow Port Number " + str(pnum)
- port.show(prefix + ' ')
-
- def port_down(self,port_number):
+ def port_down(self, port_number):
"""Brings the specified port down"""
- self.port_list[port_number].port_down(port_number,self.config)
+ self.ports[port_number].down()
- def port_up(self,port_number):
+ def port_up(self, port_number):
"""Brings the specified port up"""
- self.port_list[port_number].port_up(port_number,self.config)
+ self.ports[port_number].up()