Merge pull request #37 from blp/master

ovs-dummy fix
diff --git a/platforms/ovs-dummy.py b/platforms/ovs-dummy.py
index 2dad1a5..a2f1790 100644
--- a/platforms/ovs-dummy.py
+++ b/platforms/ovs-dummy.py
@@ -17,30 +17,25 @@
 RCV_TIMEOUT = 10000
 RUN_DIR = os.environ.get("OVS_RUNDIR", "/var/run/openvswitch")
 
-class DataPlanePortOVSDummy(Thread):
+class DataPlanePortOVSDummy:
     """
     Class defining a port monitoring object that uses Unix domain
     sockets for ports, intended for connecting to Open vSwitch "dummy"
     netdevs.
     """
 
-    def __init__(self, interface_name, port_number, parent, max_pkts=1024):
+    def __init__(self, interface_name, port_number, max_pkts=1024):
         """
         Set up a port monitor object
         @param interface_name The name of the physical interface like eth1
         @param port_number The port number associated with this port
-        @param parent The controlling dataplane object; for pkt wait CV
         @param max_pkts Maximum number of pkts to keep in queue
         """
-        Thread.__init__(self)
         self.interface_name = interface_name
         self.max_pkts = max_pkts
-        self.packets_total = 0
-        self.packets = []
-        self.packets_discarded = 0
         self.port_number = port_number
         self.txq = []
-        self.txq_lock = Lock()
+        self.rxbuf = ""
         logname = "dp-" + interface_name
         self.logger = logging.getLogger(logname)
         try:
@@ -49,7 +44,6 @@
             self.logger.info("Could not open socket")
             raise
         self.logger.info("Opened port monitor (class %s)", type(self).__name__)
-        self.parent = parent
 
     @staticmethod
     def interface_open(interface_name):
@@ -64,133 +58,65 @@
         s.connect("%s/%s" % (RUN_DIR, interface_name))
         return s
 
-    def run(self):
-        """
-        Activity function for class
-        """
-        self.running = True
-        rxbuf = ""
-        while self.running:
-            try:
-                self.txq_lock.acquire()
-                if self.txq:
-                    wlist = [self.socket]
-                else:
-                    wlist = []
-                self.txq_lock.release()
-
-                rout, wout, eout = select.select([self.socket], wlist, [], 1)
-            except:
-                print sys.exc_info()
-                self.logger.error("Select error, exiting")
-                break
-
-            if not self.running:
-                break
-
-            if wout:
-                self.txq_lock.acquire()
-                if self.txq:
-                    retval = self.socket.send(self.txq[0])
-                    if retval > 0:
-                        self.txq[0] = self.txq[0][retval:]
-                        if len(self.txq[0]) == 0:
-                            self.txq = self.txq[1:]
-                self.txq_lock.release()
-
-            if rout:
-                if len(rxbuf) < 2:
-                    n = 2 - len(rxbuf)
-                else:
-                    frame_len = struct.unpack('>h', rxbuf[:2])[0]
-                    n = (2 + frame_len) - len(rxbuf)
-
-                data = self.socket.recv(n)
-                rxbuf += data
-                if len(data) == n and len(rxbuf) > 2:
-                    rcvtime = time.time()
-                    self.logger.debug("Pkt len " + str(len(rxbuf)) +
-                             " in at " + str(rcvtime) + " on port " +
-                             str(self.port_number))
-
-                    # Enqueue packet
-                    with self.parent.pkt_sync:
-                        if len(self.packets) >= self.max_pkts:
-                            # Queue full, throw away oldest
-                            self.packets.pop(0)
-                            self.packets_discarded += 1
-                            self.logger.debug("Discarding oldest packet to make room")
-                        self.packets.append((rxbuf[2:], rcvtime))
-                        self.packets_total += 1
-                        self.parent.pkt_sync.notify_all()
-
-                    rxbuf = ''
-
-        self.logger.info("Thread exit")
-
-    def kill(self):
-        """
-        Terminate the running thread
-        """
-        self.logger.debug("Port monitor kill")
-        self.running = False
-        try:
+    def __del__(self):
+        if self.socket:
             self.socket.close()
-        except:
-            self.logger.info("Ignoring dataplane soc shutdown error")
 
-    def timestamp_head(self):
+    def fileno(self):
         """
-        Return the timestamp of the head of queue or None if empty
+        Return an integer file descriptor that can be passed to select(2).
         """
-        rv = None
-        try:
-            rv = self.packets[0][1]
-        except:
-            rv = None
-        return rv
+        return self.socket.fileno()
 
-    def flush(self):
-        """
-        Clear the packet queue
-        """
-        with self.parent.pkt_sync:
-            self.packets_discarded += len(self.packets)
-            self.packets = []
+    def recv(self):
+        while True:
+            rout, wout, eout = select.select([self.socket], [], [], 0)
+            if not rout:
+                return
+
+            if len(self.rxbuf) < 2:
+                n = 2 - len(self.rxbuf)
+            else:
+                frame_len = struct.unpack('>h', self.rxbuf[:2])[0]
+                n = (2 + frame_len) - len(self.rxbuf)
+
+            data = self.socket.recv(n)
+            self.rxbuf += data
+            if len(data) == n and len(self.rxbuf) > 2:
+                rcvtime = time.time()
+                packet = self.rxbuf[2:]
+                self.logger.debug("Pkt len " + str(len(packet)) +
+                         " in at " + str(rcvtime) + " on port " +
+                         str(self.port_number))
+                self.rxbuf = ""
+                return (packet, rcvtime)
 
     def send(self, packet):
-        """
-        Send a packet to the dataplane port
-        @param packet The packet data to send to the port
-        @retval The number of bytes sent
-        """
-
-        self.txq_lock.acquire()
         if len(self.txq) < self.max_pkts:
             self.txq.append(struct.pack('>h', len(packet)) + packet)
             retval = len(packet)
         else:
             retval = 0
-        self.txq_lock.release()
-
+        self.__run_tx()
         return retval
 
-    def register(self, handler):
-        """
-        Register a callback function to receive packets from this
-        port.  The callback will be passed the packet, the
-        interface name and the port number (if set) on which the
-        packet was received.
+    def __run_tx(self):
+        while self.txq:
+            rout, wout, eout = select.select([], [self.socket], [], 0)
+            if not wout:
+                return
 
-        To be implemented
-        """
+            retval = self.socket.send(self.txq[0])
+            if retval > 0:
+                self.txq[0] = self.txq[0][retval:]
+                if len(self.txq[0]) == 0:
+                    self.txq = self.txq[1:]
+        
+    def down(self):
         pass
 
-    def show(self, prefix=''):
-        print prefix + "Name:          " + self.interface_name
-        print prefix + "Pkts pending:  " + str(len(self.packets))
-        print prefix + "Pkts total:    " + str(self.packets_total)
-        print prefix + "socket:        " + str(self.socket)
+    def up(self):
+        pass
 
 # Update this dictionary to suit your environment.
 dummy_port_map = {