Lots of reorg and wrestling with sockets
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 4262ca9..779e42c 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -17,6 +17,14 @@
@todo Support transaction semantics via xid
@todo Set up reasonable logging facility
+@todo Support select and listen on an administrative socket (or
+use a timeout to support clean shutdown).
+
+Currently only one connection is accepted during the life of
+the controller. There seems
+to be no clean way to interrupt an accept call. Using select that also listens
+on an administrative socket and can shut down the socket might work.
+
"""
from oft_config import *
@@ -28,7 +36,6 @@
from threading import Condition
from message import *
from parse import *
-from netutils import *
from ofutils import *
class Controller(Thread):
@@ -61,27 +68,35 @@
def __init__(self, max_pkts=1024):
Thread.__init__(self)
+ # Socket related
self.rcv_size = RCV_SIZE_DEFAULT
+ self.listen_socket = None
+ self.switch_socket = None
+ self.switch_addr = None
+
+ # Counters
self.socket_errors = 0
self.parse_errors = 0
- self.connected = False
- self.running = False
- self.max_pkts = max_pkts
self.packets_total = 0
+ self.packets_expired = 0
+ self.packets_handled = 0
+
+ # State
+ self.connected = False
self.packets = []
self.sync = Lock()
self.handlers = {}
self.keep_alive = False
+
+ # Settings
+ self.max_pkts = max_pkts
+ self.passive = True
self.host = controller_host
self.port = controller_port
- self.passive = True
- self.packets_expired = 0
- self.packets_handled = 0
self.dbg_state = "init"
- self.listen_socket = None
- self.switch_socket = None
- self.switch_addr = None
- self.debug_level = DEBUG_VERBOSE
+ # self.debug_level = DEBUG_VERBOSE
+ self.debug_level = debug_level_default
+
# Transaction variables
# xid_cv: Condition variable (semaphore) for transaction
# xid: Transaction ID being waited on
@@ -90,39 +105,84 @@
self.xid = None
self.xid_response = None
- # Connection condition variable to notify anyone waiting for a cxn
- self.connect_cv = Condition()
-
def dbg(self, level, string):
debug_log("CTRL", self.debug_level, level, string)
- def connect(self):
+ def run(self):
"""
- Open the socket connection
+ Activity function for class
- @param host The host address to use for the socket
- @param port The port number to use for the socket
- @param passive If True, use passive cxn: Not yet supported.
+ Assumes connection to switch already exists. Listens on
+ switch_socket for messages until an error (or zero len pkt)
+ occurs.
+ When there is a message on the socket, check for handlers; queue the
+ packet if no one handles the packet.
+
+ See note for controller describing the limitation of a single
+ connection for now.
+ """
+
+ if not self.switch_socket:
+ self.dbg(DEBUG_ERROR,
+ "Error in controller thread, no switch socket")
+ self.shutdown()
+ return
+
+ # Read and process packets from socket connected to switch
+ while 1:
+ try:
+ pkt = self.switch_socket.recv(self.rcv_size)
+ except socket.error:
+ self.dbg(DEBUG_ERROR, "Controller socket read error")
+ self.socket_errors += 1
+ break
+
+ if len(pkt) == 0:
+ # Considered an error; usually means switch has disconnected
+ self.dbg(DEBUG_INFO, "length 0 pkt in")
+ self.socket_errors += 1
+ break
+
+ if not self.connected:
+ break
+
+ (handled, msg) = self._pkt_handler_check(pkt)
+ if handled:
+ self.packets_handled += 1
+ continue
+
+ # Not handled, enqueue
+ self.sync.acquire()
+ if len(self.packets) >= self.max_pkts:
+ self.packets.pop(0)
+ self.packets_expired += 1
+ self.packets.append((msg, pkt))
+ self.packets_total += 1
+ self.sync.release()
+
+ self.shutdown()
+
+ def connect(self, send_hello=True):
+ """
+ Connect to a switch and start this thread
+
+ Create the listening socket, accept and block until a
+ connection is made to the switch. Then start the local
+ thread and return. Parameters to the call are take from
+ member variables.
+
+ @param send_hello If True, send the initial hello packet on connection
@return Boolean where True indicates success
- If already connected, will close the current connection
+ If already connected, returns True
+ If the listen socket does not exist, will create it
"""
- oldstate = self.dbg_state
+
self.dbg_state = "connecting"
- if self.connected:
- self.dbg(DEBUG_WARN, "Disconnect when already connected")
- self.disconnect()
-
- if not self.passive:
- print "Error in controller init: Active cxn not supported"
- # raise unsupported
- self.dbg_state = oldstate
- return False
-
- # FIXME: add error handling; try SocketServer?
self.dbg(DEBUG_INFO, "open ctl host: >" + str(self.host) + "< port " +
str(self.port))
+ # Create the listening socket
if not self.listen_socket:
self.listen_socket = socket.socket(socket.AF_INET,
socket.SOCK_STREAM)
@@ -131,31 +191,50 @@
self.listen_socket.bind((self.host, self.port))
self.dbg_state = "listening"
self.listen_socket.listen(LISTEN_QUEUE_SIZE)
+ self.dbg_state = "accepting"
(self.switch_socket, self.switch_addr) = self.listen_socket.accept()
if not self.switch_socket:
self.socket_errors += 1
self.dbg(DEBUG_WARN, "Failed on accept")
self.dbg_state = "error"
+ self.listen_socket.close()
return False
self.connected = True
self.dbg_state = "connected"
self.dbg(DEBUG_INFO, "Got connection to " + str(self.switch_addr))
+
+ if send_hello:
+ h = hello()
+ self.message_send(h.pack())
+
+ # Start the background thread
+ self.start()
+
return True
- def disconnect(self):
+ def shutdown(self):
"""
- Disconnect the switch socket
- """
- self.dbg_state = "disconnecting"
- if not self.connected:
- self.dbg(DEBUG_INFO, "disconnect when not connected")
- self.dbg_state = "disconnected"
- return
- self.switch_socket.close()
- self.connected = False
- self.dbg_state = "disconnected"
+ Shutdown the controller closing all sockets
+ @todo Might want to synchronize shutdown with self.sync...
+ """
+ self.connected = False
+ self.dbg_state = "closed"
+ if self.switch_socket:
+ try:
+ self.switch_socket.shutdown(socket.SHUT_RDWR)
+ except:
+ self.dbg(DEBUG_INFO, "Ignoring switch soc shutdown error")
+ self.switch_socket = None
+
+ if self.listen_socket:
+ try:
+ self.listen_socket.shutdown(socket.SHUT_RDWR)
+ except:
+ self.dbg(DEBUG_INFO, "Ignoring listen soc shutdown error")
+ self.listen_socket = None
+
def _pkt_handler_check(self, pkt):
"""
Check for packet handling before being enqueued
@@ -173,8 +252,8 @@
self.dbg(DEBUG_INFO, "Could not parse header, pkt len", len(pkt))
self.parse_errors += 1
return (True, None)
- if self.debug_level <= DEBUG_VERBOSE:
- hdr.show()
+ # if self.debug_level <= DEBUG_VERBOSE:
+ # hdr.show()
self.dbg(DEBUG_VERBOSE, "message: len %d. type %s. hdr.len %d" %
(len(pkt), ofp_type_map[hdr.type], hdr.length))
@@ -213,61 +292,6 @@
return (handled, msg)
- def run(self):
- """
- Activity function for class
-
- Loops until stop is called (or self.running is set to False).
- If the connection drops, it connects again. It then receives
- a message on the socket and checks for handlers, queuing the
- packet if no one handles the packet.
- """
- self.running = True
- while self.running:
- if not self.connected:
- if not self.connect():
- self.dbg(DEBUG_ERROR,
- "Controller thread error connecting; exit")
- break
- # Notify anyone waiting that a connection is made
- self.connect_cv.acquire()
- self.connect_cv.notify()
- self.connect_cv.release()
- try:
- pkt = self.switch_socket.recv(self.rcv_size)
- if len(pkt) == 0:
- self.dbg(DEBUG_WARN, "length 0 pkt in")
- self.disconnect()
- continue
-
- (handled, msg) = self._pkt_handler_check(pkt)
- if handled:
- self.packets_handled += 1
- continue
-
- # Not handled, enqueue
- self.sync.acquire()
- if len(self.packets) >= self.max_pkts:
- self.packets.pop(0)
- self.packets_expired += 1
- self.packets.append((msg, pkt))
- self.packets_total += 1
- self.sync.release()
-
- except socket.error:
- print "Controller socket read error"
- self.socket_errors += 1
- self.disconnect()
-
- def stop(self):
- """
- Stop the running loop and disconnect the socket
- """
- self.running = False
- # Is there something to do to switch_socket to stop an inprogress
- # connect?
- self.disconnect()
-
def register(self, msg_type, handler):
"""
Register a callback to receive a specific message type.
@@ -291,6 +315,7 @@
@param exp_msg If set, return only when this type of message
is received.
+ @param timeout Not yet supported
@retval A pair (msg, pkt) where msg is a message object and pkt
the string representing the packet as received from the socket.
@@ -302,7 +327,7 @@
# For now do not support time out;
if timeout:
- print "DEBUG WARNING: poll time out not supported"
+ self.dbg(DEBUG_WARN, "Poll time out not supported")
while len(self.packets) > 0:
self.sync.acquire()
@@ -311,6 +336,8 @@
if not exp_msg or (exp_msg and (msg.header.type == exp_msg)):
return msg, pkt
+ return None, None
+
def transact(self, msg, timeout=None, zero_xid=False):
"""
Run a message transaction with the switch
@@ -356,10 +383,15 @@
"""
+ if not self.switch_socket:
+ # Sending a string indicates the message is ready to go
+ self.dbg(DEBUG_INFO, "message_send: no socket")
+ return -1
+
if type(msg) != type(""):
# Sending a string indicates the message is ready to go
self.dbg(DEBUG_INFO, "message_send requires packet as string")
- return 0
+ return -1
self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(msg)))
return self.switch_socket.sendall(msg)
@@ -368,7 +400,6 @@
string = "Controller:\n"
string += " connected " + str(self.connected) + "\n"
string += " state " + self.dbg_state + "\n"
- string += " running " + str(self.running) + "\n"
string += " switch_addr " + str(self.switch_addr) + "\n"
string += " pending pkts " + str(len(self.packets)) + "\n"
string += " total pkts " + str(self.packets_total) + "\n"
@@ -385,34 +416,6 @@
def show(self):
print str(self)
- def connect_to_switch(self, timeout=None, send_hello=True):
- """
- Connect to switch
-
- Block until a connection to a switch
- is established or a timeout occurs. Sends a hello message
- once established.
- @param timeout Timeout in seconds
- @param send_hello If True, will send hello when connection established
- @return True if successful, False otherwise
- """
- h = hello()
- print "x1"
- self.connect_cv.acquire()
- if self.connected:
- print "x2"
- self.connect_cv.release()
- return True
-
- print "x3"
- self.connect_cv.wait(timeout)
- self.connect_cv.release()
- print "x4"
- if send_hello:
- self.message_send(h.pack())
- print "x5"
- return self.connected
-
def sample_handler(controller, msg, pkt):
"""
Sample message handler