Lots of reorg and wrestling with sockets
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 4262ca9..779e42c 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -17,6 +17,14 @@
 
 @todo Support transaction semantics via xid
 @todo Set up reasonable logging facility
+@todo Support select and listen on an administrative socket (or
+use a timeout to support clean shutdown).
+
+Currently only one connection is accepted during the life of
+the controller.   There seems
+to be no clean way to interrupt an accept call.  Using select that also listens
+on an administrative socket and can shut down the socket might work.
+
 """
 
 from oft_config import *
@@ -28,7 +36,6 @@
 from threading import Condition
 from message import *
 from parse import *
-from netutils import *
 from ofutils import *
 
 class Controller(Thread):
@@ -61,27 +68,35 @@
 
     def __init__(self, max_pkts=1024):
         Thread.__init__(self)
+        # Socket related
         self.rcv_size = RCV_SIZE_DEFAULT
+        self.listen_socket = None
+        self.switch_socket = None
+        self.switch_addr = None
+
+        # Counters
         self.socket_errors = 0
         self.parse_errors = 0
-        self.connected = False
-        self.running = False
-        self.max_pkts = max_pkts
         self.packets_total = 0
+        self.packets_expired = 0
+        self.packets_handled = 0
+
+        # State
+        self.connected = False
         self.packets = []
         self.sync = Lock()
         self.handlers = {}
         self.keep_alive = False
+
+        # Settings
+        self.max_pkts = max_pkts
+        self.passive = True
         self.host = controller_host
         self.port = controller_port
-        self.passive = True
-        self.packets_expired = 0
-        self.packets_handled = 0
         self.dbg_state = "init"
-        self.listen_socket = None
-        self.switch_socket = None
-        self.switch_addr = None
-        self.debug_level = DEBUG_VERBOSE
+        # self.debug_level = DEBUG_VERBOSE
+        self.debug_level = debug_level_default
+
         # Transaction variables 
         #   xid_cv: Condition variable (semaphore) for transaction
         #   xid: Transaction ID being waited on
@@ -90,39 +105,84 @@
         self.xid = None
         self.xid_response = None
 
-        # Connection condition variable to notify anyone waiting for a cxn
-        self.connect_cv = Condition()
-
     def dbg(self, level, string):
         debug_log("CTRL", self.debug_level, level, string)
 
-    def connect(self):
+    def run(self):
         """
-        Open the socket connection
+        Activity function for class
 
-        @param host The host address to use for the socket
-        @param port The port number to use for the socket
-        @param passive If True, use passive cxn: Not yet supported.
+        Assumes connection to switch already exists.  Listens on
+        switch_socket for messages until an error (or zero len pkt)
+        occurs.
 
+        When there is a message on the socket, check for handlers; queue the
+        packet if no one handles the packet.
+
+        See note for controller describing the limitation of a single
+        connection for now.
+        """
+
+        if not self.switch_socket:
+            self.dbg(DEBUG_ERROR, 
+                     "Error in controller thread, no switch socket")
+            self.shutdown()
+            return
+
+        # Read and process packets from socket connected to switch
+        while 1:
+            try:
+                pkt = self.switch_socket.recv(self.rcv_size)
+            except socket.error:
+                self.dbg(DEBUG_ERROR, "Controller socket read error")
+                self.socket_errors += 1
+                break
+
+            if len(pkt) == 0:
+                # Considered an error; usually means switch has disconnected
+                self.dbg(DEBUG_INFO, "length 0 pkt in")
+                self.socket_errors += 1
+                break
+
+            if not self.connected:
+                break
+                
+            (handled, msg) = self._pkt_handler_check(pkt)
+            if handled:
+                self.packets_handled += 1
+                continue
+
+            # Not handled, enqueue
+            self.sync.acquire()
+            if len(self.packets) >= self.max_pkts:
+                self.packets.pop(0)
+                self.packets_expired += 1
+            self.packets.append((msg, pkt))
+            self.packets_total += 1
+            self.sync.release()
+
+        self.shutdown()
+
+    def connect(self, send_hello=True):
+        """
+        Connect to a switch and start this thread
+
+        Create the listening socket, accept and block until a
+        connection is made to the switch.  Then start the local
+        thread and return.  Parameters to the call are take from
+        member variables.
+
+        @param send_hello If True, send the initial hello packet on connection
         @return Boolean where True indicates success 
 
-        If already connected, will close the current connection
+        If already connected, returns True
+        If the listen socket does not exist, will create it
         """
-        oldstate = self.dbg_state
+
         self.dbg_state = "connecting"
-        if self.connected:
-            self.dbg(DEBUG_WARN, "Disconnect when already connected")
-            self.disconnect()
-
-        if not self.passive:
-            print "Error in controller init: Active cxn not supported"
-            # raise unsupported
-            self.dbg_state = oldstate
-            return False
-
-        # FIXME: add error handling; try SocketServer?
         self.dbg(DEBUG_INFO, "open ctl host: >" + str(self.host) + "< port " +
             str(self.port))
+        # Create the listening socket
         if not self.listen_socket:
             self.listen_socket = socket.socket(socket.AF_INET, 
                                                socket.SOCK_STREAM)
@@ -131,31 +191,50 @@
             self.listen_socket.bind((self.host, self.port))
         self.dbg_state = "listening"
         self.listen_socket.listen(LISTEN_QUEUE_SIZE)
+        self.dbg_state = "accepting"
         (self.switch_socket, self.switch_addr) = self.listen_socket.accept()
         if not self.switch_socket:
             self.socket_errors += 1
             self.dbg(DEBUG_WARN, "Failed on accept")
             self.dbg_state = "error"
+            self.listen_socket.close()
             return False
 
         self.connected = True
         self.dbg_state = "connected"
         self.dbg(DEBUG_INFO, "Got connection to " + str(self.switch_addr))
+
+        if send_hello:
+            h = hello()
+            self.message_send(h.pack())
+
+        # Start the background thread
+        self.start()
+
         return True
 
-    def disconnect(self):
+    def shutdown(self):
         """
-        Disconnect the switch socket
-        """
-        self.dbg_state = "disconnecting"
-        if not self.connected:
-            self.dbg(DEBUG_INFO, "disconnect when not connected")
-            self.dbg_state = "disconnected"
-            return
-        self.switch_socket.close()
-        self.connected = False
-        self.dbg_state = "disconnected"
+        Shutdown the controller closing all sockets
 
+        @todo Might want to synchronize shutdown with self.sync...
+        """
+        self.connected = False
+        self.dbg_state = "closed"
+        if self.switch_socket:
+            try:
+                self.switch_socket.shutdown(socket.SHUT_RDWR)
+            except:
+                self.dbg(DEBUG_INFO, "Ignoring switch soc shutdown error")
+            self.switch_socket = None
+
+        if self.listen_socket:
+            try:
+                self.listen_socket.shutdown(socket.SHUT_RDWR)
+            except:
+                self.dbg(DEBUG_INFO, "Ignoring listen soc shutdown error")
+            self.listen_socket = None
+        
     def _pkt_handler_check(self, pkt):
         """
         Check for packet handling before being enqueued
@@ -173,8 +252,8 @@
             self.dbg(DEBUG_INFO, "Could not parse header, pkt len", len(pkt))
             self.parse_errors += 1
             return (True, None)
-        if self.debug_level <= DEBUG_VERBOSE:
-            hdr.show()
+        # if self.debug_level <= DEBUG_VERBOSE:
+        #     hdr.show()
 
         self.dbg(DEBUG_VERBOSE, "message: len %d. type %s. hdr.len %d" %
             (len(pkt), ofp_type_map[hdr.type], hdr.length))
@@ -213,61 +292,6 @@
 
         return (handled, msg)
 
-    def run(self):
-        """
-        Activity function for class
-
-        Loops until stop is called (or self.running is set to False).
-        If the connection drops, it connects again.  It then receives
-        a message on the socket and checks for handlers, queuing the
-        packet if no one handles the packet.
-        """
-        self.running = True
-        while self.running:
-            if not self.connected:
-                if not self.connect():
-                    self.dbg(DEBUG_ERROR, 
-                             "Controller thread error connecting; exit")
-                    break
-            # Notify anyone waiting that a connection is made
-            self.connect_cv.acquire()
-            self.connect_cv.notify()
-            self.connect_cv.release()
-            try:
-                pkt = self.switch_socket.recv(self.rcv_size)
-                if len(pkt) == 0:
-                    self.dbg(DEBUG_WARN, "length 0 pkt in")
-                    self.disconnect()
-                    continue
-
-                (handled, msg) = self._pkt_handler_check(pkt)
-                if handled:
-                    self.packets_handled += 1
-                    continue
-
-                # Not handled, enqueue
-                self.sync.acquire()
-                if len(self.packets) >= self.max_pkts:
-                    self.packets.pop(0)
-                    self.packets_expired += 1
-                self.packets.append((msg, pkt))
-                self.packets_total += 1
-                self.sync.release()
-
-            except socket.error:
-                print "Controller socket read error"
-                self.socket_errors += 1
-                self.disconnect()
-
-    def stop(self):
-        """
-        Stop the running loop and disconnect the socket
-        """
-        self.running = False
-        # Is there something to do to switch_socket to stop an inprogress
-        # connect?
-        self.disconnect()
-
     def register(self, msg_type, handler):
         """
         Register a callback to receive a specific message type.
@@ -291,6 +315,7 @@
 
         @param exp_msg If set, return only when this type of message 
         is received.
+        @param timeout Not yet supported
 
         @retval A pair (msg, pkt) where msg is a message object and pkt
         the string representing the packet as received from the socket.
@@ -302,7 +327,7 @@
 
         # For now do not support time out;
         if timeout:
-            print "DEBUG WARNING:  poll time out not supported"
+            self.dbg(DEBUG_WARN, "Poll time out not supported")
 
         while len(self.packets) > 0:
             self.sync.acquire()
@@ -311,6 +336,8 @@
             if not exp_msg or (exp_msg and (msg.header.type == exp_msg)):
                 return msg, pkt
 
+        return None, None
+
     def transact(self, msg, timeout=None, zero_xid=False):
         """
         Run a message transaction with the switch
@@ -356,10 +383,15 @@
 
         """
 
+        if not self.switch_socket:
+            # Sending a string indicates the message is ready to go
+            self.dbg(DEBUG_INFO, "message_send: no socket")
+            return -1
+
         if type(msg) != type(""):
             # Sending a string indicates the message is ready to go
             self.dbg(DEBUG_INFO, "message_send requires packet as string")
-            return 0
+            return -1
 
         self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(msg)))
         return self.switch_socket.sendall(msg)
@@ -368,7 +400,6 @@
         string = "Controller:\n"
         string += "  connected       " + str(self.connected) + "\n"
         string += "  state           " + self.dbg_state + "\n"
-        string += "  running         " + str(self.running) + "\n"
         string += "  switch_addr     " + str(self.switch_addr) + "\n"
         string += "  pending pkts    " + str(len(self.packets)) + "\n"
         string += "  total pkts      " + str(self.packets_total) + "\n"
@@ -385,34 +416,6 @@
     def show(self):
         print str(self)
 
-    def connect_to_switch(self, timeout=None, send_hello=True):
-        """
-        Connect to switch
-
-        Block until a connection to a switch
-        is established or a timeout occurs.  Sends a hello message
-        once established.
-        @param timeout Timeout in seconds
-        @param send_hello If True, will send hello when connection established
-        @return True if successful, False otherwise
-        """
-        h = hello()
-        print "x1"
-        self.connect_cv.acquire()
-        if self.connected:
-            print "x2"
-            self.connect_cv.release()
-            return True
-
-        print "x3"
-        self.connect_cv.wait(timeout)
-        self.connect_cv.release()
-        print "x4"
-        if send_hello:
-            self.message_send(h.pack())
-            print "x5"
-        return self.connected
-
 def sample_handler(controller, msg, pkt):
     """
     Sample message handler
diff --git a/src/python/oftest/dataplane.py b/src/python/oftest/dataplane.py
index 0462f5d..89cf073 100644
--- a/src/python/oftest/dataplane.py
+++ b/src/python/oftest/dataplane.py
@@ -21,6 +21,7 @@
 import netutils
 from threading import Thread
 from threading import Lock
+import oft_config
 
 #@todo Move these identifiers into config
 ETH_P_ALL = 0x03
@@ -47,14 +48,20 @@
         """
         Thread.__init__(self)
         self.interface_name = interface_name
+        self.debug_level = oft_config.debug_level_default
         self.max_pkts = max_pkts
         self.packets_pending = 0
         self.packets_total = 0
         self.packets = []
         self.packet_times = []
+        self.packets_discarded = 0
         self.sync = Lock()
         self.socket = self.interface_open(interface_name)
-        print "Openned port monitor socket " + interface_name
+        self.dbg(oft_config.DEBUG_INFO, 
+                 "Openned port monitor socket " + interface_name)
+
+    def dbg(self, level, string):
+        oft_config.debug_log("DPLANE", self.debug_level, level, string)
 
     def interface_open(self, interface_name):
         """
@@ -65,18 +72,10 @@
         s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 
                           socket.htons(ETH_P_ALL))
         s.bind((interface_name, 0))
-        promisc.set_promisc(s, interface_name)
+        netutils.set_promisc(s, interface_name)
         s.settimeout(RCV_TIMEOUT)
         return s
 
-    def kill(self):
-        """
-        Terminate the running thread
-        """
-        self.running = False
-        self.socket.close()
-        print "Port monitor for " + self.interface_name + " exiting"
-
     def run(self):
         """
         Activity function for class
@@ -85,23 +84,41 @@
         while self.running:
             try:
                 rcvmsg = self.socket.recv(RCV_SIZE)
-                rcvtime = time.clock()
-
-                self.sync.acquire()
-                self.packets.append(rcvmsg)
-                self.packet_times.append(rcvtime)
-                self.packets_pending += 1
-                self.packets_total += 1
-                self.sync.release()
-
-            except socket.timeout:
-                print "Socket timeout for " + self.interface_name
             except socket.error:
-                print "Socket closed for " + self.interface_name
-                if self.running:
-                    self.kill()
+                self.dbg(DEBUG_INFO, "Socket error for " + 
+                         self.interface_name)
+                continue
+            if len(rcvmsg) == 0:
+                self.dbg(DEBUG_INFO, "Zero len pkt on " + self.interface_name)
+                self.kill()
                 break
 
+            rcvtime = time.clock()
+
+            self.sync.acquire()
+            if len(self.packets) >= self.max_pkts:
+                self.packets.pop(0)
+                self.packets_discarded += 1
+            self.packets.append(rcvmsg)
+            self.packet_times.append(rcvtime)
+            self.packets_pending += 1
+            self.packets_total += 1
+            self.sync.release()
+
+        self.dbg(DEBUG_INFO, "Thread exit for " + self.interface_name)
+
+    def kill(self):
+        """
+        Terminate the running thread
+        """
+        self.running = False
+        try:
+            self.socket.close()
+        except:
+            self.dbg(DEBUG_INFO, "Ignoring dataplane soc shutdown error")
+        self.dbg(oft_config.DEBUG_INFO, 
+                 "Port monitor for " + self.interface_name + " exiting")
+
     def dequeue(self):
         """
         Get the oldest packet in the queue
@@ -126,6 +143,7 @@
         Clear the packet queue
         """
         self.sync.acquire()
+        self.packets_discarded += len(self.packets)
         self.packets = []
         self.packet_times = []
         self.packets_pending = 0
@@ -138,6 +156,8 @@
         @param packet The packet data to send to the port
         @retval The number of bytes sent
         """
+        self.dbg(oft_config.DEBUG_VERBOSE,
+                 "port sending " + str(len(packet)) + " bytes")
         return self.socket.send(packet)
 
 
@@ -152,6 +172,12 @@
         """
         pass
 
+    def show(self, prefix=''):
+        print prefix + "Name:          " + self.interface_name
+        print prefix + "Pkts pending:  " + str(self.packets_pending)
+        print prefix + "Pkts total:    " + str(self.packets_total)
+        print prefix + "socket:        " + str(self.socket)
+        
 
 class DataPlane:
     """
@@ -160,6 +186,10 @@
     """
     def __init__(self):
         self.port_list = {}
+        self.debug_level = oft_config.debug_level_default
+
+    def dbg(self, level, string):
+        oft_config.debug_log("DPORT", self.debug_level, level, string)
 
     def port_add(self, interface_name, port_number):
         """
@@ -178,10 +208,13 @@
         @param port_number The port to send the data to
         @param packet Raw packet data to send to port
         """
+        self.dbg(oft_config.DEBUG_VERBOSE,
+                 "Sending %d bytes to port %d" % (len(packet), port_number))
         bytes = self.port_list[port_number].send(packet)
         if bytes != len(packet):
-            print "Unhandled send error, length mismatch %d != %d" % \
-                (bytes, len(packet))
+            self.dbg(DEBUG_ERROR,"Unhandled send error, " + 
+                     "length mismatch %d != %d" %
+                     (bytes, len(packet)))
         return bytes
 
     def flood(self, packet):
@@ -192,9 +225,9 @@
         for port_number in self.port_list.keys():
             bytes = self.port_list[port_number].send(packet)
             if bytes != len(packet):
-                print "Unhandled send error" + \
-                    ", port %d, length mismatch %d != %d" % \
-                    (port_number, bytes, len(packet))
+                self.dbg(DEBUG_ERROR, "Unhandled send error" +
+                         ", port %d, length mismatch %d != %d" %
+                         (port_number, bytes, len(packet)))
 
     def packet_get(self, port_number=None):
         """
@@ -230,8 +263,22 @@
         pkt, time = self.port_list[min_port].dequeue()
         return min_port, pkt, time
 
-    def kill(self):
+    def kill(self, join_threads=True):
+        """
+        Close all sockets for dataplane
+        @param join_threads If True (default) call join on each thread
+        """
         for port_number in self.port_list.keys():
             self.port_list[port_number].kill()
+            if join_threads:
+                self.dbg(oft_config.DEBUG_INFO, "Joining ", port_number)
+                self.port_list[port_number].join()
 
-        print "DataPlane shutdown"
+        self.dbg(oft_config.DEBUG_INFO, "DataPlane shutdown")
+
+    def show(self, prefix=''):
+        print prefix + "Dataplane Controller"
+        for pnum, port in self.port_list.items():
+            print prefix + "OpenFlow Port Number " + str(pnum)
+            port.show(prefix + '  ')
+