Mostly changes to socket deployment

Use select for handling sockets; hopefully better cleanup approach

Added connection semaphore for controller
Support message objects as arguments to controller.message_send
Support initial hello from controller when connected to switch
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 779e42c..0b89ee6 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -37,6 +37,9 @@
 from message import *
 from parse import *
 from ofutils import *
+# For some reason, it seems select to be last (or later).
+# Otherwise get an attribute error when calling select.select
+import select
 
 class Controller(Thread):
     """
@@ -58,6 +61,8 @@
     @var max_pkts The max size of the receive queue
     @var keep_alive If true, listen for echo requests and respond w/
     echo replies
+    @var initial_hello If true, will send a hello message immediately
+    upon connecting to the switch
     @var host The host to use for connect
     @var port The port to connect on 
     @var packets_total Total number of packets received
@@ -73,6 +78,8 @@
         self.listen_socket = None
         self.switch_socket = None
         self.switch_addr = None
+        self.socs = []
+        self.connect_cv = Condition()
 
         # Counters
         self.socket_errors = 0
@@ -82,11 +89,12 @@
         self.packets_handled = 0
 
         # State
-        self.connected = False
         self.packets = []
         self.sync = Lock()
         self.handlers = {}
         self.keep_alive = False
+        self.active = True
+        self.initial_hello = True
 
         # Settings
         self.max_pkts = max_pkts
@@ -94,8 +102,8 @@
         self.host = controller_host
         self.port = controller_port
         self.dbg_state = "init"
-        # self.debug_level = DEBUG_VERBOSE
         self.debug_level = debug_level_default
+        # self.debug_level = DEBUG_VERBOSE
 
         # Transaction variables 
         #   xid_cv: Condition variable (semaphore) for transaction
@@ -108,6 +116,61 @@
     def dbg(self, level, string):
         debug_log("CTRL", self.debug_level, level, string)
 
+    def _socket_ready_handle(self, s):
+        """
+        Handle an input-ready socket
+        @param s The socket object that is ready
+        @retval True, reset the switch connection
+        """
+
+        if s == self.listen_socket:
+            if self.switch_socket:
+                self.dbg(DEBUG_ERROR, "Multiple switch cxns not supported")
+                sys.exit(1)
+
+            (self.switch_socket, self.switch_addr) = \
+                self.listen_socket.accept()
+            self.dbg(DEBUG_INFO, "Got cxn to " + str(self.switch_addr))
+            # Notify anyone waiting
+            self.connect_cv.acquire()
+            self.connect_cv.notify()
+            self.connect_cv.release()
+            self.socs.append(self.switch_socket)
+            if self.initial_hello:
+                self.message_send(hello())
+        elif s == self.switch_socket:
+            try:
+                pkt = self.switch_socket.recv(self.rcv_size)
+            except:
+                self.dbg(DEBUG_INFO, "error on switch read")
+                return True
+
+            if not self.active:
+                return False
+
+            if len(pkt) == 0:
+                self.dbg(DEBUG_INFO, "zero-len pkt in")
+                return True
+
+            (handled, msg) = self._pkt_handler_check(pkt)
+            if handled:
+                self.packets_handled += 1
+                return False
+
+            # Not handled, enqueue
+            self.sync.acquire()
+            if len(self.packets) >= self.max_pkts:
+                self.packets.pop(0)
+                self.packets_expired += 1
+            self.packets.append((msg, pkt))
+            self.packets_total += 1
+            self.sync.release()
+        else:
+            self.dbg(DEBUG_ERROR, "Unknown socket ready: " + str(s))
+            return True
+
+        return False
+
     def run(self):
         """
         Activity function for class
@@ -123,95 +186,87 @@
         connection for now.
         """
 
-        if not self.switch_socket:
-            self.dbg(DEBUG_ERROR, 
-                     "Error in controller thread, no switch socket")
-            self.shutdown()
-            return
+        self.dbg_state = "starting"
 
-        # Read and process packets from socket connected to switch
-        while 1:
-            try:
-                pkt = self.switch_socket.recv(self.rcv_size)
-            except socket.error:
-                self.dbg(DEBUG_ERROR, "Controller socket read error")
-                self.socket_errors += 1
-                break
-
-            if len(pkt) == 0:
-                # Considered an error; usually means switch has disconnected
-                self.dbg(DEBUG_INFO, "length 0 pkt in")
-                self.socket_errors += 1
-                break
-
-            if not self.connected:
-                break
-                
-            (handled, msg) = self._pkt_handler_check(pkt)
-            if handled:
-                self.packets_handled += 1
-                continue
-
-            # Not handled, enqueue
-            self.sync.acquire()
-            if len(self.packets) >= self.max_pkts:
-                self.packets.pop(0)
-                self.packets_expired += 1
-            self.packets.append((msg, pkt))
-            self.packets_total += 1
-            self.sync.release()
-
-        self.shutdown()
-
-    def connect(self, send_hello=True):
-        """
-        Connect to a switch and start this thread
-
-        Create the listening socket, accept and block until a
-        connection is made to the switch.  Then start the local
-        thread and return.  Parameters to the call are take from
-        member variables.
-
-        @param send_hello If True, send the initial hello packet on connection
-        @return Boolean where True indicates success 
-
-        If already connected, returns True
-        If the listen socket does not exist, will create it
-        """
-
-        self.dbg_state = "connecting"
-        self.dbg(DEBUG_INFO, "open ctl host: >" + str(self.host) + "< port " +
-            str(self.port))
-        # Create the listening socket
-        if not self.listen_socket:
-            self.listen_socket = socket.socket(socket.AF_INET, 
-                                               socket.SOCK_STREAM)
-            self.listen_socket.setsockopt(socket.SOL_SOCKET, 
-                                          socket.SO_REUSEADDR, 1)
-            self.listen_socket.bind((self.host, self.port))
+        # Create listen socket
+        self.dbg(DEBUG_INFO, "Create/listen at " + self.host + ":" + 
+                 str(self.port))
+        self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.listen_socket.setsockopt(socket.SOL_SOCKET, 
+                                      socket.SO_REUSEADDR, 1)
+        self.listen_socket.bind((self.host, self.port))
         self.dbg_state = "listening"
         self.listen_socket.listen(LISTEN_QUEUE_SIZE)
-        self.dbg_state = "accepting"
-        (self.switch_socket, self.switch_addr) = self.listen_socket.accept()
-        if not self.switch_socket:
-            self.socket_errors += 1
-            self.dbg(DEBUG_WARN, "Failed on accept")
-            self.dbg_state = "error"
-            self.listen_socket.close()
-            return False
 
-        self.connected = True
-        self.dbg_state = "connected"
-        self.dbg(DEBUG_INFO, "Got connection to " + str(self.switch_addr))
+        self.dbg(DEBUG_INFO, "Waiting for switch connection")
+        self.socs = [self.listen_socket]
+        self.dbg_state = "running"
+        while self.active:
+            reset_switch_cxn = False
+            try:
+                sel_in, sel_out, sel_err = \
+                    select.select(self.socs, [], self.socs, 1)
+            except:
+                print sys.exc_info()
+                self.dbg(DEBUG_ERROR, "Select error, exiting")
+                sys.exit(1)
 
-        if send_hello:
-            h = hello()
-            self.message_send(h.pack())
+            if not self.active:
+                break
 
-        # Start the background thread
-        self.start()
+            for s in sel_in:
+                reset_switch_cxn = self._socket_ready_handle(s)
 
-        return True
+            for s in sel_err:
+                self.dbg(DEBUG_ERROR, "Got socket error on: " + str(s))
+                if s == self.switch_socket:
+                    reset_switch_cxn = True
+                else:
+                    self.dbg(DEBUG_ERROR, "Socket error; exiting")
+                    self.active = False
+                    break
+
+            if self.active and reset_switch_cxn:
+                self.dbg(DEBUG_INFO, "Closing switch cxn")
+                try:
+                    self.switch_socket.close()
+                except:
+                    pass
+                self.switch_socket = None
+                self.socs = self.socs[0:1]
+
+        # End of main loop
+        self.dbg_state = "closing"
+        self.dbg(DEBUG_INFO, "Exiting controller thread")
+        self.shutdown()
+
+    def connect(self, timeout=None):
+        """
+        Connect to the switch
+
+        @param timeout If None, block until connected.  If 0, return 
+        immedidately.  Otherwise, block for up to timeout seconds
+        @return Boolean, True if connected
+        """
+
+        if timeout == 0:
+            return self.switch_socket is not None
+        if self.switch_socket is not None:
+            return True
+        self.connect_cv.acquire()
+        self.connect_cv.wait(timeout)
+        self.connect_cv.release()
+
+        return self.switch_socket is not None
+        
+    def kill(self):
+        """
+        Force the controller thread to quit
+
+        Just sets the active state variable to false and expects
+        the select timeout to kick in
+        """
+        self.active = False
 
     def shutdown(self):
         """
@@ -219,22 +274,20 @@
 
         @todo Might want to synchronize shutdown with self.sync...
         """
-        self.connected = False
-        self.dbg_state = "closed"
-        if self.switch_socket:
-            try:
-                self.switch_socket.shutdown(socket.SHUT_RDWR)
-            except:
-                self.dbg(DEBUG_INFO, "Ignoring switch soc shutdown error")
-            self.switch_socket = None
+        self.active = False
+        try:
+            self.switch_socket.shutdown(socket.SHUT_RDWR)
+        except:
+            self.dbg(DEBUG_INFO, "Ignoring switch soc shutdown error")
+        self.switch_socket = None
 
-        if self.listen_socket:
-            try:
-                self.listen_socket.shutdown(socket.SHUT_RDWR)
-            except:
-                self.dbg(DEBUG_INFO, "Ignoring listen soc shutdown error")
-            self.listen_socket = None
-        
+        try:
+            self.listen_socket.shutdown(socket.SHUT_RDWR)
+        except:
+            self.dbg(DEBUG_INFO, "Ignoring listen soc shutdown error")
+        self.listen_socket = None
+        self.dbg_state = "down"
+
     def _pkt_handler_check(self, pkt):
         """
         Check for packet handling before being enqueued
@@ -279,7 +332,7 @@
                 rep = echo_reply()
                 rep.header.xid = hdr.xid
                 # Ignoring additional data
-                self.message_send(rep.pack())
+                self.message_send(rep.pack(), zero_xid=True)
                 return (True, None)
 
         # Now check for message handlers; preference is given to
@@ -370,16 +423,22 @@
         self.xid_cv.wait(timeout)
         msg = self.xid_response
         self.xid_response = None
+        self.xid = None
         self.xid_cv.release()
         return msg
 
-    def message_send(self, msg):
+    def message_send(self, msg, zero_xid=False):
         """
         Send the message to the switch
 
-        @param msg An OpenFlow message object to be forwarded to the switch.  
+        @param msg A string or OpenFlow message object to be forwarded to 
+        the switch.  
+        @param zero_xid If msg is an OpenFlow object (not a string) and if 
+        the XID in the header is 0, then an XID will be generated
+        for the message.  Set xero_xid to override this behavior (and keep an
+        existing 0 xid)
 
-        @return None on success
+        @return -1 if error, 0 on success
 
         """
 
@@ -387,18 +446,28 @@
             # Sending a string indicates the message is ready to go
             self.dbg(DEBUG_INFO, "message_send: no socket")
             return -1
-
+        #@todo If not string, try to pack
         if type(msg) != type(""):
-            # Sending a string indicates the message is ready to go
-            self.dbg(DEBUG_INFO, "message_send requires packet as string")
-            return -1
+            try:
+                if msg.header.xid == 0 and not zero_xid:
+                    msg.header.xid = gen_xid()
+                outpkt = msg.pack()
+            except:
+                self.dbg(DEBUG_INFO, 
+                         "message_send: not an OF message or string?")
+                return -1
+        else:
+            outpkt = msg
 
-        self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(msg)))
-        return self.switch_socket.sendall(msg)
+        self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(outpkt)))
+        if self.switch_socket.sendall(outpkt) is None:
+            return 0
+
+        self.dbg(DEBUG_ERROR, "Unknown error on sendall")
+        return -1
 
     def __str__(self):
         string = "Controller:\n"
-        string += "  connected       " + str(self.connected) + "\n"
         string += "  state           " + self.dbg_state + "\n"
         string += "  switch_addr     " + str(self.switch_addr) + "\n"
         string += "  pending pkts    " + str(len(self.packets)) + "\n"
diff --git a/src/python/oftest/dataplane.py b/src/python/oftest/dataplane.py
index 89cf073..44ec997 100644
--- a/src/python/oftest/dataplane.py
+++ b/src/python/oftest/dataplane.py
@@ -21,13 +21,29 @@
 import netutils
 from threading import Thread
 from threading import Lock
-import oft_config
+from oft_config import *
+import select
 
 #@todo Move these identifiers into config
 ETH_P_ALL = 0x03
 RCV_TIMEOUT = 10000
 RCV_SIZE = 4096
 
+# class packet_queue:
+#     """
+#     Class defining a packet queue across multiple ports
+
+#     Items in the queue are stored as a triple (port number, pkt, pkt in time)
+#     """
+
+#     def __init__(self, max_pkts=1024):
+#         self.sync = Lock()
+#         self.debug_level = debug_level_default
+#         self.packets = []
+#         self.max_pkts = max_pkts
+#         self.packets_total = 0
+#         self.packets_discarded = 0
+
 class DataPlanePort(Thread):
     """
     Class defining a port monitoring object.
@@ -48,20 +64,18 @@
         """
         Thread.__init__(self)
         self.interface_name = interface_name
-        self.debug_level = oft_config.debug_level_default
+        self.debug_level = debug_level_default
         self.max_pkts = max_pkts
-        self.packets_pending = 0
         self.packets_total = 0
         self.packets = []
-        self.packet_times = []
         self.packets_discarded = 0
         self.sync = Lock()
         self.socket = self.interface_open(interface_name)
-        self.dbg(oft_config.DEBUG_INFO, 
-                 "Openned port monitor socket " + interface_name)
+        self.dbg(DEBUG_INFO, "Openned port monitor socket")
 
     def dbg(self, level, string):
-        oft_config.debug_log("DPLANE", self.debug_level, level, string)
+        debug_log("DPLANE", self.debug_level, level, 
+                  self.interface_name + ": " + string)
 
     def interface_open(self, interface_name):
         """
@@ -81,31 +95,52 @@
         Activity function for class
         """
         self.running = True
+        self.socs = [self.socket]
+        error_warned = False # Have we warned about error?
         while self.running:
             try:
+                sel_in, sel_out, sel_err = \
+                    select.select(self.socs, [], [], 1)
+            except:
+                print sys.exc_info()
+                self.dbg(DEBUG_ERROR, "Select error, exiting")
+                sys.exit(1)
+
+            #if not sel_err is None:
+            #    self.dbg(DEBUG_VERBOSE, "Socket error from select set")
+
+            if not self.running:
+                break
+
+            if sel_in is None:
+                continue
+
+            try:
                 rcvmsg = self.socket.recv(RCV_SIZE)
             except socket.error:
-                self.dbg(DEBUG_INFO, "Socket error for " + 
-                         self.interface_name)
+                if not error_warned:
+                    self.dbg(DEBUG_INFO, "Socket error on recv")
+                    error_warned = True
                 continue
+
             if len(rcvmsg) == 0:
-                self.dbg(DEBUG_INFO, "Zero len pkt on " + self.interface_name)
+                self.dbg(DEBUG_INFO, "Zero len pkt rcvd")
                 self.kill()
                 break
 
             rcvtime = time.clock()
+            self.dbg(DEBUG_VERBOSE, "Pkt len " + str(len(rcvmsg)) + 
+                     " in at " + str(rcvtime))
 
             self.sync.acquire()
             if len(self.packets) >= self.max_pkts:
                 self.packets.pop(0)
                 self.packets_discarded += 1
-            self.packets.append(rcvmsg)
-            self.packet_times.append(rcvtime)
-            self.packets_pending += 1
+            self.packets.append((rcvmsg, rcvtime))
             self.packets_total += 1
             self.sync.release()
 
-        self.dbg(DEBUG_INFO, "Thread exit for " + self.interface_name)
+        self.dbg(DEBUG_INFO, "Thread exit ")
 
     def kill(self):
         """
@@ -116,17 +151,15 @@
             self.socket.close()
         except:
             self.dbg(DEBUG_INFO, "Ignoring dataplane soc shutdown error")
-        self.dbg(oft_config.DEBUG_INFO, 
-                 "Port monitor for " + self.interface_name + " exiting")
+        self.dbg(DEBUG_INFO, 
+                 "Port monitor exiting")
 
     def dequeue(self):
         """
         Get the oldest packet in the queue
         """
         self.sync.acquire()
-        pkt = self.packets.pop(0)
-        pkt_time = self.packet_times.pop(0)
-        self.packets_pending -= 1
+        pkt, pkt_time = self.packets.pop(0)
         self.sync.release()
         return pkt, pkt_time
 
@@ -134,9 +167,12 @@
         """
         Return the timestamp of the head of queue or None if empty
         """
-        if self.packets_pending:
-            return self.packet_times[0]
-        return None
+        rv = None
+        self.sync.acquire()
+        if len(self.packets) > 0:
+            rv = self.packets[0][1]
+        self.sync.release()
+        return rv
 
     def flush(self):
         """
@@ -146,7 +182,6 @@
         self.packets_discarded += len(self.packets)
         self.packets = []
         self.packet_times = []
-        self.packets_pending = 0
         self.sync.release()
 
 
@@ -156,7 +191,7 @@
         @param packet The packet data to send to the port
         @retval The number of bytes sent
         """
-        self.dbg(oft_config.DEBUG_VERBOSE,
+        self.dbg(DEBUG_VERBOSE,
                  "port sending " + str(len(packet)) + " bytes")
         return self.socket.send(packet)
 
@@ -174,7 +209,7 @@
 
     def show(self, prefix=''):
         print prefix + "Name:          " + self.interface_name
-        print prefix + "Pkts pending:  " + str(self.packets_pending)
+        print prefix + "Pkts pending:  " + str(len(self.packets))
         print prefix + "Pkts total:    " + str(self.packets_total)
         print prefix + "socket:        " + str(self.socket)
         
@@ -186,10 +221,10 @@
     """
     def __init__(self):
         self.port_list = {}
-        self.debug_level = oft_config.debug_level_default
+        self.debug_level = debug_level_default
 
     def dbg(self, level, string):
-        oft_config.debug_log("DPORT", self.debug_level, level, string)
+        debug_log("DPORT", self.debug_level, level, string)
 
     def port_add(self, interface_name, port_number):
         """
@@ -208,7 +243,7 @@
         @param port_number The port to send the data to
         @param packet Raw packet data to send to port
         """
-        self.dbg(oft_config.DEBUG_VERBOSE,
+        self.dbg(DEBUG_VERBOSE,
                  "Sending %d bytes to port %d" % (len(packet), port_number))
         bytes = self.port_list[port_number].send(packet)
         if bytes != len(packet):
@@ -232,7 +267,8 @@
     def packet_get(self, port_number=None):
         """
         Get a packet from the data plane
-        If port_number is given, get the packet from that port.
+
+        If port_number is given, get the oldest packet from that port.
         Otherwise, find the port with the oldest packet and return
         that packet.
         @param port_number If set, get packet from this port
@@ -241,13 +277,14 @@
         """
 
         if port_number:
-            if self.port_list[port_number].packets_pending != 0:
+            if len(self.port_list[port_number].packets) != 0:
                 pkt, time = self.port_list[port_number].dequeue()
                 return port_number, pkt, time
             else:
                 return None, None, None
 
         # Find port with oldest packet
+        #@todo Consider using a single queue for all ports
         min_time = 0
         min_port = -1
         for port_number in self.port_list.keys():
@@ -263,18 +300,18 @@
         pkt, time = self.port_list[min_port].dequeue()
         return min_port, pkt, time
 
-    def kill(self, join_threads=True):
+    def kill(self, join_threads=False):
         """
         Close all sockets for dataplane
-        @param join_threads If True (default) call join on each thread
+        @param join_threads If True call join on each thread
         """
         for port_number in self.port_list.keys():
             self.port_list[port_number].kill()
             if join_threads:
-                self.dbg(oft_config.DEBUG_INFO, "Joining ", port_number)
+                self.dbg(DEBUG_INFO, "Joining " + str(port_number))
                 self.port_list[port_number].join()
 
-        self.dbg(oft_config.DEBUG_INFO, "DataPlane shutdown")
+        self.dbg(DEBUG_INFO, "DataPlane shutdown")
 
     def show(self, prefix=''):
         print prefix + "Dataplane Controller"
diff --git a/src/python/oftest/oft_config.py b/src/python/oftest/oft_config.py
index e7311e0..264c94d 100644
--- a/src/python/oftest/oft_config.py
+++ b/src/python/oftest/oft_config.py
@@ -101,10 +101,10 @@
 
 elif platform == "bcm_indigo":
     interface_ofport_map = {
-#        1 : "eth2",
-#        2 : "eth3",
-        3 : "eth4",
-#        4 : "eth5"
+        23 : "eth2",
+        24 : "eth3",
+        25 : "eth4",
+        26 : "eth5"
         }
     # For SSH connections to switch
     switch_cxn_type = "ssh"