Mostly changes to socket deployment

Use select for handling sockets; hopefully better cleanup approach

Added connection semaphore for controller
Support message objects as arguments to controller.message_send
Support initial hello from controller when connected to switch
diff --git a/src/python/oftest/dataplane.py b/src/python/oftest/dataplane.py
index 89cf073..44ec997 100644
--- a/src/python/oftest/dataplane.py
+++ b/src/python/oftest/dataplane.py
@@ -21,13 +21,29 @@
 import netutils
 from threading import Thread
 from threading import Lock
-import oft_config
+from oft_config import *
+import select
 
 #@todo Move these identifiers into config
 ETH_P_ALL = 0x03
 RCV_TIMEOUT = 10000
 RCV_SIZE = 4096
 
+# class packet_queue:
+#     """
+#     Class defining a packet queue across multiple ports
+
+#     Items in the queue are stored as a triple (port number, pkt, pkt in time)
+#     """
+
+#     def __init__(self, max_pkts=1024):
+#         self.sync = Lock()
+#         self.debug_level = debug_level_default
+#         self.packets = []
+#         self.max_pkts = max_pkts
+#         self.packets_total = 0
+#         self.packets_discarded = 0
+
 class DataPlanePort(Thread):
     """
     Class defining a port monitoring object.
@@ -48,20 +64,18 @@
         """
         Thread.__init__(self)
         self.interface_name = interface_name
-        self.debug_level = oft_config.debug_level_default
+        self.debug_level = debug_level_default
         self.max_pkts = max_pkts
-        self.packets_pending = 0
         self.packets_total = 0
         self.packets = []
-        self.packet_times = []
         self.packets_discarded = 0
         self.sync = Lock()
         self.socket = self.interface_open(interface_name)
-        self.dbg(oft_config.DEBUG_INFO, 
-                 "Openned port monitor socket " + interface_name)
+        self.dbg(DEBUG_INFO, "Openned port monitor socket")
 
     def dbg(self, level, string):
-        oft_config.debug_log("DPLANE", self.debug_level, level, string)
+        debug_log("DPLANE", self.debug_level, level, 
+                  self.interface_name + ": " + string)
 
     def interface_open(self, interface_name):
         """
@@ -81,31 +95,52 @@
         Activity function for class
         """
         self.running = True
+        self.socs = [self.socket]
+        error_warned = False # Have we warned about error?
         while self.running:
             try:
+                sel_in, sel_out, sel_err = \
+                    select.select(self.socs, [], [], 1)
+            except:
+                print sys.exc_info()
+                self.dbg(DEBUG_ERROR, "Select error, exiting")
+                sys.exit(1)
+
+            #if not sel_err is None:
+            #    self.dbg(DEBUG_VERBOSE, "Socket error from select set")
+
+            if not self.running:
+                break
+
+            if sel_in is None:
+                continue
+
+            try:
                 rcvmsg = self.socket.recv(RCV_SIZE)
             except socket.error:
-                self.dbg(DEBUG_INFO, "Socket error for " + 
-                         self.interface_name)
+                if not error_warned:
+                    self.dbg(DEBUG_INFO, "Socket error on recv")
+                    error_warned = True
                 continue
+
             if len(rcvmsg) == 0:
-                self.dbg(DEBUG_INFO, "Zero len pkt on " + self.interface_name)
+                self.dbg(DEBUG_INFO, "Zero len pkt rcvd")
                 self.kill()
                 break
 
             rcvtime = time.clock()
+            self.dbg(DEBUG_VERBOSE, "Pkt len " + str(len(rcvmsg)) + 
+                     " in at " + str(rcvtime))
 
             self.sync.acquire()
             if len(self.packets) >= self.max_pkts:
                 self.packets.pop(0)
                 self.packets_discarded += 1
-            self.packets.append(rcvmsg)
-            self.packet_times.append(rcvtime)
-            self.packets_pending += 1
+            self.packets.append((rcvmsg, rcvtime))
             self.packets_total += 1
             self.sync.release()
 
-        self.dbg(DEBUG_INFO, "Thread exit for " + self.interface_name)
+        self.dbg(DEBUG_INFO, "Thread exit ")
 
     def kill(self):
         """
@@ -116,17 +151,15 @@
             self.socket.close()
         except:
             self.dbg(DEBUG_INFO, "Ignoring dataplane soc shutdown error")
-        self.dbg(oft_config.DEBUG_INFO, 
-                 "Port monitor for " + self.interface_name + " exiting")
+        self.dbg(DEBUG_INFO, 
+                 "Port monitor exiting")
 
     def dequeue(self):
         """
         Get the oldest packet in the queue
         """
         self.sync.acquire()
-        pkt = self.packets.pop(0)
-        pkt_time = self.packet_times.pop(0)
-        self.packets_pending -= 1
+        pkt, pkt_time = self.packets.pop(0)
         self.sync.release()
         return pkt, pkt_time
 
@@ -134,9 +167,12 @@
         """
         Return the timestamp of the head of queue or None if empty
         """
-        if self.packets_pending:
-            return self.packet_times[0]
-        return None
+        rv = None
+        self.sync.acquire()
+        if len(self.packets) > 0:
+            rv = self.packets[0][1]
+        self.sync.release()
+        return rv
 
     def flush(self):
         """
@@ -146,7 +182,6 @@
         self.packets_discarded += len(self.packets)
         self.packets = []
         self.packet_times = []
-        self.packets_pending = 0
         self.sync.release()
 
 
@@ -156,7 +191,7 @@
         @param packet The packet data to send to the port
         @retval The number of bytes sent
         """
-        self.dbg(oft_config.DEBUG_VERBOSE,
+        self.dbg(DEBUG_VERBOSE,
                  "port sending " + str(len(packet)) + " bytes")
         return self.socket.send(packet)
 
@@ -174,7 +209,7 @@
 
     def show(self, prefix=''):
         print prefix + "Name:          " + self.interface_name
-        print prefix + "Pkts pending:  " + str(self.packets_pending)
+        print prefix + "Pkts pending:  " + str(len(self.packets))
         print prefix + "Pkts total:    " + str(self.packets_total)
         print prefix + "socket:        " + str(self.socket)
         
@@ -186,10 +221,10 @@
     """
     def __init__(self):
         self.port_list = {}
-        self.debug_level = oft_config.debug_level_default
+        self.debug_level = debug_level_default
 
     def dbg(self, level, string):
-        oft_config.debug_log("DPORT", self.debug_level, level, string)
+        debug_log("DPORT", self.debug_level, level, string)
 
     def port_add(self, interface_name, port_number):
         """
@@ -208,7 +243,7 @@
         @param port_number The port to send the data to
         @param packet Raw packet data to send to port
         """
-        self.dbg(oft_config.DEBUG_VERBOSE,
+        self.dbg(DEBUG_VERBOSE,
                  "Sending %d bytes to port %d" % (len(packet), port_number))
         bytes = self.port_list[port_number].send(packet)
         if bytes != len(packet):
@@ -232,7 +267,8 @@
     def packet_get(self, port_number=None):
         """
         Get a packet from the data plane
-        If port_number is given, get the packet from that port.
+
+        If port_number is given, get the oldest packet from that port.
         Otherwise, find the port with the oldest packet and return
         that packet.
         @param port_number If set, get packet from this port
@@ -241,13 +277,14 @@
         """
 
         if port_number:
-            if self.port_list[port_number].packets_pending != 0:
+            if len(self.port_list[port_number].packets) != 0:
                 pkt, time = self.port_list[port_number].dequeue()
                 return port_number, pkt, time
             else:
                 return None, None, None
 
         # Find port with oldest packet
+        #@todo Consider using a single queue for all ports
         min_time = 0
         min_port = -1
         for port_number in self.port_list.keys():
@@ -263,18 +300,18 @@
         pkt, time = self.port_list[min_port].dequeue()
         return min_port, pkt, time
 
-    def kill(self, join_threads=True):
+    def kill(self, join_threads=False):
         """
         Close all sockets for dataplane
-        @param join_threads If True (default) call join on each thread
+        @param join_threads If True call join on each thread
         """
         for port_number in self.port_list.keys():
             self.port_list[port_number].kill()
             if join_threads:
-                self.dbg(oft_config.DEBUG_INFO, "Joining ", port_number)
+                self.dbg(DEBUG_INFO, "Joining " + str(port_number))
                 self.port_list[port_number].join()
 
-        self.dbg(oft_config.DEBUG_INFO, "DataPlane shutdown")
+        self.dbg(DEBUG_INFO, "DataPlane shutdown")
 
     def show(self, prefix=''):
         print prefix + "Dataplane Controller"