dataplane: single-threaded rewrite

There's now just one thread for the dataplane instead of one per port. This
greatly reduces the amount of code needed for each port implementation.
diff --git a/src/python/oftest/base_tests.py b/src/python/oftest/base_tests.py
index 7f29db5..db9e4a9 100644
--- a/src/python/oftest/base_tests.py
+++ b/src/python/oftest/base_tests.py
@@ -112,7 +112,7 @@
         logging.info("Teardown for simple dataplane test")
         SimpleProtocol.tearDown(self)
         if hasattr(self, 'dataplane'):
-            self.dataplane.kill(join_threads=self.clean_shutdown)
+            self.dataplane.kill()
             del self.dataplane
         logging.info("Teardown done")
 
@@ -136,7 +136,7 @@
 
     def tearDown(self):
         logging.info("Teardown for simple dataplane test")
-        self.dataplane.kill(join_threads=self.clean_shutdown)
+        self.dataplane.kill()
         del self.dataplane
         logging.info("Teardown done")
 
diff --git a/src/python/oftest/dataplane.py b/src/python/oftest/dataplane.py
index 6da8979..d53e223 100644
--- a/src/python/oftest/dataplane.py
+++ b/src/python/oftest/dataplane.py
@@ -36,11 +36,6 @@
 except:
     pass
 
-##@todo Find a better home for these identifiers (dataplane)
-RCV_SIZE_DEFAULT = 4096
-ETH_P_ALL = 0x03
-RCV_TIMEOUT = 10000
-
 def match_exp_pkt(exp_pkt, pkt):
     """
     Compare the string value of pkt with the string value of exp_pkt,
@@ -55,204 +50,66 @@
     return e == p
 
 
-class DataPlanePort(Thread):
+class DataPlanePort:
     """
-    Class defining a port monitoring object.
-
-    Control a dataplane port connected to the switch under test.
-    Creates a promiscuous socket on a physical interface.
-    Queues the packets received on that interface with time stamps.
-    Inherits from Thread class as meant to run in background.  Also
-    supports polling.
-
-    Currently assumes a controlling 'parent' which maintains a
-    common Lock object and a total packet-pending count.  May want
-    to decouple that some day.
+    Uses raw sockets to capture and send packets on a network interface.
     """
 
-    def __init__(self, interface_name, port_number, parent, max_pkts=1024):
+    RCV_SIZE_DEFAULT = 4096
+    ETH_P_ALL = 0x03
+    RCV_TIMEOUT = 10000
+
+    def __init__(self, interface_name, port_number):
         """
-        Set up a port monitor object
         @param interface_name The name of the physical interface like eth1
-        @param port_number The port number associated with this port
-        @param parent The controlling dataplane object; for pkt wait CV
-        @param max_pkts Maximum number of pkts to keep in queue
         """
-        Thread.__init__(self)
         self.interface_name = interface_name
-        self.max_pkts = max_pkts
-        self.packets_total = 0
-        self.packets = []
-        self.packets_discarded = 0
-        self.port_number = port_number
-        logname = "dp-" + interface_name
-        self.logger = logging.getLogger(logname)
-        try:
-            self.socket = self.interface_open(interface_name)
-        except:
-            self.logger.info("Could not open socket")
-            raise
-        self.logger.info("Opened port monitor (class %s)", type(self).__name__)
-        self.parent = parent
-        self.killed = False
+        self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
+                                    socket.htons(self.ETH_P_ALL))
+        self.socket.bind((interface_name, 0))
+        netutils.set_promisc(self.socket, interface_name)
+        self.socket.settimeout(self.RCV_TIMEOUT)
 
-        # Used to wake up the event loop in kill()
-        self.waker = EventDescriptor()
-
-    def interface_open(self, interface_name):
-        """
-        Open a socket in a promiscuous mode for a data connection.
-        @param interface_name port name as a string such as 'eth1'
-        @retval s socket
-        """
-        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
-                          socket.htons(ETH_P_ALL))
-        s.bind((interface_name, 0))
-        netutils.set_promisc(s, interface_name)
-        s.settimeout(RCV_TIMEOUT)
-        return s
-
-    def run(self):
-        """
-        Activity function for class
-        """
-        self.socs = [self.socket, self.waker]
-        error_warned = False # Have we warned about error?
-        while not self.killed:
-            try:
-                sel_in, sel_out, sel_err = \
-                    select.select(self.socs, [], [], 1)
-            except:
-                print sys.exc_info()
-                self.logger.error("Select error, exiting")
-                break
-
-            if (sel_in is None) or (len(sel_in) == 0):
-                continue
-
-            if self.waker in sel_in:
-                self.waker.wait()
-                continue
-
-            try:
-                rcvmsg = self.socket.recv(RCV_SIZE_DEFAULT)
-            except socket.error:
-                if not error_warned:
-                    self.logger.info("Socket error on recv")
-                    error_warned = True
-                continue
-
-            if len(rcvmsg) == 0:
-                self.logger.info("Zero len pkt rcvd")
-                self.kill()
-                break
-
-            rcvtime = time.time()
-            self.logger.debug("Pkt len " + str(len(rcvmsg)) +
-                     " in at " + str(rcvtime) + " on port " +
-                     str(self.port_number))
-
-            # Enqueue packet
-            with self.parent.pkt_sync:
-                if len(self.packets) >= self.max_pkts:
-                    # Queue full, throw away oldest
-                    self.packets.pop(0)
-                    self.packets_discarded += 1
-                    self.logger.debug("Discarding oldest packet to make room")
-                self.packets.append((rcvmsg, rcvtime))
-                self.packets_total += 1
-                self.parent.pkt_sync.notify_all()
-
-        self.logger.info("Thread exit")
-
-    def kill(self):
-        """
-        Terminate the running thread
-        """
-        self.logger.debug("Port monitor kill")
-        self.killed = True
-        self.waker.notify()
-        try:
+    def __del__(self):
+        if self.socket:
             self.socket.close()
-        except:
-            self.logger.info("Ignoring dataplane soc shutdown error")
 
-    def timestamp_head(self):
+    def fileno(self):
         """
-        Return the timestamp of the head of queue or None if empty
+        Return an integer file descriptor that can be passed to select(2).
         """
-        rv = None
-        try:
-            rv = self.packets[0][1]
-        except:
-            rv = None
-        return rv
+        return self.socket.fileno()
 
-    def flush(self):
+    def recv(self):
         """
-        Clear the packet queue
+        Receive a packet from this port.
+        @retval (packet data, timestamp)
         """
-        with self.parent.pkt_sync:
-            self.packets_discarded += len(self.packets)
-            self.packets = []
+        pkt = self.socket.recv(self.RCV_SIZE_DEFAULT)
+        return (pkt, time.time())
 
     def send(self, packet):
         """
-        Send a packet to the dataplane port
+        Send a packet out this port.
         @param packet The packet data to send to the port
         @retval The number of bytes sent
         """
         return self.socket.send(packet)
 
-    def register(self, handler):
+    def down(self):
         """
-        Register a callback function to receive packets from this
-        port.  The callback will be passed the packet, the
-        interface name and the port number (if set) on which the
-        packet was received.
-
-        To be implemented
+        Bring the physical link down.
         """
-        pass
+        os.system("ifconfig down %s" % self.interface_name)
 
-    def show(self, prefix=''):
-
-        print prefix + "Name:          " + self.interface_name
-        print prefix + "Pkts pending:  " + str(len(self.packets))
-        print prefix + "Pkts total:    " + str(self.packets_total)
-        print prefix + "socket:        " + str(self.socket)
-
-    
-    def port_down(self,port_number,config):
-
+    def up(self):
         """
-        Grabs a port from the dataplane ports and brings it down by 
-        shutting the corresponding interface 
-        @port_number The port number which has brought to be down
-        @interface_name The interface corresponding to the port that needs to
-        be brought down
-
+        Bring the physical link up.
         """
-        interface_name = config["port_map"].get(port_number)
-        cmd = 'ifdown '+ interface_name
-        os.system(cmd)
-
-    def port_up(self,port_number,config):
-
-        """
-        Grabs a port from the dataplane ports and brings it up by 
-        starting up the corresponding interface 
-        @port_number The port number which has to brought up
-        @interface_name The interface corresponding to the port that has to
-        be brought up
-
-        """
-        interface_name = config["port_map"].get(port_number)
-        cmd = 'ifup '+ interface_name
-        os.system(cmd)
+        os.system("ifconfig up %s" % self.interface_name)
 
 
-class DataPlanePortPcap(DataPlanePort):
+class DataPlanePortPcap:
     """
     Alternate port implementation using libpcap. This is required for recent
     versions of Linux (such as Linux 3.2 included in Ubuntu 12.04) which
@@ -260,81 +117,53 @@
     socket. libpcap understands how to read the VLAN tag from the kernel.
     """
 
-    def __init__(self, interface_name, port_number, parent, max_pkts=1024):
-        DataPlanePort.__init__(self, interface_name, port_number, parent, max_pkts)
-
-    def interface_open(self, interface_name):
-        """
-        Open a PCAP interface.
-        """
+    def __init__(self, interface_name, port_number):
         self.pcap = pcap.pcap(interface_name)
         self.pcap.setnonblock()
+
+    def fileno(self):
         return self.pcap.fileno()
 
-    def run(self):
-        """
-        Activity function for class
-        """
-        while not self.killed:
-            try:
-                sel_in, sel_out, sel_err = select.select([self.socket, self.waker], [], [], 1)
-            except:
-                print sys.exc_info()
-                self.logger.error("Select error, exiting")
-                break
-
-            if (sel_in is None) or (len(sel_in) == 0):
-                continue
-
-            if self.waker in sel_in:
-                self.waker.wait()
-                continue
-
-            # Enqueue packet
-            with self.parent.pkt_sync:
-                for (timestamp, rcvmsg) in self.pcap.readpkts():
-                    self.logger.debug("Pkt len " + str(len(rcvmsg)) +
-                                      " in at " + str(timestamp) + " on port " +
-                                      str(self.port_number))
-
-                    if len(self.packets) >= self.max_pkts:
-                        # Queue full, throw away oldest
-                        self.packets.pop(0)
-                        self.packets_discarded += 1
-                        self.logger.debug("Discarding oldest packet to make room")
-                    self.packets.append((rcvmsg, timestamp))
-                    self.packets_total += 1
-                self.parent.pkt_sync.notify_all()
-
-        self.logger.info("Thread exit")
-
-    def kill(self):
-        """
-        Terminate the running thread
-        """
-        self.logger.debug("Port monitor kill")
-        self.killed = True
-        self.waker.notify()
-        # pcap object is closed on GC.
+    def recv(self):
+        (timestamp, pkt) = next(self.pcap)
+        return (pkt, timestamp)
 
     def send(self, packet):
-        """
-        Send a packet to the dataplane port
-        @param packet The packet data to send to the port
-        @retval The number of bytes sent
-        """
         return self.pcap.inject(packet, len(packet))
 
-class DataPlane:
+    def down(self):
+        pass
+
+    def up(self):
+        pass
+
+class DataPlane(Thread):
     """
-    Class defining access primitives to the data plane
-    Controls a list of DataPlanePort objects
+    This class provides methods to send and receive packets on the dataplane.
+    It uses the DataPlanePort class, or an alternative implementation of that
+    interface, to do IO on a particular port. A background thread is used to
+    read packets from the dataplane ports and enqueue them to be read by the
+    test. The kill() method must be called to shutdown this thread.
     """
+
+    MAX_QUEUE_LEN = 100
+
     def __init__(self, config=None):
-        self.port_list = {}
-        # pkt_sync serves double duty as a regular top level lock and
+        Thread.__init__(self)
+
+        # dict from port number to port object
+        self.ports = {}
+
+        # dict from port number to list of (timestamp, packet)
+        self.packet_queues = {}
+
+        # cvar serves double duty as a regular top level lock and
         # as a condition variable
-        self.pkt_sync = Condition()
+        self.cvar = Condition()
+
+        # Used to wake up the event loop from another thread
+        self.waker = EventDescriptor()
+        self.killed = False
 
         self.logger = logging.getLogger("dataplane")
 
@@ -361,18 +190,54 @@
             self.logger.warning("Missing pypcap, VLAN tests may fail. See README for installation instructions.")
             self.dppclass = DataPlanePort
 
+        self.start()
+
+    def run(self):
+        """
+        Activity function for class
+        """
+        while not self.killed:
+            sockets = [self.waker] + self.ports.values()
+            try:
+                sel_in, sel_out, sel_err = select.select(sockets, [], [], 1)
+            except:
+                print sys.exc_info()
+                self.logger.error("Select error, exiting")
+                break
+
+            with self.cvar:
+                for port in sel_in:
+                    if port == self.waker:
+                        self.waker.wait()
+                        continue
+                    else:
+                        # Enqueue packet
+                        pkt, timestamp = port.recv()
+                        port_number = port._port_number
+                        self.logger.debug("Pkt len %d in on port %d",
+                                          len(pkt), port_number)
+                        queue = self.packet_queues[port_number]
+                        if len(queue) >= self.MAX_QUEUE_LEN:
+                            # Queue full, throw away oldest
+                            queue.pop(0)
+                            self.logger.debug("Discarding oldest packet to make room")
+                        queue.append((pkt, timestamp))
+                self.cvar.notify_all()
+
+        self.logger.info("Thread exit")
+
     def port_add(self, interface_name, port_number):
         """
         Add a port to the dataplane
-        TBD:  Max packets for queue?
         @param interface_name The name of the physical interface like eth1
         @param port_number The port number used to refer to the port
+        Stashes the port number on the created port object.
         """
-
-        self.port_list[port_number] = self.dppclass(interface_name, 
-                                                    port_number, self); 
-
-        self.port_list[port_number].start()
+        self.ports[port_number] = self.dppclass(interface_name, port_number)
+        self.ports[port_number]._port_number = port_number
+        self.packet_queues[port_number] = []
+        # Need to wake up event loop to change the sockets being selected on.
+        self.waker.notify()
 
     def send(self, port_number, packet):
         """
@@ -382,40 +247,44 @@
         """
         self.logger.debug("Sending %d bytes to port %d" %
                           (len(packet), port_number))
-        bytes = self.port_list[port_number].send(packet)
+        bytes = self.ports[port_number].send(packet)
         if bytes != len(packet):
             self.logger.error("Unhandled send error, length mismatch %d != %d" %
                      (bytes, len(packet)))
         return bytes
 
-    # Returns the port with the oldest packet, or None if no packets are queued.
-    def oldest_port(self):
-        min_port = None
+    def oldest_port_number(self):
+        """
+        Returns the port number with the oldest packet, or
+        None if no packets are queued.
+        """
+        min_port_number = None
         min_time = float('inf')
-        for port in self.port_list.values():
-            ptime = port.timestamp_head()
-            if ptime and ptime < min_time:
-                min_time = ptime
-                min_port = port
-        return min_port
+        for (port_number, queue) in self.packet_queues.items():
+            if queue and queue[0][1] < min_time:
+                min_time = queue[0][1]
+                min_port_number = port_number
+        return min_port_number
 
     # Dequeues and yields packets in the order they were received.
-    # Yields (port, packet, received time).
+    # Yields (port number, packet, received time).
     # If port_number is not specified yields packets from all ports.
     def packets(self, port_number=None):
         while True:
-            if port_number == None:
-                port = self.oldest_port()
-            else:
-                port = self.port_list[port_number]
+            rcv_port_number = port_number or self.oldest_port_number()
 
-            if port == None or len(port.packets) == 0:
-                self.logger.debug("Out of packets for port %s" % str(port_number))
-                # Out of packets
+            if rcv_port_number == None:
+                self.logger.debug("Out of packets on all ports")
                 break
 
-            pkt, time = port.packets.pop(0)
-            yield (port, pkt, time)
+            queue = self.packet_queues[rcv_port_number]
+
+            if len(queue) == 0:
+                self.logger.debug("Out of packets on port %d", rcv_port_number)
+                break
+
+            pkt, time = queue.pop(0)
+            yield (rcv_port_number, pkt, time)
 
     def poll(self, port_number=None, timeout=-1, exp_pkt=None):
         """
@@ -444,50 +313,37 @@
         # Retrieve the packet. Returns (port number, packet, time).
         def grab():
             self.logger.debug("Grabbing packet")
-            for (port, pkt, time) in self.packets(port_number):
-                self.logger.debug("Checking packet from port %d" % port.port_number)
+            for (rcv_port_number, pkt, time) in self.packets(port_number):
+                self.logger.debug("Checking packet from port %d", rcv_port_number)
                 if not exp_pkt or match_exp_pkt(exp_pkt, pkt):
-                    return (port, pkt, time)
+                    return (rcv_port_number, pkt, time)
             self.logger.debug("Did not find packet")
             return None
 
-        with self.pkt_sync:
-            ret = timed_wait(self.pkt_sync, grab, timeout=timeout)
+        with self.cvar:
+            ret = timed_wait(self.cvar, grab, timeout=timeout)
 
         if ret != None:
-            (port, pkt, time) = ret
-            return (port.port_number, pkt, time)
+            return ret
         else:
             self.logger.debug("Poll time out, no packet from " + str(port_number))
             return (None, None, None)
 
-    def kill(self, join_threads=True):
+    def kill(self):
         """
-        Close all sockets for dataplane
-        @param join_threads If True call join on each thread
+        Stop the dataplane thread.
         """
-        for port_number in self.port_list.keys():
-            self.port_list[port_number].kill()
+        self.killed = True
+        self.waker.notify()
+        self.join()
+        # Explicitly release ports to ensure we don't run out of sockets
+        # even if someone keeps holding a reference to the dataplane.
+        del self.ports
 
-        if join_threads:
-            for port_number in self.port_list.keys():
-                self.logger.debug("Joining " + str(port_number))
-                self.port_list[port_number].join()
-
-        self.port_list = None
-
-        self.logger.info("DataPlane shutdown")
-
-    def show(self, prefix=''):
-        print prefix + "Dataplane Controller"
-        for pnum, port in self.port_list.items():
-            print prefix + "OpenFlow Port Number " + str(pnum)
-            port.show(prefix + '  ')
-
-    def port_down(self,port_number):
+    def port_down(self, port_number):
         """Brings the specified port down"""
-        self.port_list[port_number].port_down(port_number,self.config)
+        self.ports[port_number].down()
 
-    def port_up(self,port_number):
+    def port_up(self, port_number):
         """Brings the specified port up"""
-        self.port_list[port_number].port_up(port_number,self.config)
+        self.ports[port_number].up()