Mostly polling and timeout support

Support poll timeouts for controller messages
Support poll and timeouts for dataplane messages
Changed name of dataplane pkt get to 'poll'
Six basic test cases now passing on LB4G
Added test-framework assertion
Added additional files to lint checking
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 0b89ee6..7f4196b 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -80,6 +80,7 @@
         self.switch_addr = None
         self.socs = []
         self.connect_cv = Condition()
+        self.message_cv = Condition()
 
         # Counters
         self.socket_errors = 0
@@ -87,6 +88,7 @@
         self.packets_total = 0
         self.packets_expired = 0
         self.packets_handled = 0
+        self.poll_discards = 0
 
         # State
         self.packets = []
@@ -105,14 +107,24 @@
         self.debug_level = debug_level_default
         # self.debug_level = DEBUG_VERBOSE
 
-        # Transaction variables 
-        #   xid_cv: Condition variable (semaphore) for transaction
+        # Transaction and message type waiting variables 
+        #   xid_cv: Condition variable (semaphore) for packet waiters
         #   xid: Transaction ID being waited on
         #   xid_response: Transaction response message
+        #   expect_msg: Is a message being waited on 
+        #   expect_msg_cv: Semaphore for waiters
+        #   expect_msg_type: Type of message expected
+        #   expect_msg_response: Result passed through here
+
         self.xid_cv = Condition()
         self.xid = None
         self.xid_response = None
 
+        self.expect_msg = False
+        self.expect_msg_cv = Condition()
+        self.expect_msg_type = None
+        self.expect_msg_response = None
+
     def dbg(self, level, string):
         debug_log("CTRL", self.debug_level, level, string)
 
@@ -320,12 +332,24 @@
         self.xid_cv.acquire()
         if self.xid:
             if hdr.xid == self.xid:
-                self.xid_response = msg
+                self.xid_response = (msg, pkt)
+                self.xid = None
                 self.xid_cv.notify()
                 self.xid_cv.release()
                 return (True, None)
         self.xid_cv.release()
 
+        # Check if anyone waiting on this type of message
+        self.expect_msg_cv.acquire()
+        if self.expect_msg:
+            if not self.expect_msg_type or self.expect_msg_type == hdr.type:
+                self.expect_msg_response = (msg, pkt)
+                self.expect_msg = False
+                self.expect_msg_cv.notify()
+                self.expect_msg_cv.release()
+                return (True, None)
+        self.expect_msg_cv.release()
+
         # Check if keep alive is set; if so, respond to echo requests
         if self.keep_alive:
             if hdr.type == OFPT_ECHO_REQUEST:
@@ -368,7 +392,8 @@
 
         @param exp_msg If set, return only when this type of message 
         is received.
-        @param timeout Not yet supported
+        @param timeout If None, do not block.  Otherwise, sleep in
+        intervals of 1 second until
 
         @retval A pair (msg, pkt) where msg is a message object and pkt
         the string representing the packet as received from the socket.
@@ -378,18 +403,43 @@
         If an error occurs, None is returned
         """
 
-        # For now do not support time out;
-        if timeout:
-            self.dbg(DEBUG_WARN, "Poll time out not supported")
+        msg = pkt = None
 
-        while len(self.packets) > 0:
-            self.sync.acquire()
-            (msg, pkt) = self.packets.pop(0)
+        # First check the current queue
+        self.sync.acquire()
+        if len(self.packets) > 0:
+            if not exp_msg:
+                (msg, pkt) = self.packets.pop(0)
+                self.sync.release()
+                return (msg, pkt)
+            else:
+                for i in range(len(self.packets)):
+                    msg = self.packets[i][0]
+                    if msg.header.type == exp_msg:
+                        (msg, pkt) = self.packets.pop(i)
+                        self.sync.release()
+                        return (msg, pkt)
+
+        # Okay, not currently in the queue
+        if timeout is None or timeout <= 0:
             self.sync.release()
-            if not exp_msg or (exp_msg and (msg.header.type == exp_msg)):
-                return msg, pkt
+            return (None, None)
 
-        return None, None
+        # Careful of race condition releasing sync before message cv
+        self.expect_msg_cv.acquire()
+        self.sync.release()
+        self.expect_msg = True
+        self.expect_msg_type = exp_msg
+        self.expect_msg_cv.wait(timeout)
+        if self.expect_msg_response is not None:
+            (msg, pkt) = self.expect_msg_response
+            self.expect_msg_response = None
+        self.expect_msg_cv.release()
+
+        if msg is None:
+            self.dbg(DEBUG_VERBOSE, "poll time out")
+
+        return (msg, pkt)
 
     def transact(self, msg, timeout=None, zero_xid=False):
         """
@@ -421,11 +471,10 @@
         self.xid_response = None
         self.message_send(msg.pack())
         self.xid_cv.wait(timeout)
-        msg = self.xid_response
+        (msg, pkt) = self.xid_response
         self.xid_response = None
-        self.xid = None
         self.xid_cv.release()
-        return msg
+        return (msg, pkt)
 
     def message_send(self, msg, zero_xid=False):
         """
@@ -474,6 +523,7 @@
         string += "  total pkts      " + str(self.packets_total) + "\n"
         string += "  expired pkts    " + str(self.packets_expired) + "\n"
         string += "  handled pkts    " + str(self.packets_handled) + "\n"
+        string += "  poll discards   " + str(self.poll_discards) + "\n"
         string += "  parse errors    " + str(self.parse_errors) + "\n"
         string += "  sock errrors    " + str(self.socket_errors) + "\n"
         string += "  max pkts        " + str(self.max_pkts) + "\n"
diff --git a/src/python/oftest/dataplane.py b/src/python/oftest/dataplane.py
index 44ec997..bdbfea7 100644
--- a/src/python/oftest/dataplane.py
+++ b/src/python/oftest/dataplane.py
@@ -7,7 +7,7 @@
 to stimulate the switch under test.
 
 See the class dataplaneport for more details.  This class wraps
-a set of those objects allowing general calls and parsing 
+a set of those objects allowing general calls and parsing
 configuration.
 
 @todo Add "filters" for matching packets.  Actions supported
@@ -21,6 +21,7 @@
 import netutils
 from threading import Thread
 from threading import Lock
+from threading import Condition
 from oft_config import *
 import select
 
@@ -29,21 +30,6 @@
 RCV_TIMEOUT = 10000
 RCV_SIZE = 4096
 
-# class packet_queue:
-#     """
-#     Class defining a packet queue across multiple ports
-
-#     Items in the queue are stored as a triple (port number, pkt, pkt in time)
-#     """
-
-#     def __init__(self, max_pkts=1024):
-#         self.sync = Lock()
-#         self.debug_level = debug_level_default
-#         self.packets = []
-#         self.max_pkts = max_pkts
-#         self.packets_total = 0
-#         self.packets_discarded = 0
-
 class DataPlanePort(Thread):
     """
     Class defining a port monitoring object.
@@ -54,12 +40,17 @@
     Inherits from Thread class as meant to run in background.  Also
     supports polling.
     Use accessors to dequeue packets for proper synchronization.
+
+    Currently assumes a controlling 'parent' which maintains a
+    common Lock object and a total packet-pending count.  May want
+    to decouple that some day.
     """
 
-    def __init__(self, interface_name, max_pkts=1024):
+    def __init__(self, interface_name, port_number, parent, max_pkts=1024):
         """
         Set up a port monitor object
         @param interface_name The name of the physical interface like eth1
+        @param parent The controlling dataplane object; for pkt wait CV
         @param max_pkts Maximum number of pkts to keep in queue
         """
         Thread.__init__(self)
@@ -69,12 +60,14 @@
         self.packets_total = 0
         self.packets = []
         self.packets_discarded = 0
-        self.sync = Lock()
+        self.port_number = port_number
         self.socket = self.interface_open(interface_name)
         self.dbg(DEBUG_INFO, "Openned port monitor socket")
+        self.parent = parent
+        self.pkt_sync = self.parent.pkt_sync
 
     def dbg(self, level, string):
-        debug_log("DPLANE", self.debug_level, level, 
+        debug_log("DPLANE", self.debug_level, level,
                   self.interface_name + ": " + string)
 
     def interface_open(self, interface_name):
@@ -83,7 +76,7 @@
         @param interface_name port name as a string such as 'eth1'
         @retval s socket
         """
-        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 
+        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                           socket.htons(ETH_P_ALL))
         s.bind((interface_name, 0))
         netutils.set_promisc(s, interface_name)
@@ -129,16 +122,27 @@
                 break
 
             rcvtime = time.clock()
-            self.dbg(DEBUG_VERBOSE, "Pkt len " + str(len(rcvmsg)) + 
+            self.dbg(DEBUG_VERBOSE, "Pkt len " + str(len(rcvmsg)) +
                      " in at " + str(rcvtime))
 
-            self.sync.acquire()
+            # Enqueue packet
+            self.pkt_sync.acquire()
             if len(self.packets) >= self.max_pkts:
+                # Queue full, throw away oldest
                 self.packets.pop(0)
                 self.packets_discarded += 1
+            else:
+                self.parent.packets_pending += 1
+                # Check if parent is waiting on this (or any) port
+                if self.parent.want_pkt:
+                    if (not self.parent.want_pkt_port or
+                        self.parent.want_pkt_port == self.port_number):
+                        self.parent.got_pkt_port = self.port_number
+                        self.parent.want_pkt = False
+                        self.parent.want_pkt.notify()
             self.packets.append((rcvmsg, rcvtime))
             self.packets_total += 1
-            self.sync.release()
+            self.pkt_sync.release()
 
         self.dbg(DEBUG_INFO, "Thread exit ")
 
@@ -151,16 +155,24 @@
             self.socket.close()
         except:
             self.dbg(DEBUG_INFO, "Ignoring dataplane soc shutdown error")
-        self.dbg(DEBUG_INFO, 
-                 "Port monitor exiting")
+        self.dbg(DEBUG_INFO, "Port monitor exiting")
 
-    def dequeue(self):
+    def dequeue(self, use_lock=True):
         """
         Get the oldest packet in the queue
+        @param use_lock If True, acquires the packet sync lock (which is
+        really the parent's lock)
+        @return The pair packet, packet time-stamp
         """
-        self.sync.acquire()
-        pkt, pkt_time = self.packets.pop(0)
-        self.sync.release()
+        if use_lock:
+            self.pkt_sync.acquire()
+        if len(self.packets) > 0:
+            pkt, pkt_time = self.packets.pop(0)
+            self.parent.packets_pending -= 1
+        else:
+            pkt = pkt_time = None
+        if use_lock:
+            self.pkt_sync.release()
         return pkt, pkt_time
 
     def timestamp_head(self):
@@ -168,21 +180,22 @@
         Return the timestamp of the head of queue or None if empty
         """
         rv = None
-        self.sync.acquire()
-        if len(self.packets) > 0:
+        try:
             rv = self.packets[0][1]
-        self.sync.release()
+        except:
+            rv = None
         return rv
 
     def flush(self):
         """
         Clear the packet queue
         """
-        self.sync.acquire()
+        self.pkt_sync.acquire()
         self.packets_discarded += len(self.packets)
+        self.parent.packets_pending -= len(self.packets)
         self.packets = []
         self.packet_times = []
-        self.sync.release()
+        self.pkt_sync.release()
 
 
     def send(self, packet):
@@ -199,8 +212,8 @@
     def register(self, handler):
         """
         Register a callback function to receive packets from this
-        port.  The callback will be passed the packet, the 
-        interface name and the port number (if set) on which the 
+        port.  The callback will be passed the packet, the
+        interface name and the port number (if set) on which the
         packet was received.
 
         To be implemented
@@ -212,7 +225,7 @@
         print prefix + "Pkts pending:  " + str(len(self.packets))
         print prefix + "Pkts total:    " + str(self.packets_total)
         print prefix + "socket:        " + str(self.socket)
-        
+
 
 class DataPlane:
     """
@@ -222,6 +235,15 @@
     def __init__(self):
         self.port_list = {}
         self.debug_level = debug_level_default
+        # pkt_sync serves double duty as a regular top level lock and
+        # as a condition variable
+        self.pkt_sync = Condition()
+
+        # These are used to signal async pkt arrival for polling
+        self.want_pkt = False
+        self.want_pkt_port = None # What port required (or None)
+        self.got_pkt_port = None # On what port received?
+        self.packets_pending = 0 # Total pkts in all port queues
 
     def dbg(self, level, string):
         debug_log("DPORT", self.debug_level, level, string)
@@ -233,8 +255,9 @@
         @param interface_name The name of the physical interface like eth1
         @param port_number The port number used to refer to the port
         """
-        
-        self.port_list[port_number] = DataPlanePort(interface_name)
+
+        self.port_list[port_number] = DataPlanePort(interface_name, 
+                                                    port_number, self)
         self.port_list[port_number].start()
 
     def send(self, port_number, packet):
@@ -247,7 +270,7 @@
                  "Sending %d bytes to port %d" % (len(packet), port_number))
         bytes = self.port_list[port_number].send(packet)
         if bytes != len(packet):
-            self.dbg(DEBUG_ERROR,"Unhandled send error, " + 
+            self.dbg(DEBUG_ERROR,"Unhandled send error, " +
                      "length mismatch %d != %d" %
                      (bytes, len(packet)))
         return bytes
@@ -264,41 +287,77 @@
                          ", port %d, length mismatch %d != %d" %
                          (port_number, bytes, len(packet)))
 
-    def packet_get(self, port_number=None):
-        """
-        Get a packet from the data plane
-
-        If port_number is given, get the oldest packet from that port.
-        Otherwise, find the port with the oldest packet and return
-        that packet.
-        @param port_number If set, get packet from this port
-        @retval The triple port_number, packet, pkt_time where packet 
-        is received from port_number at time pkt_time.  
-        """
-
-        if port_number:
-            if len(self.port_list[port_number].packets) != 0:
-                pkt, time = self.port_list[port_number].dequeue()
-                return port_number, pkt, time
-            else:
-                return None, None, None
-
+    def _oldest_packet_find(self):
         # Find port with oldest packet
-        #@todo Consider using a single queue for all ports
         min_time = 0
         min_port = -1
         for port_number in self.port_list.keys():
             ptime = self.port_list[port_number].timestamp_head()
             if ptime:
                 if (min_port == -1) or (ptime < min_time):
-                    min_time = ptime 
+                    min_time = ptime
                     min_port = port_number
+        oft_assert(min_port != -1, "Could not find port when pkts pending")
 
-        if min_port == -1:
+        return min_port
+
+    def poll(self, port_number=None, timeout=None):
+        """
+        Poll one or all dataplane ports for a packet
+
+        If port_number is given, get the oldest packet from that port.
+        Otherwise, find the port with the oldest packet and return
+        that packet.
+        @param port_number If set, get packet from this port
+        @param timeout If positive and no packet is available, block 
+        until a packet is received or for this many seconds
+        @return The triple port_number, packet, pkt_time where packet
+        is received from port_number at time pkt_time.  If a timeout
+        occurs, return None, None, None
+        """
+
+        self.pkt_sync.acquire()
+
+        # Check if requested specific port and it has a packet
+        if port_number and len(self.port_list[port_number].packets) != 0:
+            pkt, time = self.port_list[port_number].dequeue(use_lock=False)
+            self.pkt_sync.release()
+            oft_assert(pkt, "Poll: packet not found on port " + 
+                       str(port_number))
+            return port_number, pkt, time
+
+        # Check if requested any port and some packet pending
+        if not port_number and self.packets_pending != 0:
+            port = self._oldest_packet_find()
+            pkt, time = self.port_list[port].dequeue(use_lock=False)
+            self.pkt_sync.release()
+            oft_assert(pkt, "Poll: oldest packet not found")
+            return port, pkt, time
+
+        # No packet pending; blocking call requested?
+        if not timeout:
+            self.pkt_sync.release()
             return None, None, None
 
-        pkt, time = self.port_list[min_port].dequeue()
-        return min_port, pkt, time
+        # Desired packet isn't available and timeout is specified
+        # Already holding pkt_sync; wait on pkt_sync variable
+        self.want_pkt = True
+        self.want_pkt_port = port_number
+        self.got_pkt_port = None
+        self.pkt_sync.wait(timeout)
+        self.want_pkt = False
+        if self.got_pkt_port:
+            pkt, time = \
+                self.port_list[self.got_pkt_port].dequeue(use_lock=False)
+            self.pkt_sync.release()
+            oft_assert(pkt, "Poll: pkt reported, but not found at " +
+                       self.got_pkt_port)
+            return self.got_pkt_port, pkt, time
+
+        self.pkt_sync.release()
+        self.dbg(DEBUG_VERBOSE, "Poll time out, no packet")
+
+        return None, None, None
 
     def kill(self, join_threads=False):
         """
@@ -315,6 +374,7 @@
 
     def show(self, prefix=''):
         print prefix + "Dataplane Controller"
+        print prefix + "Packets pending" + str(self.packets_pending)
         for pnum, port in self.port_list.items():
             print prefix + "OpenFlow Port Number " + str(pnum)
             port.show(prefix + '  ')
diff --git a/src/python/oftest/oft_config.py b/src/python/oftest/oft_config.py
index 264c94d..50077fe 100644
--- a/src/python/oftest/oft_config.py
+++ b/src/python/oftest/oft_config.py
@@ -30,6 +30,8 @@
 
 """
 
+import sys
+
 ##@var platform
 # A string representing the platform under test.  Tested below
 # for determining other variables.
@@ -158,3 +160,19 @@
         #@todo Support output redirection based on debug level
         print module + ":" + dbg_string[level] + ":" + string
 
+
+def oft_assert(condition, string):
+    """
+    Test framework assertion check
+
+    @param condition The boolean condition to check
+    @param string String to print if error
+
+    If condition is not true, it is considered a test framework
+    failure and exit is called
+    """
+    if not condition:
+        debug_log("OFT", debug_level_default, DEBUG_CRITICAL, 
+                  "Internal error: " + string)
+        sys.exit(1)
+
diff --git a/tests/basic.py b/tests/basic.py
index 04a2f91..f7db544 100644
--- a/tests/basic.py
+++ b/tests/basic.py
@@ -84,7 +84,7 @@
     """
     def runTest(self):
         request = echo_request()
-        response = self.controller.transact(request)
+        response, pkt = self.controller.transact(request)
         self.assertEqual(response.header.type, OFPT_ECHO_REPLY,
                          'response is not echo_reply')
         self.assertEqual(request.header.xid, response.header.xid,
@@ -98,7 +98,7 @@
     def runTest(self):
         request = echo_request()
         request.data = 'OpenFlow Will Rule The World'
-        response = self.controller.transact(request)
+        response, pkt = self.controller.transact(request)
         self.assertEqual(response.header.type, OFPT_ECHO_REPLY,
                          'response is not echo_reply')
         self.assertEqual(request.header.xid, response.header.xid,
@@ -112,24 +112,23 @@
     """
     def runTest(self):
         # Construct packet to send to dataplane
-        # Send packet to dataplane
+        # Send packet to dataplane, once to each port
         # Poll controller with expect message type packet in
         # For now, a random packet from scapy tutorial
-        pkt=Ether()/IP(dst="www.slashdot.org")/TCP()/\
-            "GET /index.html HTTP/1.0 \n\n"
+
         for of_port in interface_ofport_map.keys():
+            self.dbg(DEBUG_INFO, "PKT IN test, port " + str(of_port))
+            pkt=Ether()/IP(dst="www.slashdot.org")/TCP()/\
+                ("GET /index.html HTTP/1.0. port" + str(of_port))
             self.dataplane.send(of_port, str(pkt))
-            # For now, just send one packet
-            break
+            #@todo Check for unexpected messages?
+            (response, raw) = self.controller.poll(OFPT_PACKET_IN, 2)
 
-        time.sleep(2) # @todo Implement poll timeout for test cases
-        #@todo Check for unexpected messages?
-        (response, raw) = self.controller.poll(OFPT_PACKET_IN)
-
-        self.assertTrue(response is not None, 'Packet in message not received')
-        # Data has CRC on it, so take off last 4 bytes
-        self.assertEqual(str(pkt), response.data[:-4], 
-                         'Response packet does not match send packet')
+            self.assertTrue(response is not None, 
+                            'Packet in message not received')
+            # Data has CRC on it, so take off last 4 bytes
+            self.assertEqual(str(pkt), response.data[:-4], 
+                             'Response packet does not match send packet')
 
 class PacketOutTestCase(SimpleDataPlaneTestCase):
     """
@@ -155,7 +154,7 @@
         self.assertTrue(rv == 0, "Error sending out message")
 
         time.sleep(1) # @todo Implement poll timeout for test cases
-        (of_port, pkt, pkt_time) = self.dataplane.packet_get()
+        (of_port, pkt, pkt_time) = self.dataplane.poll()
 
         self.assertTrue(pkt is not None, 'Packet not received')
         if of_port is not None:
diff --git a/tools/munger/Makefile b/tools/munger/Makefile
index 4e979d2..c89b937 100644
--- a/tools/munger/Makefile
+++ b/tools/munger/Makefile
@@ -29,7 +29,8 @@
 
 # Generated and other files
 GEN_FILES := $(addprefix ${TARGET_DIR}/,cstruct.py message.py error.py action.py)
-OTHER_FILES :=  $(addprefix ${TARGET_DIR}/,action_list.py parse.py)
+OTHER_FILES :=  $(addprefix ${TARGET_DIR}/,action_list.py parse.py \
+	controller.py dataplane.py)
 LINT_SOURCE := ${GEN_FILES} ${OTHER_FILES}
 LINT_FILES := $(subst .py,.log,${LINT_SOURCE})
 LINT_FILES := $(subst ${TARGET_DIR}/,lint/,${LINT_FILES})