fix multithreading issues with packet queue

This change probably breaks some of the semantics of handling incoming openflow
messages.
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 9e3354d..7eac196 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -114,13 +114,17 @@
         self.poll_discards = 0
 
         # State
-        self.packets = []
         self.sync = Lock()
         self.handlers = {}
         self.keep_alive = False
         self.active = True
         self.initial_hello = True
 
+        # OpenFlow message/packet queue
+        # Protected by the packets_cv lock / condition variable
+        self.packets = []
+        self.packets_cv = Condition()
+
         # Settings
         self.max_pkts = max_pkts
         self.passive = True
@@ -138,19 +142,10 @@
         #   xid_cv: Condition variable (semaphore) for packet waiters
         #   xid: Transaction ID being waited on
         #   xid_response: Transaction response message
-        #   expect_msg: Is a message being waited on 
-        #   expect_msg_cv: Semaphore for waiters
-        #   expect_msg_type: Type of message expected
-        #   expect_msg_response: Result passed through here
-
         self.xid_cv = Condition()
         self.xid = None
         self.xid_response = None
 
-        self.expect_msg = False
-        self.expect_msg_cv = Condition()
-        self.expect_msg_type = None
-        self.expect_msg_response = None
         self.buffered_input = ""
 
     def filter_packet(self, rawmsg, hdr):
@@ -162,25 +157,14 @@
         """
         # Add check for packet in and rate limit
         if self.filter_packet_in:
-            # If this is a packet in and not expecting a packet in
-            if ((not self.expect_msg_type or
-                 (self.expect_msg_type != hdr.type)) and
-                hdr.type == OFPT_PACKET_IN):
-
-                self.pkt_in_run += 1
-                # Check if limit exceeded
-                if self.pkt_in_run > self.pkt_in_filter_limit:
-                    self.pkt_in_dropped += 1
-                    return True
-
-            else: # Not in filtering mode
-                # If we were dropping packets, report number dropped
-                if self.pkt_in_run > self.pkt_in_filter_limit:
-                    self.logger.debug("Dropped %d packet ins (%d total)"
-                             % ((self.pkt_in_run - 
-                                 self.pkt_in_filter_limit),
-                                 self.pkt_in_dropped))
-                self.pkt_in_run = 0
+            # If we were dropping packets, report number dropped
+            # TODO dont drop expected packet ins
+            if self.pkt_in_run > self.pkt_in_filter_limit:
+                self.logger.debug("Dropped %d packet ins (%d total)"
+                            % ((self.pkt_in_run - 
+                                self.pkt_in_filter_limit),
+                                self.pkt_in_dropped))
+            self.pkt_in_run = 0
 
         return False
 
@@ -244,69 +228,48 @@
                 self.logger.warn("Could not parse message")
                 continue
 
-            self.sync.acquire()
+            with self.sync:
+                # Check if transaction is waiting
+                with self.xid_cv:
+                    if self.xid and hdr.xid == self.xid:
+                        self.logger.debug("Matched expected XID " + str(hdr.xid))
+                        self.xid_response = (msg, rawmsg)
+                        self.xid = None
+                        self.xid_cv.notify()
+                        continue
 
-            # Check if transaction is waiting
-            self.xid_cv.acquire()
-            if self.xid:
-                if hdr.xid == self.xid:
-                    self.logger.debug("Matched expected XID " + str(hdr.xid))
-                    self.xid_response = (msg, rawmsg)
-                    self.xid = None
-                    self.xid_cv.notify()
-                    self.xid_cv.release()
-                    self.sync.release()
-                    continue
-            self.xid_cv.release()
+                # Check if keep alive is set; if so, respond to echo requests
+                if self.keep_alive:
+                    if hdr.type == OFPT_ECHO_REQUEST:
+                        self.logger.debug("Responding to echo request")
+                        rep = echo_reply()
+                        rep.header.xid = hdr.xid
+                        # Ignoring additional data
+                        if self.message_send(rep.pack(), zero_xid=True) < 0:
+                            self.logger.error("Error sending echo reply")
+                        continue
 
-            # PREVENT QUEUE ACCESS AT THIS POINT?
-            # Check if anyone waiting on this type of message
-            self.expect_msg_cv.acquire()
-            if self.expect_msg:
-                if not self.expect_msg_type or (self.expect_msg_type == hdr.type):
-                    self.logger.debug("Matched msg; type %s. expected %s " %
-                                      (ofp_type_map[hdr.type], 
-                                       str(self.expect_msg_type)))
-                    self.expect_msg_response = (msg, rawmsg)
-                    self.expect_msg = False
-                    self.expect_msg_cv.notify()
-                    self.expect_msg_cv.release()
-                    self.sync.release()
-                    continue
-            self.expect_msg_cv.release()
+                # Now check for message handlers; preference is given to
+                # handlers for a specific packet
+                handled = False
+                if hdr.type in self.handlers.keys():
+                    handled = self.handlers[hdr.type](self, msg, rawmsg)
+                if not handled and ("all" in self.handlers.keys()):
+                    handled = self.handlers["all"](self, msg, rawmsg)
 
-            # Check if keep alive is set; if so, respond to echo requests
-            if self.keep_alive:
-                if hdr.type == OFPT_ECHO_REQUEST:
-                    self.sync.release()
-                    self.logger.debug("Responding to echo request")
-                    rep = echo_reply()
-                    rep.header.xid = hdr.xid
-                    # Ignoring additional data
-                    if self.message_send(rep.pack(), zero_xid=True) < 0:
-                        self.logger.error("Error sending echo reply")
-                    continue
+                if not handled: # Not handled, enqueue
+                    self.logger.debug("Enqueuing pkt type " + ofp_type_map[hdr.type])
+                    with self.packets_cv:
+                        if len(self.packets) >= self.max_pkts:
+                            self.packets.pop(0)
+                            self.packets_expired += 1
+                        self.packets.append((msg, rawmsg))
+                        self.packets_cv.notify_all()
+                    self.packets_total += 1
+                else:
+                    self.packets_handled += 1
+                    self.logger.debug("Message handled by callback")
 
-            # Now check for message handlers; preference is given to
-            # handlers for a specific packet
-            handled = False
-            if hdr.type in self.handlers.keys():
-                handled = self.handlers[hdr.type](self, msg, rawmsg)
-            if not handled and ("all" in self.handlers.keys()):
-                handled = self.handlers["all"](self, msg, rawmsg)
-
-            if not handled: # Not handled, enqueue
-                self.logger.debug("Enqueuing pkt type " + ofp_type_map[hdr.type])
-                if len(self.packets) >= self.max_pkts:
-                    self.packets.pop(0)
-                    self.packets_expired += 1
-                self.packets.append((msg, rawmsg))
-                self.packets_total += 1
-            else:
-                self.packets_handled += 1
-                self.logger.debug("Message handled by callback")
-
-            self.sync.release()
         # end of 'while offset < len(pkt)'
         #   note that if offset = len(pkt), this is
         #   appends a harmless empty string
@@ -518,47 +481,48 @@
         before sleeping on message in events.
         """
 
-        msg = pkt = None
-
         self.logger.debug("Poll for " + ofp_type_map[exp_msg])
-        # First check the current queue
-        self.sync.acquire()
-        if len(self.packets) > 0:
-            if not exp_msg:
-                (msg, pkt) = self.packets.pop(0)
-                self.sync.release()
-                return (msg, pkt)
-            else:
-                for i in range(len(self.packets)):
-                    msg = self.packets[i][0]
-                    if msg.header.type == exp_msg:
-                        (msg, pkt) = self.packets.pop(i)
-                        self.sync.release()
-                        return (msg, pkt)
 
-        # Okay, not currently in the queue
-        if timeout is None or timeout <= 0:
-            self.sync.release()
+        # Looks for the packet in the queue
+        def grab():
+            if len(self.packets) > 0:
+                if not exp_msg:
+                    self.logger.debug("Looking for any packet")
+                    (msg, pkt) = self.packets.pop(0)
+                    return (msg, pkt)
+                else:
+                    self.logger.debug("Looking for %s" % ofp_type_map[exp_msg])
+                    for i in range(len(self.packets)):
+                        msg = self.packets[i][0]
+                        self.logger.debug("Checking packets[%d] (%s)" % (i, ofp_type_map[msg.header.type]))
+                        if msg.header.type == exp_msg:
+                            (msg, pkt) = self.packets.pop(i)
+                            return (msg, pkt)
+            # Not found
+            self.logger.debug("Packet not in queue")
             return (None, None)
 
-        msg = pkt = None
-        self.logger.debug("Entering timeout")
-        # Careful of race condition releasing sync before message cv
-        # Also, this style is ripe for a lockup.
-        self.expect_msg_cv.acquire()
-        self.expect_msg_response = None
-        self.expect_msg = True
-        self.expect_msg_type = exp_msg
-        self.sync.release()
-        self.expect_msg_cv.wait(timeout)
-        if self.expect_msg_response is not None:
-            (msg, pkt) = self.expect_msg_response
-        self.expect_msg_cv.release()
+        # Non-blocking case
+        if timeout is None or timeout <= 0:
+            return grab()
 
-        if msg is None:
-            self.logger.debug("Poll time out")
-        else:
-            self.logger.debug("Got msg " + str(msg))
+        msg = pkt = None
+        self.logger.debug("Entering timeout (%fs)" % timeout)
+        end_time = time.time() + timeout
+        with self.packets_cv:
+            while True:
+                if time.time() > end_time:
+                    self.logger.debug("Poll time out")
+                    return (None, None)
+
+                (msg, pkt) = grab()
+                if msg != None:
+                    self.logger.debug("Got msg " + str(msg))
+                    return (msg, pkt)
+
+                # Go to sleep
+                remaining_time = end_time - time.time()
+                self.packets_cv.wait(remaining_time)
 
         return (msg, pkt)