Fix multithreading issues with packet queue

This change probably breaks some of the semantics of handling incoming OpenFlow
messages: the per-type expect_msg wait mechanism is removed, and poll() now
blocks on a condition variable (packets_cv) that protects the message queue.
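
The core of the fix is a single condition variable, packets_cv, that now guards
the packet queue: the reader thread appends under the lock and calls
notify_all(), while poll() re-checks the queue in a deadline loop around
wait(). The sketch below only illustrates that pattern with made-up names
(PacketQueue, enqueue); it is not code from controller.py.

    import time
    from threading import Condition, Thread

    class PacketQueue(object):
        """Toy producer/consumer queue mirroring the packets/packets_cv pattern."""

        def __init__(self, max_pkts=10):
            self.packets = []            # queued (msg, rawmsg)-style entries
            self.packets_cv = Condition()
            self.max_pkts = max_pkts

        def enqueue(self, msg):
            # Reader-thread side: append under the condition variable and
            # wake every waiter, dropping the oldest entry on overflow.
            with self.packets_cv:
                if len(self.packets) >= self.max_pkts:
                    self.packets.pop(0)
                self.packets.append(msg)
                self.packets_cv.notify_all()

        def poll(self, timeout):
            # Consumer side: re-check the queue until the deadline expires,
            # sleeping on the condition variable between checks.
            end_time = time.time() + timeout
            with self.packets_cv:
                while True:
                    if self.packets:
                        return self.packets.pop(0)
                    remaining = end_time - time.time()
                    if remaining <= 0:
                        return None
                    self.packets_cv.wait(remaining)

    if __name__ == "__main__":
        q = PacketQueue()
        Thread(target=lambda: (time.sleep(0.1), q.enqueue("hello"))).start()
        print(q.poll(timeout=1.0))       # expected to print "hello"

Using one Condition as both the queue lock and the wakeup signal avoids the
race in the old expect_msg path, where self.sync had to be released before
waiting on expect_msg_cv.
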
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 9e3354d..7eac196 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -114,13 +114,17 @@
self.poll_discards = 0
# State
- self.packets = []
self.sync = Lock()
self.handlers = {}
self.keep_alive = False
self.active = True
self.initial_hello = True
+ # OpenFlow message/packet queue
+ # Protected by the packets_cv lock / condition variable
+ self.packets = []
+ self.packets_cv = Condition()
+
# Settings
self.max_pkts = max_pkts
self.passive = True
@@ -138,19 +142,10 @@
# xid_cv: Condition variable (semaphore) for packet waiters
# xid: Transaction ID being waited on
# xid_response: Transaction response message
- # expect_msg: Is a message being waited on
- # expect_msg_cv: Semaphore for waiters
- # expect_msg_type: Type of message expected
- # expect_msg_response: Result passed through here
-
self.xid_cv = Condition()
self.xid = None
self.xid_response = None
- self.expect_msg = False
- self.expect_msg_cv = Condition()
- self.expect_msg_type = None
- self.expect_msg_response = None
self.buffered_input = ""
def filter_packet(self, rawmsg, hdr):
@@ -162,25 +157,14 @@
"""
# Add check for packet in and rate limit
if self.filter_packet_in:
- # If this is a packet in and not expecting a packet in
- if ((not self.expect_msg_type or
- (self.expect_msg_type != hdr.type)) and
- hdr.type == OFPT_PACKET_IN):
-
- self.pkt_in_run += 1
- # Check if limit exceeded
- if self.pkt_in_run > self.pkt_in_filter_limit:
- self.pkt_in_dropped += 1
- return True
-
- else: # Not in filtering mode
- # If we were dropping packets, report number dropped
- if self.pkt_in_run > self.pkt_in_filter_limit:
- self.logger.debug("Dropped %d packet ins (%d total)"
- % ((self.pkt_in_run -
- self.pkt_in_filter_limit),
- self.pkt_in_dropped))
- self.pkt_in_run = 0
+ # If we were dropping packets, report number dropped
+ # TODO: don't drop expected packet ins
+ if self.pkt_in_run > self.pkt_in_filter_limit:
+ self.logger.debug("Dropped %d packet ins (%d total)"
+ % ((self.pkt_in_run -
+ self.pkt_in_filter_limit),
+ self.pkt_in_dropped))
+ self.pkt_in_run = 0
return False
@@ -244,69 +228,48 @@
self.logger.warn("Could not parse message")
continue
- self.sync.acquire()
+ with self.sync:
+ # Check if transaction is waiting
+ with self.xid_cv:
+ if self.xid and hdr.xid == self.xid:
+ self.logger.debug("Matched expected XID " + str(hdr.xid))
+ self.xid_response = (msg, rawmsg)
+ self.xid = None
+ self.xid_cv.notify()
+ continue
- # Check if transaction is waiting
- self.xid_cv.acquire()
- if self.xid:
- if hdr.xid == self.xid:
- self.logger.debug("Matched expected XID " + str(hdr.xid))
- self.xid_response = (msg, rawmsg)
- self.xid = None
- self.xid_cv.notify()
- self.xid_cv.release()
- self.sync.release()
- continue
- self.xid_cv.release()
+ # Check if keep alive is set; if so, respond to echo requests
+ if self.keep_alive:
+ if hdr.type == OFPT_ECHO_REQUEST:
+ self.logger.debug("Responding to echo request")
+ rep = echo_reply()
+ rep.header.xid = hdr.xid
+ # Ignoring additional data
+ if self.message_send(rep.pack(), zero_xid=True) < 0:
+ self.logger.error("Error sending echo reply")
+ continue
- # PREVENT QUEUE ACCESS AT THIS POINT?
- # Check if anyone waiting on this type of message
- self.expect_msg_cv.acquire()
- if self.expect_msg:
- if not self.expect_msg_type or (self.expect_msg_type == hdr.type):
- self.logger.debug("Matched msg; type %s. expected %s " %
- (ofp_type_map[hdr.type],
- str(self.expect_msg_type)))
- self.expect_msg_response = (msg, rawmsg)
- self.expect_msg = False
- self.expect_msg_cv.notify()
- self.expect_msg_cv.release()
- self.sync.release()
- continue
- self.expect_msg_cv.release()
+ # Now check for message handlers; preference is given to
+ # handlers for a specific packet
+ handled = False
+ if hdr.type in self.handlers.keys():
+ handled = self.handlers[hdr.type](self, msg, rawmsg)
+ if not handled and ("all" in self.handlers.keys()):
+ handled = self.handlers["all"](self, msg, rawmsg)
- # Check if keep alive is set; if so, respond to echo requests
- if self.keep_alive:
- if hdr.type == OFPT_ECHO_REQUEST:
- self.sync.release()
- self.logger.debug("Responding to echo request")
- rep = echo_reply()
- rep.header.xid = hdr.xid
- # Ignoring additional data
- if self.message_send(rep.pack(), zero_xid=True) < 0:
- self.logger.error("Error sending echo reply")
- continue
+ if not handled: # Not handled, enqueue
+ self.logger.debug("Enqueuing pkt type " + ofp_type_map[hdr.type])
+ with self.packets_cv:
+ if len(self.packets) >= self.max_pkts:
+ self.packets.pop(0)
+ self.packets_expired += 1
+ self.packets.append((msg, rawmsg))
+ self.packets_cv.notify_all()
+ self.packets_total += 1
+ else:
+ self.packets_handled += 1
+ self.logger.debug("Message handled by callback")
- # Now check for message handlers; preference is given to
- # handlers for a specific packet
- handled = False
- if hdr.type in self.handlers.keys():
- handled = self.handlers[hdr.type](self, msg, rawmsg)
- if not handled and ("all" in self.handlers.keys()):
- handled = self.handlers["all"](self, msg, rawmsg)
-
- if not handled: # Not handled, enqueue
- self.logger.debug("Enqueuing pkt type " + ofp_type_map[hdr.type])
- if len(self.packets) >= self.max_pkts:
- self.packets.pop(0)
- self.packets_expired += 1
- self.packets.append((msg, rawmsg))
- self.packets_total += 1
- else:
- self.packets_handled += 1
- self.logger.debug("Message handled by callback")
-
- self.sync.release()
# end of 'while offset < len(pkt)'
# note that if offset == len(pkt), this
# appends a harmless empty string
@@ -518,47 +481,48 @@
before sleeping on message in events.
"""
- msg = pkt = None
-
self.logger.debug("Poll for " + ofp_type_map[exp_msg])
- # First check the current queue
- self.sync.acquire()
- if len(self.packets) > 0:
- if not exp_msg:
- (msg, pkt) = self.packets.pop(0)
- self.sync.release()
- return (msg, pkt)
- else:
- for i in range(len(self.packets)):
- msg = self.packets[i][0]
- if msg.header.type == exp_msg:
- (msg, pkt) = self.packets.pop(i)
- self.sync.release()
- return (msg, pkt)
- # Okay, not currently in the queue
- if timeout is None or timeout <= 0:
- self.sync.release()
+ # Looks for the packet in the queue
+ def grab():
+ if len(self.packets) > 0:
+ if not exp_msg:
+ self.logger.debug("Looking for any packet")
+ (msg, pkt) = self.packets.pop(0)
+ return (msg, pkt)
+ else:
+ self.logger.debug("Looking for %s" % ofp_type_map[exp_msg])
+ for i in range(len(self.packets)):
+ msg = self.packets[i][0]
+ self.logger.debug("Checking packets[%d] (%s)" % (i, ofp_type_map[msg.header.type]))
+ if msg.header.type == exp_msg:
+ (msg, pkt) = self.packets.pop(i)
+ return (msg, pkt)
+ # Not found
+ self.logger.debug("Packet not in queue")
return (None, None)
- msg = pkt = None
- self.logger.debug("Entering timeout")
- # Careful of race condition releasing sync before message cv
- # Also, this style is ripe for a lockup.
- self.expect_msg_cv.acquire()
- self.expect_msg_response = None
- self.expect_msg = True
- self.expect_msg_type = exp_msg
- self.sync.release()
- self.expect_msg_cv.wait(timeout)
- if self.expect_msg_response is not None:
- (msg, pkt) = self.expect_msg_response
- self.expect_msg_cv.release()
+ # Non-blocking case
+ if timeout is None or timeout <= 0:
+ return grab()
- if msg is None:
- self.logger.debug("Poll time out")
- else:
- self.logger.debug("Got msg " + str(msg))
+ msg = pkt = None
+ self.logger.debug("Entering timeout (%fs)" % timeout)
+ end_time = time.time() + timeout
+ with self.packets_cv:
+ while True:
+ if time.time() > end_time:
+ self.logger.debug("Poll time out")
+ return (None, None)
+
+ (msg, pkt) = grab()
+ if msg != None:
+ self.logger.debug("Got msg " + str(msg))
+ return (msg, pkt)
+
+ # Go to sleep
+ remaining_time = end_time - time.time()
+ self.packets_cv.wait(remaining_time)
return (msg, pkt)
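
For reference, a caller after this change might look like the snippet below
(hypothetical test code, not part of this patch, assuming poll()'s existing
exp_msg and timeout parameters):

    # Hypothetical test-side usage of the revised poll(); blocking now comes
    # from packets_cv rather than the removed expect_msg machinery.
    (msg, raw) = controller.poll(exp_msg=OFPT_PACKET_IN, timeout=2)
    if msg is None:
        print("poll timed out waiting for a packet_in")
    else:
        print("got packet_in, xid %d" % msg.header.xid)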