Mostly polling and timeout support
Support poll timeouts for controller messages
Support poll and timeouts for dataplane messages
Changed name of dataplane pkt get to 'poll'
Six basic test cases now passing on LB4G
Added test-framework assertion
Added additional files to lint checking
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 0b89ee6..7f4196b 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -80,6 +80,7 @@
self.switch_addr = None
self.socs = []
self.connect_cv = Condition()
+ self.message_cv = Condition()
# Counters
self.socket_errors = 0
@@ -87,6 +88,7 @@
self.packets_total = 0
self.packets_expired = 0
self.packets_handled = 0
+ self.poll_discards = 0
# State
self.packets = []
@@ -105,14 +107,24 @@
self.debug_level = debug_level_default
# self.debug_level = DEBUG_VERBOSE
- # Transaction variables
- # xid_cv: Condition variable (semaphore) for transaction
+ # Transaction and message type waiting variables
+ # xid_cv: Condition variable (semaphore) for packet waiters
# xid: Transaction ID being waited on
# xid_response: Transaction response message
+ # expect_msg: Is a message being waited on
+ # expect_msg_cv: Semaphore for waiters
+ # expect_msg_type: Type of message expected
+ # expect_msg_response: Result passed through here
+
self.xid_cv = Condition()
self.xid = None
self.xid_response = None
+ self.expect_msg = False
+ self.expect_msg_cv = Condition()
+ self.expect_msg_type = None
+ self.expect_msg_response = None
+
def dbg(self, level, string):
debug_log("CTRL", self.debug_level, level, string)
@@ -320,12 +332,24 @@
self.xid_cv.acquire()
if self.xid:
if hdr.xid == self.xid:
- self.xid_response = msg
+ self.xid_response = (msg, pkt)
+ self.xid = None
self.xid_cv.notify()
self.xid_cv.release()
return (True, None)
self.xid_cv.release()
+ # Check if anyone waiting on this type of message
+ self.expect_msg_cv.acquire()
+ if self.expect_msg:
+ if not self.expect_msg_type or self.expect_msg_type == hdr.type:
+ self.expect_msg_response = (msg, pkt)
+ self.expect_msg = False
+ self.expect_msg_cv.notify()
+ self.expect_msg_cv.release()
+ return (True, None)
+ self.expect_msg_cv.release()
+
# Check if keep alive is set; if so, respond to echo requests
if self.keep_alive:
if hdr.type == OFPT_ECHO_REQUEST:
@@ -368,7 +392,8 @@
@param exp_msg If set, return only when this type of message
is received.
- @param timeout Not yet supported
+ @param timeout If None or not positive, do not block. Otherwise,
+ wait up to timeout seconds for a message (of type exp_msg, if set)
@retval A pair (msg, pkt) where msg is a message object and pkt
the string representing the packet as received from the socket.
@@ -378,18 +403,43 @@
If an error occurs, None is returned
"""
- # For now do not support time out;
- if timeout:
- self.dbg(DEBUG_WARN, "Poll time out not supported")
+ msg = pkt = None
- while len(self.packets) > 0:
- self.sync.acquire()
- (msg, pkt) = self.packets.pop(0)
+ # First check the current queue
+ self.sync.acquire()
+ if len(self.packets) > 0:
+ if not exp_msg:
+ (msg, pkt) = self.packets.pop(0)
+ self.sync.release()
+ return (msg, pkt)
+ else:
+ for i in range(len(self.packets)):
+ msg = self.packets[i][0]
+ if msg.header.type == exp_msg:
+ (msg, pkt) = self.packets.pop(i)
+ self.sync.release()
+ return (msg, pkt)
+
+ # Okay, not currently in the queue
+ if timeout is None or timeout <= 0:
self.sync.release()
- if not exp_msg or (exp_msg and (msg.header.type == exp_msg)):
- return msg, pkt
+ return (None, None)
- return None, None
+ # Careful of race condition releasing sync before message cv
+ self.expect_msg_cv.acquire()
+ self.sync.release()
+ self.expect_msg = True
+ self.expect_msg_type = exp_msg
+ self.expect_msg_cv.wait(timeout)
+ if self.expect_msg_response is not None:
+ (msg, pkt) = self.expect_msg_response
+ self.expect_msg_response = None
+ self.expect_msg_cv.release()
+
+ if msg is None:
+ self.dbg(DEBUG_VERBOSE, "poll time out")
+
+ return (msg, pkt)
def transact(self, msg, timeout=None, zero_xid=False):
"""
@@ -421,11 +471,10 @@
self.xid_response = None
self.message_send(msg.pack())
self.xid_cv.wait(timeout)
- msg = self.xid_response
+ (msg, pkt) = self.xid_response
self.xid_response = None
- self.xid = None
self.xid_cv.release()
- return msg
+ return (msg, pkt)
def message_send(self, msg, zero_xid=False):
"""
@@ -474,6 +523,7 @@
string += " total pkts " + str(self.packets_total) + "\n"
string += " expired pkts " + str(self.packets_expired) + "\n"
string += " handled pkts " + str(self.packets_handled) + "\n"
+ string += " poll discards " + str(self.poll_discards) + "\n"
string += " parse errors " + str(self.parse_errors) + "\n"
string += " sock errrors " + str(self.socket_errors) + "\n"
string += " max pkts " + str(self.max_pkts) + "\n"