Mostly changes to socket deployment
Use select for handling sockets; hopefully better cleanup approach
Added connection condition variable (connect_cv) for controller
Support message objects as arguments to controller.message_send
Support initial hello from controller when connected to switch
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 779e42c..0b89ee6 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -37,6 +37,9 @@
from message import *
from parse import *
from ofutils import *
+# For some reason, select seems to need to be imported last (or later);
+# otherwise we get an attribute error when calling select.select
+import select
class Controller(Thread):
"""
@@ -58,6 +61,8 @@
@var max_pkts The max size of the receive queue
@var keep_alive If true, listen for echo requests and respond w/
echo replies
+ @var initial_hello If true, will send a hello message immediately
+ upon connecting to the switch
@var host The host to use for connect
@var port The port to connect on
@var packets_total Total number of packets received
@@ -73,6 +78,8 @@
self.listen_socket = None
self.switch_socket = None
self.switch_addr = None
+ self.socs = []
+ self.connect_cv = Condition()
# Counters
self.socket_errors = 0
@@ -82,11 +89,12 @@
self.packets_handled = 0
# State
- self.connected = False
self.packets = []
self.sync = Lock()
self.handlers = {}
self.keep_alive = False
+ self.active = True
+ self.initial_hello = True
# Settings
self.max_pkts = max_pkts
@@ -94,8 +102,8 @@
self.host = controller_host
self.port = controller_port
self.dbg_state = "init"
- # self.debug_level = DEBUG_VERBOSE
self.debug_level = debug_level_default
+ # self.debug_level = DEBUG_VERBOSE
# Transaction variables
# xid_cv: Condition variable (semaphore) for transaction
@@ -108,6 +116,61 @@
def dbg(self, level, string):
debug_log("CTRL", self.debug_level, level, string)
+ def _socket_ready_handle(self, s):
+ """
+ Handle an input-ready socket
+ @param s The socket object that is ready
+ @retval True if the switch connection should be reset, False otherwise
+ """
+
+ if s == self.listen_socket:
+ if self.switch_socket:
+ self.dbg(DEBUG_ERROR, "Multiple switch cxns not supported")
+ sys.exit(1)
+
+ (self.switch_socket, self.switch_addr) = \
+ self.listen_socket.accept()
+ self.dbg(DEBUG_INFO, "Got cxn to " + str(self.switch_addr))
+ # Notify anyone waiting
+ self.connect_cv.acquire()
+ self.connect_cv.notify()
+ self.connect_cv.release()
+ self.socs.append(self.switch_socket)
+ if self.initial_hello:
+ self.message_send(hello())
+ elif s == self.switch_socket:
+ try:
+ pkt = self.switch_socket.recv(self.rcv_size)
+ except:
+ self.dbg(DEBUG_INFO, "error on switch read")
+ return True
+
+ if not self.active:
+ return False
+
+ if len(pkt) == 0:
+ self.dbg(DEBUG_INFO, "zero-len pkt in")
+ return True
+
+ (handled, msg) = self._pkt_handler_check(pkt)
+ if handled:
+ self.packets_handled += 1
+ return False
+
+ # Not handled, enqueue
+ self.sync.acquire()
+ if len(self.packets) >= self.max_pkts:
+ self.packets.pop(0)
+ self.packets_expired += 1
+ self.packets.append((msg, pkt))
+ self.packets_total += 1
+ self.sync.release()
+ else:
+ self.dbg(DEBUG_ERROR, "Unknown socket ready: " + str(s))
+ return True
+
+ return False
+
def run(self):
"""
Activity function for class
@@ -123,95 +186,87 @@
connection for now.
"""
- if not self.switch_socket:
- self.dbg(DEBUG_ERROR,
- "Error in controller thread, no switch socket")
- self.shutdown()
- return
+ self.dbg_state = "starting"
- # Read and process packets from socket connected to switch
- while 1:
- try:
- pkt = self.switch_socket.recv(self.rcv_size)
- except socket.error:
- self.dbg(DEBUG_ERROR, "Controller socket read error")
- self.socket_errors += 1
- break
-
- if len(pkt) == 0:
- # Considered an error; usually means switch has disconnected
- self.dbg(DEBUG_INFO, "length 0 pkt in")
- self.socket_errors += 1
- break
-
- if not self.connected:
- break
-
- (handled, msg) = self._pkt_handler_check(pkt)
- if handled:
- self.packets_handled += 1
- continue
-
- # Not handled, enqueue
- self.sync.acquire()
- if len(self.packets) >= self.max_pkts:
- self.packets.pop(0)
- self.packets_expired += 1
- self.packets.append((msg, pkt))
- self.packets_total += 1
- self.sync.release()
-
- self.shutdown()
-
- def connect(self, send_hello=True):
- """
- Connect to a switch and start this thread
-
- Create the listening socket, accept and block until a
- connection is made to the switch. Then start the local
- thread and return. Parameters to the call are take from
- member variables.
-
- @param send_hello If True, send the initial hello packet on connection
- @return Boolean where True indicates success
-
- If already connected, returns True
- If the listen socket does not exist, will create it
- """
-
- self.dbg_state = "connecting"
- self.dbg(DEBUG_INFO, "open ctl host: >" + str(self.host) + "< port " +
- str(self.port))
- # Create the listening socket
- if not self.listen_socket:
- self.listen_socket = socket.socket(socket.AF_INET,
- socket.SOCK_STREAM)
- self.listen_socket.setsockopt(socket.SOL_SOCKET,
- socket.SO_REUSEADDR, 1)
- self.listen_socket.bind((self.host, self.port))
+ # Create listen socket
+ self.dbg(DEBUG_INFO, "Create/listen at " + self.host + ":" +
+ str(self.port))
+ self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.listen_socket.setsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR, 1)
+ self.listen_socket.bind((self.host, self.port))
self.dbg_state = "listening"
self.listen_socket.listen(LISTEN_QUEUE_SIZE)
- self.dbg_state = "accepting"
- (self.switch_socket, self.switch_addr) = self.listen_socket.accept()
- if not self.switch_socket:
- self.socket_errors += 1
- self.dbg(DEBUG_WARN, "Failed on accept")
- self.dbg_state = "error"
- self.listen_socket.close()
- return False
- self.connected = True
- self.dbg_state = "connected"
- self.dbg(DEBUG_INFO, "Got connection to " + str(self.switch_addr))
+ self.dbg(DEBUG_INFO, "Waiting for switch connection")
+ self.socs = [self.listen_socket]
+ self.dbg_state = "running"
+ while self.active:
+ reset_switch_cxn = False
+ try:
+ sel_in, sel_out, sel_err = \
+ select.select(self.socs, [], self.socs, 1)
+ except:
+ print sys.exc_info()
+ self.dbg(DEBUG_ERROR, "Select error, exiting")
+ sys.exit(1)
- if send_hello:
- h = hello()
- self.message_send(h.pack())
+ if not self.active:
+ break
- # Start the background thread
- self.start()
+ for s in sel_in:
+ reset_switch_cxn = self._socket_ready_handle(s)
- return True
+ for s in sel_err:
+ self.dbg(DEBUG_ERROR, "Got socket error on: " + str(s))
+ if s == self.switch_socket:
+ reset_switch_cxn = True
+ else:
+ self.dbg(DEBUG_ERROR, "Socket error; exiting")
+ self.active = False
+ break
+
+ if self.active and reset_switch_cxn:
+ self.dbg(DEBUG_INFO, "Closing switch cxn")
+ try:
+ self.switch_socket.close()
+ except:
+ pass
+ self.switch_socket = None
+ self.socs = self.socs[0:1]
+
+ # End of main loop
+ self.dbg_state = "closing"
+ self.dbg(DEBUG_INFO, "Exiting controller thread")
+ self.shutdown()
+
+ def connect(self, timeout=None):
+ """
+ Connect to the switch
+
+ @param timeout If None, block until connected. If 0, return
+ immediately. Otherwise, block for up to timeout seconds
+ @return Boolean, True if connected
+ """
+
+ if timeout == 0:
+ return self.switch_socket is not None
+ if self.switch_socket is not None:
+ return True
+ self.connect_cv.acquire()
+ self.connect_cv.wait(timeout)
+ self.connect_cv.release()
+
+ return self.switch_socket is not None
+
+ def kill(self):
+ """
+ Force the controller thread to quit
+
+ Just sets the active state variable to false and expects
+ the select timeout to kick in
+ """
+ self.active = False
def shutdown(self):
"""
@@ -219,22 +274,20 @@
@todo Might want to synchronize shutdown with self.sync...
"""
- self.connected = False
- self.dbg_state = "closed"
- if self.switch_socket:
- try:
- self.switch_socket.shutdown(socket.SHUT_RDWR)
- except:
- self.dbg(DEBUG_INFO, "Ignoring switch soc shutdown error")
- self.switch_socket = None
+ self.active = False
+ try:
+ self.switch_socket.shutdown(socket.SHUT_RDWR)
+ except:
+ self.dbg(DEBUG_INFO, "Ignoring switch soc shutdown error")
+ self.switch_socket = None
- if self.listen_socket:
- try:
- self.listen_socket.shutdown(socket.SHUT_RDWR)
- except:
- self.dbg(DEBUG_INFO, "Ignoring listen soc shutdown error")
- self.listen_socket = None
-
+ try:
+ self.listen_socket.shutdown(socket.SHUT_RDWR)
+ except:
+ self.dbg(DEBUG_INFO, "Ignoring listen soc shutdown error")
+ self.listen_socket = None
+ self.dbg_state = "down"
+
def _pkt_handler_check(self, pkt):
"""
Check for packet handling before being enqueued
@@ -279,7 +332,7 @@
rep = echo_reply()
rep.header.xid = hdr.xid
# Ignoring additional data
- self.message_send(rep.pack())
+ self.message_send(rep.pack(), zero_xid=True)
return (True, None)
# Now check for message handlers; preference is given to
@@ -370,16 +423,22 @@
self.xid_cv.wait(timeout)
msg = self.xid_response
self.xid_response = None
+ self.xid = None
self.xid_cv.release()
return msg
- def message_send(self, msg):
+ def message_send(self, msg, zero_xid=False):
"""
Send the message to the switch
- @param msg An OpenFlow message object to be forwarded to the switch.
+ @param msg A string or OpenFlow message object to be forwarded to
+ the switch.
+ @param zero_xid If msg is an OpenFlow object (not a string) and if
+ the XID in the header is 0, then an XID will be generated
+ for the message. Set zero_xid to override this behavior (and keep an
+ existing 0 xid)
- @return None on success
+ @return -1 if error, 0 on success
"""
@@ -387,18 +446,28 @@
# Sending a string indicates the message is ready to go
self.dbg(DEBUG_INFO, "message_send: no socket")
return -1
-
+ # If not a string, treat msg as an OF message object and pack it
if type(msg) != type(""):
- # Sending a string indicates the message is ready to go
- self.dbg(DEBUG_INFO, "message_send requires packet as string")
- return -1
+ try:
+ if msg.header.xid == 0 and not zero_xid:
+ msg.header.xid = gen_xid()
+ outpkt = msg.pack()
+ except:
+ self.dbg(DEBUG_INFO,
+ "message_send: not an OF message or string?")
+ return -1
+ else:
+ outpkt = msg
- self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(msg)))
- return self.switch_socket.sendall(msg)
+ self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(outpkt)))
+ if self.switch_socket.sendall(outpkt) is None:
+ return 0
+
+ self.dbg(DEBUG_ERROR, "Unknown error on sendall")
+ return -1
def __str__(self):
string = "Controller:\n"
- string += " connected " + str(self.connected) + "\n"
string += " state " + self.dbg_state + "\n"
string += " switch_addr " + str(self.switch_addr) + "\n"
string += " pending pkts " + str(len(self.packets)) + "\n"