fix usage of condition variables in dataplane
diff --git a/src/python/oftest/dataplane.py b/src/python/oftest/dataplane.py
index 4611aa2..6b69589 100644
--- a/src/python/oftest/dataplane.py
+++ b/src/python/oftest/dataplane.py
@@ -25,6 +25,7 @@
import select
import logging
from oft_assert import oft_assert
+from ofutils import *
##@todo Find a better home for these identifiers (dataplane)
RCV_SIZE_DEFAULT = 4096
@@ -54,7 +55,6 @@
Queues the packets received on that interface with time stamps.
Inherits from Thread class as meant to run in background. Also
supports polling.
- Use accessors to dequeue packets for proper synchronization.
Currently assumes a controlling 'parent' which maintains a
common Lock object and a total packet-pending count. May want
@@ -85,7 +85,6 @@
sys.exit(1)
self.logger.info("Openned port monitor socket")
self.parent = parent
- self.pkt_sync = self.parent.pkt_sync
def interface_open(self, interface_name):
"""
@@ -141,31 +140,17 @@
str(self.port_number))
# Enqueue packet
- self.pkt_sync.acquire()
- if len(self.packets) >= self.max_pkts:
- # Queue full, throw away oldest
- self.packets.pop(0)
- self.packets_discarded += 1
- else:
- self.parent.packets_pending += 1
- # Check if parent is waiting on this (or any) port
- drop_pkt = False
- if self.parent.want_pkt:
- if (not self.parent.want_pkt_port or
- self.parent.want_pkt_port == self.port_number):
- if self.parent.exp_pkt:
- if not match_exp_pkt(self.parent.exp_pkt, rcvmsg):
- drop_pkt = True
- if not drop_pkt:
- self.parent.got_pkt_port = self.port_number
- self.parent.want_pkt = False
- self.parent.pkt_sync.notify()
- if not drop_pkt:
+ with self.parent.pkt_sync:
+ if len(self.packets) >= self.max_pkts:
+ # Queue full, throw away oldest
+ self.packets.pop(0)
+ self.packets_discarded += 1
+ self.logger.debug("Discarding oldest packet to make room")
self.packets.append((rcvmsg, rcvtime))
self.packets_total += 1
- self.pkt_sync.release()
+ self.parent.pkt_sync.notify_all()
- self.logger.info("Thread exit ")
+ self.logger.info("Thread exit")
def kill(self):
"""
@@ -178,24 +163,6 @@
except:
self.logger.info("Ignoring dataplane soc shutdown error")
- def dequeue(self, use_lock=True):
- """
- Get the oldest packet in the queue
- @param use_lock If True, acquires the packet sync lock (which is
- really the parent's lock)
- @return The pair packet, packet time-stamp
- """
- if use_lock:
- self.pkt_sync.acquire()
- if len(self.packets) > 0:
- pkt, pkt_time = self.packets.pop(0)
- self.parent.packets_pending -= 1
- else:
- pkt = pkt_time = None
- if use_lock:
- self.pkt_sync.release()
- return pkt, pkt_time
-
def timestamp_head(self):
"""
Return the timestamp of the head of queue or None if empty
@@ -211,13 +178,9 @@
"""
Clear the packet queue
"""
- self.pkt_sync.acquire()
- self.packets_discarded += len(self.packets)
- self.parent.packets_pending -= len(self.packets)
- self.packets = []
- self.packet_times = []
- self.pkt_sync.release()
-
+ with self.parent.pkt_sync:
+ self.packets_discarded += len(self.packets)
+ self.packets = []
def send(self, packet):
"""
@@ -338,18 +301,39 @@
def _oldest_packet_find(self):
# Find port with oldest packet
- min_time = 0
- min_port = -1
- for port_number in self.port_list.keys():
- ptime = self.port_list[port_number].timestamp_head()
- if ptime:
- if (min_port == -1) or (ptime < min_time):
- min_time = ptime
- min_port = port_number
oft_assert(min_port != -1, "Could not find port when pkts pending")
return min_port
+ # Returns the port with the oldest packet, or None if no packets are queued.
+ def oldest_port(self):
+ min_port = None
+ min_time = float('inf')
+ for port in self.port_list.values():
+ ptime = port.timestamp_head()
+ if ptime and ptime < min_time:
+ min_time = ptime
+ min_port = port
+ return min_port
+
+ # Dequeues and yields packets in the order they were received.
+ # Yields (port, packet, received time).
+ # If port_number is not specified yields packets from all ports.
+ def packets(self, port_number=None):
+ while True:
+ if port_number == None:
+ port = self.oldest_port()
+ else:
+ port = self.port_list[port_number]
+
+ if port == None or len(port.packets) == 0:
+ self.logger.debug("Out of packets for port %s" % str(port_number))
+ # Out of packets
+ break
+
+ pkt, time = port.packets.pop(0)
+ yield (port, pkt, time)
+
def poll(self, port_number=None, timeout=None, exp_pkt=None):
"""
Poll one or all dataplane ports for a packet
@@ -371,63 +355,28 @@
occurs, return None, None, None
"""
-
if exp_pkt and not port_number:
self.logger.warn("Dataplane poll with exp_pkt but no port number")
- self.pkt_sync.acquire()
-
- # Check if requested specific port and it has a packet
- if port_number and len(self.port_list[port_number].packets) != 0:
- while len(self.port_list[port_number].packets) != 0:
- pkt, time = self.port_list[port_number].dequeue(use_lock=False)
- if not exp_pkt:
- break
- if match_exp_pkt(exp_pkt, pkt):
- break
- pkt = None # Discard silently
- if pkt:
- self.pkt_sync.release()
- oft_assert(pkt, "Poll: packet not found on port " +
- str(port_number))
- return port_number, pkt, time
-
- # Check if requested any port and some packet pending
- if not port_number:
- while self.packets_pending != 0:
- port = self._oldest_packet_find()
- pkt, time = self.port_list[port].dequeue(use_lock=False)
- self.pkt_sync.release()
- oft_assert(pkt, "Poll: oldest packet not found")
+ # Retrieve the packet. Returns (port number, packet, time).
+ def grab():
+ self.logger.debug("Grabbing packet")
+ for (port, pkt, time) in self.packets(port_number):
+ self.logger.debug("Checking packet from port %d" % port.port_number)
if not exp_pkt or match_exp_pkt(exp_pkt, pkt):
- return port, pkt, time
+ return (port, pkt, time)
+ self.logger.debug("Did not find packet")
+ return None
- # No packet pending; blocking call requested?
- if not timeout:
- self.pkt_sync.release()
- return None, None, None
+ with self.pkt_sync:
+ ret = timed_wait(self.pkt_sync, grab, timeout=timeout)
- # Desired packet isn't available and timeout is specified
- # Already holding pkt_sync; wait on pkt_sync variable
- self.want_pkt = True
- self.exp_pkt = exp_pkt
- self.want_pkt_port = port_number
- self.got_pkt_port = None
- self.pkt_sync.wait(timeout)
- self.want_pkt = False
- self.exp_pkt = None
- if self.got_pkt_port:
- pkt, time = \
- self.port_list[self.got_pkt_port].dequeue(use_lock=False)
- self.pkt_sync.release()
- oft_assert(pkt, "Poll: pkt reported, but not found at " +
- str(self.got_pkt_port))
- return self.got_pkt_port, pkt, time
-
- self.pkt_sync.release()
- self.logger.debug("Poll time out, no packet from " + str(port_number))
-
- return None, None, None
+ if ret != None:
+ (port, pkt, time) = ret
+ return (port.port_number, pkt, time)
+ else:
+ self.logger.debug("Poll time out, no packet from " + str(port_number))
+ return (None, None, None)
def kill(self, join_threads=True):
"""