Mostly polling and timeout support
Support poll timeouts for controller messages
Support poll and timeouts for dataplane messages
Changed name of dataplane pkt get to 'poll'
Six basic test cases now passing on LB4G
Added test-framework assertion
Added additional files to lint checking
diff --git a/src/python/oftest/dataplane.py b/src/python/oftest/dataplane.py
index 44ec997..bdbfea7 100644
--- a/src/python/oftest/dataplane.py
+++ b/src/python/oftest/dataplane.py
@@ -7,7 +7,7 @@
to stimulate the switch under test.
See the class dataplaneport for more details. This class wraps
-a set of those objects allowing general calls and parsing
+a set of those objects allowing general calls and parsing
configuration.
@todo Add "filters" for matching packets. Actions supported
@@ -21,6 +21,7 @@
import netutils
from threading import Thread
from threading import Lock
+from threading import Condition
from oft_config import *
import select
@@ -29,21 +30,6 @@
RCV_TIMEOUT = 10000
RCV_SIZE = 4096
-# class packet_queue:
-# """
-# Class defining a packet queue across multiple ports
-
-# Items in the queue are stored as a triple (port number, pkt, pkt in time)
-# """
-
-# def __init__(self, max_pkts=1024):
-# self.sync = Lock()
-# self.debug_level = debug_level_default
-# self.packets = []
-# self.max_pkts = max_pkts
-# self.packets_total = 0
-# self.packets_discarded = 0
-
class DataPlanePort(Thread):
"""
Class defining a port monitoring object.
@@ -54,12 +40,17 @@
Inherits from Thread class as meant to run in background. Also
supports polling.
Use accessors to dequeue packets for proper synchronization.
+
+ Currently assumes a controlling 'parent' which maintains a
+ common Lock object and a total packet-pending count. May want
+ to decouple that some day.
"""
- def __init__(self, interface_name, max_pkts=1024):
+ def __init__(self, interface_name, port_number, parent, max_pkts=1024):
"""
Set up a port monitor object
@param interface_name The name of the physical interface like eth1
+ @param parent The controlling dataplane object; for pkt wait CV
@param max_pkts Maximum number of pkts to keep in queue
"""
Thread.__init__(self)
@@ -69,12 +60,14 @@
self.packets_total = 0
self.packets = []
self.packets_discarded = 0
- self.sync = Lock()
+ self.port_number = port_number
self.socket = self.interface_open(interface_name)
self.dbg(DEBUG_INFO, "Openned port monitor socket")
+ self.parent = parent
+ self.pkt_sync = self.parent.pkt_sync
def dbg(self, level, string):
- debug_log("DPLANE", self.debug_level, level,
+ debug_log("DPLANE", self.debug_level, level,
self.interface_name + ": " + string)
def interface_open(self, interface_name):
@@ -83,7 +76,7 @@
@param interface_name port name as a string such as 'eth1'
@retval s socket
"""
- s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
+ s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
socket.htons(ETH_P_ALL))
s.bind((interface_name, 0))
netutils.set_promisc(s, interface_name)
@@ -129,16 +122,27 @@
break
rcvtime = time.clock()
- self.dbg(DEBUG_VERBOSE, "Pkt len " + str(len(rcvmsg)) +
+ self.dbg(DEBUG_VERBOSE, "Pkt len " + str(len(rcvmsg)) +
" in at " + str(rcvtime))
- self.sync.acquire()
+ # Enqueue packet
+ self.pkt_sync.acquire()
if len(self.packets) >= self.max_pkts:
+ # Queue full, throw away oldest
self.packets.pop(0)
self.packets_discarded += 1
+ else:
+ self.parent.packets_pending += 1
+ # Wake the parent if it is blocked in poll() on this (or any) port
+ if self.parent.want_pkt:
+ if (not self.parent.want_pkt_port or
+ self.parent.want_pkt_port == self.port_number):
+ self.parent.got_pkt_port = self.port_number
+ self.parent.want_pkt = False
+ self.pkt_sync.notify() # notify the shared Condition; want_pkt is a bool
self.packets.append((rcvmsg, rcvtime))
self.packets_total += 1
- self.sync.release()
+ self.pkt_sync.release()
self.dbg(DEBUG_INFO, "Thread exit ")
@@ -151,16 +155,24 @@
self.socket.close()
except:
self.dbg(DEBUG_INFO, "Ignoring dataplane soc shutdown error")
- self.dbg(DEBUG_INFO,
- "Port monitor exiting")
+ self.dbg(DEBUG_INFO, "Port monitor exiting")
- def dequeue(self):
+ def dequeue(self, use_lock=True):
"""
Get the oldest packet in the queue
+ @param use_lock If True, acquires the packet sync lock (which is
+ really the parent's lock)
+ @return The pair packet, packet time-stamp
"""
- self.sync.acquire()
- pkt, pkt_time = self.packets.pop(0)
- self.sync.release()
+ if use_lock:
+ self.pkt_sync.acquire()
+ if len(self.packets) > 0:
+ pkt, pkt_time = self.packets.pop(0)
+ self.parent.packets_pending -= 1
+ else:
+ pkt = pkt_time = None
+ if use_lock:
+ self.pkt_sync.release()
return pkt, pkt_time
def timestamp_head(self):
@@ -168,21 +180,22 @@
Return the timestamp of the head of queue or None if empty
"""
rv = None
- self.sync.acquire()
- if len(self.packets) > 0:
+ try:
rv = self.packets[0][1]
- self.sync.release()
+ except:
+ rv = None
return rv
def flush(self):
"""
Clear the packet queue
"""
- self.sync.acquire()
+ self.pkt_sync.acquire()
self.packets_discarded += len(self.packets)
+ self.parent.packets_pending -= len(self.packets)
self.packets = []
self.packet_times = []
- self.sync.release()
+ self.pkt_sync.release()
def send(self, packet):
@@ -199,8 +212,8 @@
def register(self, handler):
"""
Register a callback function to receive packets from this
- port. The callback will be passed the packet, the
- interface name and the port number (if set) on which the
+ port. The callback will be passed the packet, the
+ interface name and the port number (if set) on which the
packet was received.
To be implemented
@@ -212,7 +225,7 @@
print prefix + "Pkts pending: " + str(len(self.packets))
print prefix + "Pkts total: " + str(self.packets_total)
print prefix + "socket: " + str(self.socket)
-
+
class DataPlane:
"""
@@ -222,6 +235,15 @@
def __init__(self):
self.port_list = {}
self.debug_level = debug_level_default
+ # pkt_sync serves double duty as a regular top level lock and
+ # as a condition variable
+ self.pkt_sync = Condition()
+
+ # These are used to signal async pkt arrival for polling
+ self.want_pkt = False
+ self.want_pkt_port = None # What port required (or None)
+ self.got_pkt_port = None # On what port received?
+ self.packets_pending = 0 # Total pkts in all port queues
def dbg(self, level, string):
debug_log("DPORT", self.debug_level, level, string)
@@ -233,8 +255,9 @@
@param interface_name The name of the physical interface like eth1
@param port_number The port number used to refer to the port
"""
-
- self.port_list[port_number] = DataPlanePort(interface_name)
+
+ self.port_list[port_number] = DataPlanePort(interface_name,
+ port_number, self)
self.port_list[port_number].start()
def send(self, port_number, packet):
@@ -247,7 +270,7 @@
"Sending %d bytes to port %d" % (len(packet), port_number))
bytes = self.port_list[port_number].send(packet)
if bytes != len(packet):
- self.dbg(DEBUG_ERROR,"Unhandled send error, " +
+ self.dbg(DEBUG_ERROR,"Unhandled send error, " +
"length mismatch %d != %d" %
(bytes, len(packet)))
return bytes
@@ -264,41 +287,77 @@
", port %d, length mismatch %d != %d" %
(port_number, bytes, len(packet)))
- def packet_get(self, port_number=None):
- """
- Get a packet from the data plane
-
- If port_number is given, get the oldest packet from that port.
- Otherwise, find the port with the oldest packet and return
- that packet.
- @param port_number If set, get packet from this port
- @retval The triple port_number, packet, pkt_time where packet
- is received from port_number at time pkt_time.
- """
-
- if port_number:
- if len(self.port_list[port_number].packets) != 0:
- pkt, time = self.port_list[port_number].dequeue()
- return port_number, pkt, time
- else:
- return None, None, None
-
+ def _oldest_packet_find(self):
# Find port with oldest packet
- #@todo Consider using a single queue for all ports
min_time = 0
min_port = -1
for port_number in self.port_list.keys():
ptime = self.port_list[port_number].timestamp_head()
if ptime:
if (min_port == -1) or (ptime < min_time):
- min_time = ptime
+ min_time = ptime
min_port = port_number
+ oft_assert(min_port != -1, "Could not find port when pkts pending")
- if min_port == -1:
+ return min_port
+
+ def poll(self, port_number=None, timeout=None):
+ """
+ Poll one or all dataplane ports for a packet
+
+ If port_number is given, get the oldest packet from that port.
+ Otherwise, find the port with the oldest packet and return
+ that packet.
+ @param port_number If set, get packet from this port
+ @param timeout If positive and no packet is available, block
+ until a packet is received or for this many seconds
+ @return The triple port_number, packet, pkt_time where packet
+ is received from port_number at time pkt_time. If a timeout
+ occurs, return None, None, None
+ """
+
+ self.pkt_sync.acquire()
+
+ # Check if requested specific port and it has a packet
+ if port_number and len(self.port_list[port_number].packets) != 0:
+ pkt, time = self.port_list[port_number].dequeue(use_lock=False)
+ self.pkt_sync.release()
+ oft_assert(pkt, "Poll: packet not found on port " +
+ str(port_number))
+ return port_number, pkt, time
+
+ # Check if requested any port and some packet pending
+ if not port_number and self.packets_pending != 0:
+ port = self._oldest_packet_find()
+ pkt, time = self.port_list[port].dequeue(use_lock=False)
+ self.pkt_sync.release()
+ oft_assert(pkt, "Poll: oldest packet not found")
+ return port, pkt, time
+
+ # No packet pending; blocking call requested?
+ if not timeout:
+ self.pkt_sync.release()
return None, None, None
- pkt, time = self.port_list[min_port].dequeue()
- return min_port, pkt, time
+ # Desired packet isn't available and timeout is specified
+ # Already holding pkt_sync; wait() releases it while blocked
+ self.want_pkt = True
+ self.want_pkt_port = port_number
+ self.got_pkt_port = None
+ self.pkt_sync.wait(timeout)
+ self.want_pkt = False
+ if self.got_pkt_port is not None:
+ pkt, time = \
+ self.port_list[self.got_pkt_port].dequeue(use_lock=False)
+ self.pkt_sync.release()
+ oft_assert(pkt, "Poll: pkt reported, but not found at " +
+ str(self.got_pkt_port))
+ return self.got_pkt_port, pkt, time
+
+ self.pkt_sync.release()
+ self.dbg(DEBUG_VERBOSE, "Poll time out, no packet")
+
+ return None, None, None
def kill(self, join_threads=False):
"""
@@ -315,6 +374,7 @@
def show(self, prefix=''):
print prefix + "Dataplane Controller"
+ print prefix + "Packets pending: " + str(self.packets_pending)
for pnum, port in self.port_list.items():
print prefix + "OpenFlow Port Number " + str(pnum)
port.show(prefix + ' ')