fix usage of condition variables in dataplane
diff --git a/src/python/oftest/dataplane.py b/src/python/oftest/dataplane.py
index 4611aa2..6b69589 100644
--- a/src/python/oftest/dataplane.py
+++ b/src/python/oftest/dataplane.py
@@ -25,6 +25,7 @@
 import select
 import logging
 from oft_assert import oft_assert
+from ofutils import *
 
 ##@todo Find a better home for these identifiers (dataplane)
 RCV_SIZE_DEFAULT = 4096
@@ -54,7 +55,6 @@
     Queues the packets received on that interface with time stamps.
     Inherits from Thread class as meant to run in background.  Also
     supports polling.
-    Use accessors to dequeue packets for proper synchronization.
 
     Currently assumes a controlling 'parent' which maintains a
     common Lock object and a total packet-pending count.  May want
@@ -85,7 +85,6 @@
             sys.exit(1)
         self.logger.info("Openned port monitor socket")
         self.parent = parent
-        self.pkt_sync = self.parent.pkt_sync
 
     def interface_open(self, interface_name):
         """
@@ -141,31 +140,17 @@
                      str(self.port_number))
 
             # Enqueue packet
-            self.pkt_sync.acquire()
-            if len(self.packets) >= self.max_pkts:
-                # Queue full, throw away oldest
-                self.packets.pop(0)
-                self.packets_discarded += 1
-            else:
-                self.parent.packets_pending += 1
-            # Check if parent is waiting on this (or any) port
-            drop_pkt = False
-            if self.parent.want_pkt:
-                if (not self.parent.want_pkt_port or
-                        self.parent.want_pkt_port == self.port_number):
-                    if self.parent.exp_pkt:
-                        if not match_exp_pkt(self.parent.exp_pkt, rcvmsg):
-                            drop_pkt = True
-                    if not drop_pkt:
-                        self.parent.got_pkt_port = self.port_number
-                        self.parent.want_pkt = False
-                        self.parent.pkt_sync.notify()
-            if not drop_pkt:
+            with self.parent.pkt_sync:
+                if len(self.packets) >= self.max_pkts:
+                    # Queue full, throw away oldest
+                    self.packets.pop(0)
+                    self.packets_discarded += 1
+                    self.logger.debug("Discarding oldest packet to make room")
                 self.packets.append((rcvmsg, rcvtime))
                 self.packets_total += 1
-            self.pkt_sync.release()
+                self.parent.pkt_sync.notify_all()
 
-        self.logger.info("Thread exit ")
+        self.logger.info("Thread exit")
 
     def kill(self):
         """
@@ -178,24 +163,6 @@
         except:
             self.logger.info("Ignoring dataplane soc shutdown error")
 
-    def dequeue(self, use_lock=True):
-        """
-        Get the oldest packet in the queue
-        @param use_lock If True, acquires the packet sync lock (which is
-        really the parent's lock)
-        @return The pair packet, packet time-stamp
-        """
-        if use_lock:
-            self.pkt_sync.acquire()
-        if len(self.packets) > 0:
-            pkt, pkt_time = self.packets.pop(0)
-            self.parent.packets_pending -= 1
-        else:
-            pkt = pkt_time = None
-        if use_lock:
-            self.pkt_sync.release()
-        return pkt, pkt_time
-
     def timestamp_head(self):
         """
         Return the timestamp of the head of queue or None if empty
@@ -211,13 +178,9 @@
         """
         Clear the packet queue
         """
-        self.pkt_sync.acquire()
-        self.packets_discarded += len(self.packets)
-        self.parent.packets_pending -= len(self.packets)
-        self.packets = []
-        self.packet_times = []
-        self.pkt_sync.release()
-
+        with self.parent.pkt_sync:
+            self.packets_discarded += len(self.packets)
+            self.packets = []
 
     def send(self, packet):
         """
@@ -338,18 +301,39 @@
 
     def _oldest_packet_find(self):
         # Find port with oldest packet
-        min_time = 0
-        min_port = -1
-        for port_number in self.port_list.keys():
-            ptime = self.port_list[port_number].timestamp_head()
-            if ptime:
-                if (min_port == -1) or (ptime < min_time):
-                    min_time = ptime
-                    min_port = port_number
         oft_assert(min_port != -1, "Could not find port when pkts pending")
 
         return min_port
 
+    # Returns the port with the oldest packet, or None if no packets are queued.
+    def oldest_port(self):
+        min_port = None
+        min_time = float('inf')
+        for port in self.port_list.values():
+            ptime = port.timestamp_head()
+            if ptime and ptime < min_time:
+                min_time = ptime
+                min_port = port
+        return min_port
+
+    # Dequeues and yields packets in the order they were received.
+    # Yields (port, packet, received time).
+    # If port_number is not specified yields packets from all ports.
+    def packets(self, port_number=None):
+        while True:
+            if port_number == None:
+                port = self.oldest_port()
+            else:
+                port = self.port_list[port_number]
+
+            if port == None or len(port.packets) == 0:
+                self.logger.debug("Out of packets for port %s" % str(port_number))
+                # Out of packets
+                break
+
+            pkt, time = port.packets.pop(0)
+            yield (port, pkt, time)
+
     def poll(self, port_number=None, timeout=None, exp_pkt=None):
         """
         Poll one or all dataplane ports for a packet
@@ -371,63 +355,28 @@
         occurs, return None, None, None
         """
 
-
         if exp_pkt and not port_number:
             self.logger.warn("Dataplane poll with exp_pkt but no port number")
 
-        self.pkt_sync.acquire()
-
-        # Check if requested specific port and it has a packet
-        if port_number and len(self.port_list[port_number].packets) != 0:
-            while len(self.port_list[port_number].packets) != 0:
-                pkt, time = self.port_list[port_number].dequeue(use_lock=False)
-                if not exp_pkt:
-                    break
-                if match_exp_pkt(exp_pkt, pkt):
-                    break
-                pkt = None # Discard silently
-            if pkt:
-                self.pkt_sync.release()
-                oft_assert(pkt, "Poll: packet not found on port " +
-                           str(port_number))
-                return port_number, pkt, time
-
-        # Check if requested any port and some packet pending
-        if not port_number:
-            while self.packets_pending != 0:
-                port = self._oldest_packet_find()
-                pkt, time = self.port_list[port].dequeue(use_lock=False)
-                self.pkt_sync.release()
-                oft_assert(pkt, "Poll: oldest packet not found")
+        # Retrieve the packet. Returns (port number, packet, time).
+        def grab():
+            self.logger.debug("Grabbing packet")
+            for (port, pkt, time) in self.packets(port_number):
+                self.logger.debug("Checking packet from port %d" % port.port_number)
                 if not exp_pkt or match_exp_pkt(exp_pkt, pkt):
-                    return port, pkt, time
+                    return (port, pkt, time)
+            self.logger.debug("Did not find packet")
+            return None
 
-        # No packet pending; blocking call requested?
-        if not timeout:
-            self.pkt_sync.release()
-            return None, None, None
+        with self.pkt_sync:
+            ret = timed_wait(self.pkt_sync, grab, timeout=timeout)
 
-        # Desired packet isn't available and timeout is specified
-        # Already holding pkt_sync; wait on pkt_sync variable
-        self.want_pkt = True
-        self.exp_pkt = exp_pkt
-        self.want_pkt_port = port_number
-        self.got_pkt_port = None
-        self.pkt_sync.wait(timeout)
-        self.want_pkt = False
-        self.exp_pkt = None
-        if self.got_pkt_port:
-            pkt, time = \
-                self.port_list[self.got_pkt_port].dequeue(use_lock=False)
-            self.pkt_sync.release()
-            oft_assert(pkt, "Poll: pkt reported, but not found at " +
-                       str(self.got_pkt_port))
-            return self.got_pkt_port, pkt, time
-
-        self.pkt_sync.release()
-        self.logger.debug("Poll time out, no packet from " + str(port_number))
-
-        return None, None, None
+        if ret != None:
+            (port, pkt, time) = ret
+            return (port.port_number, pkt, time)
+        else:
+            self.logger.debug("Poll time out, no packet from " + str(port_number))
+            return (None, None, None)
 
     def kill(self, join_threads=True):
         """