Mostly changes to socket deployment

Use select for handling sockets; hopefully better cleanup approach

Added connection semaphore for controller
Support message objects as arguments to controller.message_send
Support initial hello from controller when connected to switch
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 779e42c..0b89ee6 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -37,6 +37,9 @@
 from message import *
 from parse import *
 from ofutils import *
+# For some reason, it seems select to be last (or later).
+# Otherwise get an attribute error when calling select.select
+import select
 
 class Controller(Thread):
     """
@@ -58,6 +61,8 @@
     @var max_pkts The max size of the receive queue
     @var keep_alive If true, listen for echo requests and respond w/
     echo replies
+    @var initial_hello If true, will send a hello message immediately
+    upon connecting to the switch
     @var host The host to use for connect
     @var port The port to connect on 
     @var packets_total Total number of packets received
@@ -73,6 +78,8 @@
         self.listen_socket = None
         self.switch_socket = None
         self.switch_addr = None
+        self.socs = []
+        self.connect_cv = Condition()
 
         # Counters
         self.socket_errors = 0
@@ -82,11 +89,12 @@
         self.packets_handled = 0
 
         # State
-        self.connected = False
         self.packets = []
         self.sync = Lock()
         self.handlers = {}
         self.keep_alive = False
+        self.active = True
+        self.initial_hello = True
 
         # Settings
         self.max_pkts = max_pkts
@@ -94,8 +102,8 @@
         self.host = controller_host
         self.port = controller_port
         self.dbg_state = "init"
-        # self.debug_level = DEBUG_VERBOSE
         self.debug_level = debug_level_default
+        # self.debug_level = DEBUG_VERBOSE
 
         # Transaction variables 
         #   xid_cv: Condition variable (semaphore) for transaction
@@ -108,6 +116,61 @@
     def dbg(self, level, string):
         debug_log("CTRL", self.debug_level, level, string)
 
+    def _socket_ready_handle(self, s):
+        """
+        Handle an input-ready socket
+        @param s The socket object that is ready
+        @retval True, reset the switch connection
+        """
+
+        if s == self.listen_socket:
+            if self.switch_socket:
+                self.dbg(DEBUG_ERROR, "Multiple switch cxns not supported")
+                sys.exit(1)
+
+            (self.switch_socket, self.switch_addr) = \
+                self.listen_socket.accept()
+            self.dbg(DEBUG_INFO, "Got cxn to " + str(self.switch_addr))
+            # Notify anyone waiting
+            self.connect_cv.acquire()
+            self.connect_cv.notify()
+            self.connect_cv.release()
+            self.socs.append(self.switch_socket)
+            if self.initial_hello:
+                self.message_send(hello())
+        elif s == self.switch_socket:
+            try:
+                pkt = self.switch_socket.recv(self.rcv_size)
+            except:
+                self.dbg(DEBUG_INFO, "error on switch read")
+                return True
+
+            if not self.active:
+                return False
+
+            if len(pkt) == 0:
+                self.dbg(DEBUG_INFO, "zero-len pkt in")
+                return True
+
+            (handled, msg) = self._pkt_handler_check(pkt)
+            if handled:
+                self.packets_handled += 1
+                return False
+
+            # Not handled, enqueue
+            self.sync.acquire()
+            if len(self.packets) >= self.max_pkts:
+                self.packets.pop(0)
+                self.packets_expired += 1
+            self.packets.append((msg, pkt))
+            self.packets_total += 1
+            self.sync.release()
+        else:
+            self.dbg(DEBUG_ERROR, "Unknown socket ready: " + str(s))
+            return True
+
+        return False
+
     def run(self):
         """
         Activity function for class
@@ -123,95 +186,87 @@
         connection for now.
         """
 
-        if not self.switch_socket:
-            self.dbg(DEBUG_ERROR, 
-                     "Error in controller thread, no switch socket")
-            self.shutdown()
-            return
+        self.dbg_state = "starting"
 
-        # Read and process packets from socket connected to switch
-        while 1:
-            try:
-                pkt = self.switch_socket.recv(self.rcv_size)
-            except socket.error:
-                self.dbg(DEBUG_ERROR, "Controller socket read error")
-                self.socket_errors += 1
-                break
-
-            if len(pkt) == 0:
-                # Considered an error; usually means switch has disconnected
-                self.dbg(DEBUG_INFO, "length 0 pkt in")
-                self.socket_errors += 1
-                break
-
-            if not self.connected:
-                break
-                
-            (handled, msg) = self._pkt_handler_check(pkt)
-            if handled:
-                self.packets_handled += 1
-                continue
-
-            # Not handled, enqueue
-            self.sync.acquire()
-            if len(self.packets) >= self.max_pkts:
-                self.packets.pop(0)
-                self.packets_expired += 1
-            self.packets.append((msg, pkt))
-            self.packets_total += 1
-            self.sync.release()
-
-        self.shutdown()
-
-    def connect(self, send_hello=True):
-        """
-        Connect to a switch and start this thread
-
-        Create the listening socket, accept and block until a
-        connection is made to the switch.  Then start the local
-        thread and return.  Parameters to the call are take from
-        member variables.
-
-        @param send_hello If True, send the initial hello packet on connection
-        @return Boolean where True indicates success 
-
-        If already connected, returns True
-        If the listen socket does not exist, will create it
-        """
-
-        self.dbg_state = "connecting"
-        self.dbg(DEBUG_INFO, "open ctl host: >" + str(self.host) + "< port " +
-            str(self.port))
-        # Create the listening socket
-        if not self.listen_socket:
-            self.listen_socket = socket.socket(socket.AF_INET, 
-                                               socket.SOCK_STREAM)
-            self.listen_socket.setsockopt(socket.SOL_SOCKET, 
-                                          socket.SO_REUSEADDR, 1)
-            self.listen_socket.bind((self.host, self.port))
+        # Create listen socket
+        self.dbg(DEBUG_INFO, "Create/listen at " + self.host + ":" + 
+                 str(self.port))
+        self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.listen_socket.setsockopt(socket.SOL_SOCKET, 
+                                      socket.SO_REUSEADDR, 1)
+        self.listen_socket.bind((self.host, self.port))
         self.dbg_state = "listening"
         self.listen_socket.listen(LISTEN_QUEUE_SIZE)
-        self.dbg_state = "accepting"
-        (self.switch_socket, self.switch_addr) = self.listen_socket.accept()
-        if not self.switch_socket:
-            self.socket_errors += 1
-            self.dbg(DEBUG_WARN, "Failed on accept")
-            self.dbg_state = "error"
-            self.listen_socket.close()
-            return False
 
-        self.connected = True
-        self.dbg_state = "connected"
-        self.dbg(DEBUG_INFO, "Got connection to " + str(self.switch_addr))
+        self.dbg(DEBUG_INFO, "Waiting for switch connection")
+        self.socs = [self.listen_socket]
+        self.dbg_state = "running"
+        while self.active:
+            reset_switch_cxn = False
+            try:
+                sel_in, sel_out, sel_err = \
+                    select.select(self.socs, [], self.socs, 1)
+            except:
+                print sys.exc_info()
+                self.dbg(DEBUG_ERROR, "Select error, exiting")
+                sys.exit(1)
 
-        if send_hello:
-            h = hello()
-            self.message_send(h.pack())
+            if not self.active:
+                break
 
-        # Start the background thread
-        self.start()
+            for s in sel_in:
+                reset_switch_cxn = self._socket_ready_handle(s)
 
-        return True
+            for s in sel_err:
+                self.dbg(DEBUG_ERROR, "Got socket error on: " + str(s))
+                if s == self.switch_socket:
+                    reset_switch_cxn = True
+                else:
+                    self.dbg(DEBUG_ERROR, "Socket error; exiting")
+                    self.active = False
+                    break
+
+            if self.active and reset_switch_cxn:
+                self.dbg(DEBUG_INFO, "Closing switch cxn")
+                try:
+                    self.switch_socket.close()
+                except:
+                    pass
+                self.switch_socket = None
+                self.socs = self.socs[0:1]
+
+        # End of main loop
+        self.dbg_state = "closing"
+        self.dbg(DEBUG_INFO, "Exiting controller thread")
+        self.shutdown()
+
+    def connect(self, timeout=None):
+        """
+        Connect to the switch
+
+        @param timeout If None, block until connected.  If 0, return 
+        immedidately.  Otherwise, block for up to timeout seconds
+        @return Boolean, True if connected
+        """
+
+        if timeout == 0:
+            return self.switch_socket is not None
+        if self.switch_socket is not None:
+            return True
+        self.connect_cv.acquire()
+        self.connect_cv.wait(timeout)
+        self.connect_cv.release()
+
+        return self.switch_socket is not None
+        
+    def kill(self):
+        """
+        Force the controller thread to quit
+
+        Just sets the active state variable to false and expects
+        the select timeout to kick in
+        """
+        self.active = False
 
     def shutdown(self):
         """
@@ -219,22 +274,20 @@
 
         @todo Might want to synchronize shutdown with self.sync...
         """
-        self.connected = False
-        self.dbg_state = "closed"
-        if self.switch_socket:
-            try:
-                self.switch_socket.shutdown(socket.SHUT_RDWR)
-            except:
-                self.dbg(DEBUG_INFO, "Ignoring switch soc shutdown error")
-            self.switch_socket = None
+        self.active = False
+        try:
+            self.switch_socket.shutdown(socket.SHUT_RDWR)
+        except:
+            self.dbg(DEBUG_INFO, "Ignoring switch soc shutdown error")
+        self.switch_socket = None
 
-        if self.listen_socket:
-            try:
-                self.listen_socket.shutdown(socket.SHUT_RDWR)
-            except:
-                self.dbg(DEBUG_INFO, "Ignoring listen soc shutdown error")
-            self.listen_socket = None
-        
+        try:
+            self.listen_socket.shutdown(socket.SHUT_RDWR)
+        except:
+            self.dbg(DEBUG_INFO, "Ignoring listen soc shutdown error")
+        self.listen_socket = None
+        self.dbg_state = "down"
+
     def _pkt_handler_check(self, pkt):
         """
         Check for packet handling before being enqueued
@@ -279,7 +332,7 @@
                 rep = echo_reply()
                 rep.header.xid = hdr.xid
                 # Ignoring additional data
-                self.message_send(rep.pack())
+                self.message_send(rep.pack(), zero_xid=True)
                 return (True, None)
 
         # Now check for message handlers; preference is given to
@@ -370,16 +423,22 @@
         self.xid_cv.wait(timeout)
         msg = self.xid_response
         self.xid_response = None
+        self.xid = None
         self.xid_cv.release()
         return msg
 
-    def message_send(self, msg):
+    def message_send(self, msg, zero_xid=False):
         """
         Send the message to the switch
 
-        @param msg An OpenFlow message object to be forwarded to the switch.  
+        @param msg A string or OpenFlow message object to be forwarded to 
+        the switch.  
+        @param zero_xid If msg is an OpenFlow object (not a string) and if 
+        the XID in the header is 0, then an XID will be generated
+        for the message.  Set xero_xid to override this behavior (and keep an
+        existing 0 xid)
 
-        @return None on success
+        @return -1 if error, 0 on success
 
         """
 
@@ -387,18 +446,28 @@
             # Sending a string indicates the message is ready to go
             self.dbg(DEBUG_INFO, "message_send: no socket")
             return -1
-
+        #@todo If not string, try to pack
         if type(msg) != type(""):
-            # Sending a string indicates the message is ready to go
-            self.dbg(DEBUG_INFO, "message_send requires packet as string")
-            return -1
+            try:
+                if msg.header.xid == 0 and not zero_xid:
+                    msg.header.xid = gen_xid()
+                outpkt = msg.pack()
+            except:
+                self.dbg(DEBUG_INFO, 
+                         "message_send: not an OF message or string?")
+                return -1
+        else:
+            outpkt = msg
 
-        self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(msg)))
-        return self.switch_socket.sendall(msg)
+        self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(outpkt)))
+        if self.switch_socket.sendall(outpkt) is None:
+            return 0
+
+        self.dbg(DEBUG_ERROR, "Unknown error on sendall")
+        return -1
 
     def __str__(self):
         string = "Controller:\n"
-        string += "  connected       " + str(self.connected) + "\n"
         string += "  state           " + self.dbg_state + "\n"
         string += "  switch_addr     " + str(self.switch_addr) + "\n"
         string += "  pending pkts    " + str(len(self.packets)) + "\n"