Lots of reorg and wrestling with sockets
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 4262ca9..779e42c 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -17,6 +17,14 @@
@todo Support transaction semantics via xid
@todo Set up reasonable logging facility
+@todo Support select and listen on an administrative socket (or
+use a timeout to support clean shutdown).
+
+Currently only one connection is accepted during the life of
+the controller. There seems
+to be no clean way to interrupt an accept call. Using select that also listens
+on an administrative socket and can shut down the socket might work.
+
"""
from oft_config import *
@@ -28,7 +36,6 @@
from threading import Condition
from message import *
from parse import *
-from netutils import *
from ofutils import *
class Controller(Thread):
@@ -61,27 +68,35 @@
def __init__(self, max_pkts=1024):
Thread.__init__(self)
+ # Socket related
self.rcv_size = RCV_SIZE_DEFAULT
+ self.listen_socket = None
+ self.switch_socket = None
+ self.switch_addr = None
+
+ # Counters
self.socket_errors = 0
self.parse_errors = 0
- self.connected = False
- self.running = False
- self.max_pkts = max_pkts
self.packets_total = 0
+ self.packets_expired = 0
+ self.packets_handled = 0
+
+ # State
+ self.connected = False
self.packets = []
self.sync = Lock()
self.handlers = {}
self.keep_alive = False
+
+ # Settings
+ self.max_pkts = max_pkts
+ self.passive = True
self.host = controller_host
self.port = controller_port
- self.passive = True
- self.packets_expired = 0
- self.packets_handled = 0
self.dbg_state = "init"
- self.listen_socket = None
- self.switch_socket = None
- self.switch_addr = None
- self.debug_level = DEBUG_VERBOSE
+ # self.debug_level = DEBUG_VERBOSE
+ self.debug_level = debug_level_default
+
# Transaction variables
# xid_cv: Condition variable (semaphore) for transaction
# xid: Transaction ID being waited on
@@ -90,39 +105,84 @@
self.xid = None
self.xid_response = None
- # Connection condition variable to notify anyone waiting for a cxn
- self.connect_cv = Condition()
-
def dbg(self, level, string):
debug_log("CTRL", self.debug_level, level, string)
- def connect(self):
+ def run(self):
"""
- Open the socket connection
+ Activity function for class
- @param host The host address to use for the socket
- @param port The port number to use for the socket
- @param passive If True, use passive cxn: Not yet supported.
+ Assumes connection to switch already exists. Listens on
+ switch_socket for messages until an error (or zero len pkt)
+ occurs.
+ When there is a message on the socket, check for handlers; queue the
+ packet if no one handles the packet.
+
+ See note for controller describing the limitation of a single
+ connection for now.
+ """
+
+ if not self.switch_socket:
+ self.dbg(DEBUG_ERROR,
+ "Error in controller thread, no switch socket")
+ self.shutdown()
+ return
+
+ # Read and process packets from socket connected to switch
+ while 1:
+ try:
+ pkt = self.switch_socket.recv(self.rcv_size)
+ except socket.error:
+ self.dbg(DEBUG_ERROR, "Controller socket read error")
+ self.socket_errors += 1
+ break
+
+ if len(pkt) == 0:
+ # Considered an error; usually means switch has disconnected
+ self.dbg(DEBUG_INFO, "length 0 pkt in")
+ self.socket_errors += 1
+ break
+
+ if not self.connected:
+ break
+
+ (handled, msg) = self._pkt_handler_check(pkt)
+ if handled:
+ self.packets_handled += 1
+ continue
+
+ # Not handled, enqueue
+ self.sync.acquire()
+ if len(self.packets) >= self.max_pkts:
+ self.packets.pop(0)
+ self.packets_expired += 1
+ self.packets.append((msg, pkt))
+ self.packets_total += 1
+ self.sync.release()
+
+ self.shutdown()
+
+ def connect(self, send_hello=True):
+ """
+ Connect to a switch and start this thread
+
+ Create the listening socket, accept and block until a
+ connection is made to the switch. Then start the local
+ thread and return. Parameters to the call are take from
+ member variables.
+
+ @param send_hello If True, send the initial hello packet on connection
@return Boolean where True indicates success
- If already connected, will close the current connection
+ If already connected, returns True
+ If the listen socket does not exist, will create it
"""
- oldstate = self.dbg_state
+
self.dbg_state = "connecting"
- if self.connected:
- self.dbg(DEBUG_WARN, "Disconnect when already connected")
- self.disconnect()
-
- if not self.passive:
- print "Error in controller init: Active cxn not supported"
- # raise unsupported
- self.dbg_state = oldstate
- return False
-
- # FIXME: add error handling; try SocketServer?
self.dbg(DEBUG_INFO, "open ctl host: >" + str(self.host) + "< port " +
str(self.port))
+ # Create the listening socket
if not self.listen_socket:
self.listen_socket = socket.socket(socket.AF_INET,
socket.SOCK_STREAM)
@@ -131,31 +191,50 @@
self.listen_socket.bind((self.host, self.port))
self.dbg_state = "listening"
self.listen_socket.listen(LISTEN_QUEUE_SIZE)
+ self.dbg_state = "accepting"
(self.switch_socket, self.switch_addr) = self.listen_socket.accept()
if not self.switch_socket:
self.socket_errors += 1
self.dbg(DEBUG_WARN, "Failed on accept")
self.dbg_state = "error"
+ self.listen_socket.close()
return False
self.connected = True
self.dbg_state = "connected"
self.dbg(DEBUG_INFO, "Got connection to " + str(self.switch_addr))
+
+ if send_hello:
+ h = hello()
+ self.message_send(h.pack())
+
+ # Start the background thread
+ self.start()
+
return True
- def disconnect(self):
+ def shutdown(self):
"""
- Disconnect the switch socket
- """
- self.dbg_state = "disconnecting"
- if not self.connected:
- self.dbg(DEBUG_INFO, "disconnect when not connected")
- self.dbg_state = "disconnected"
- return
- self.switch_socket.close()
- self.connected = False
- self.dbg_state = "disconnected"
+ Shutdown the controller closing all sockets
+ @todo Might want to synchronize shutdown with self.sync...
+ """
+ self.connected = False
+ self.dbg_state = "closed"
+ if self.switch_socket:
+ try:
+ self.switch_socket.shutdown(socket.SHUT_RDWR)
+ except:
+ self.dbg(DEBUG_INFO, "Ignoring switch soc shutdown error")
+ self.switch_socket = None
+
+ if self.listen_socket:
+ try:
+ self.listen_socket.shutdown(socket.SHUT_RDWR)
+ except:
+ self.dbg(DEBUG_INFO, "Ignoring listen soc shutdown error")
+ self.listen_socket = None
+
def _pkt_handler_check(self, pkt):
"""
Check for packet handling before being enqueued
@@ -173,8 +252,8 @@
self.dbg(DEBUG_INFO, "Could not parse header, pkt len", len(pkt))
self.parse_errors += 1
return (True, None)
- if self.debug_level <= DEBUG_VERBOSE:
- hdr.show()
+ # if self.debug_level <= DEBUG_VERBOSE:
+ # hdr.show()
self.dbg(DEBUG_VERBOSE, "message: len %d. type %s. hdr.len %d" %
(len(pkt), ofp_type_map[hdr.type], hdr.length))
@@ -213,61 +292,6 @@
return (handled, msg)
- def run(self):
- """
- Activity function for class
-
- Loops until stop is called (or self.running is set to False).
- If the connection drops, it connects again. It then receives
- a message on the socket and checks for handlers, queuing the
- packet if no one handles the packet.
- """
- self.running = True
- while self.running:
- if not self.connected:
- if not self.connect():
- self.dbg(DEBUG_ERROR,
- "Controller thread error connecting; exit")
- break
- # Notify anyone waiting that a connection is made
- self.connect_cv.acquire()
- self.connect_cv.notify()
- self.connect_cv.release()
- try:
- pkt = self.switch_socket.recv(self.rcv_size)
- if len(pkt) == 0:
- self.dbg(DEBUG_WARN, "length 0 pkt in")
- self.disconnect()
- continue
-
- (handled, msg) = self._pkt_handler_check(pkt)
- if handled:
- self.packets_handled += 1
- continue
-
- # Not handled, enqueue
- self.sync.acquire()
- if len(self.packets) >= self.max_pkts:
- self.packets.pop(0)
- self.packets_expired += 1
- self.packets.append((msg, pkt))
- self.packets_total += 1
- self.sync.release()
-
- except socket.error:
- print "Controller socket read error"
- self.socket_errors += 1
- self.disconnect()
-
- def stop(self):
- """
- Stop the running loop and disconnect the socket
- """
- self.running = False
- # Is there something to do to switch_socket to stop an inprogress
- # connect?
- self.disconnect()
-
def register(self, msg_type, handler):
"""
Register a callback to receive a specific message type.
@@ -291,6 +315,7 @@
@param exp_msg If set, return only when this type of message
is received.
+ @param timeout Not yet supported
@retval A pair (msg, pkt) where msg is a message object and pkt
the string representing the packet as received from the socket.
@@ -302,7 +327,7 @@
# For now do not support time out;
if timeout:
- print "DEBUG WARNING: poll time out not supported"
+ self.dbg(DEBUG_WARN, "Poll time out not supported")
while len(self.packets) > 0:
self.sync.acquire()
@@ -311,6 +336,8 @@
if not exp_msg or (exp_msg and (msg.header.type == exp_msg)):
return msg, pkt
+ return None, None
+
def transact(self, msg, timeout=None, zero_xid=False):
"""
Run a message transaction with the switch
@@ -356,10 +383,15 @@
"""
+ if not self.switch_socket:
+ # Sending a string indicates the message is ready to go
+ self.dbg(DEBUG_INFO, "message_send: no socket")
+ return -1
+
if type(msg) != type(""):
# Sending a string indicates the message is ready to go
self.dbg(DEBUG_INFO, "message_send requires packet as string")
- return 0
+ return -1
self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(msg)))
return self.switch_socket.sendall(msg)
@@ -368,7 +400,6 @@
string = "Controller:\n"
string += " connected " + str(self.connected) + "\n"
string += " state " + self.dbg_state + "\n"
- string += " running " + str(self.running) + "\n"
string += " switch_addr " + str(self.switch_addr) + "\n"
string += " pending pkts " + str(len(self.packets)) + "\n"
string += " total pkts " + str(self.packets_total) + "\n"
@@ -385,34 +416,6 @@
def show(self):
print str(self)
- def connect_to_switch(self, timeout=None, send_hello=True):
- """
- Connect to switch
-
- Block until a connection to a switch
- is established or a timeout occurs. Sends a hello message
- once established.
- @param timeout Timeout in seconds
- @param send_hello If True, will send hello when connection established
- @return True if successful, False otherwise
- """
- h = hello()
- print "x1"
- self.connect_cv.acquire()
- if self.connected:
- print "x2"
- self.connect_cv.release()
- return True
-
- print "x3"
- self.connect_cv.wait(timeout)
- self.connect_cv.release()
- print "x4"
- if send_hello:
- self.message_send(h.pack())
- print "x5"
- return self.connected
-
def sample_handler(controller, msg, pkt):
"""
Sample message handler
diff --git a/src/python/oftest/dataplane.py b/src/python/oftest/dataplane.py
index 0462f5d..89cf073 100644
--- a/src/python/oftest/dataplane.py
+++ b/src/python/oftest/dataplane.py
@@ -21,6 +21,7 @@
import netutils
from threading import Thread
from threading import Lock
+import oft_config
#@todo Move these identifiers into config
ETH_P_ALL = 0x03
@@ -47,14 +48,20 @@
"""
Thread.__init__(self)
self.interface_name = interface_name
+ self.debug_level = oft_config.debug_level_default
self.max_pkts = max_pkts
self.packets_pending = 0
self.packets_total = 0
self.packets = []
self.packet_times = []
+ self.packets_discarded = 0
self.sync = Lock()
self.socket = self.interface_open(interface_name)
- print "Openned port monitor socket " + interface_name
+ self.dbg(oft_config.DEBUG_INFO,
+ "Openned port monitor socket " + interface_name)
+
+ def dbg(self, level, string):
+ oft_config.debug_log("DPLANE", self.debug_level, level, string)
def interface_open(self, interface_name):
"""
@@ -65,18 +72,10 @@
s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
socket.htons(ETH_P_ALL))
s.bind((interface_name, 0))
- promisc.set_promisc(s, interface_name)
+ netutils.set_promisc(s, interface_name)
s.settimeout(RCV_TIMEOUT)
return s
- def kill(self):
- """
- Terminate the running thread
- """
- self.running = False
- self.socket.close()
- print "Port monitor for " + self.interface_name + " exiting"
-
def run(self):
"""
Activity function for class
@@ -85,23 +84,41 @@
while self.running:
try:
rcvmsg = self.socket.recv(RCV_SIZE)
- rcvtime = time.clock()
-
- self.sync.acquire()
- self.packets.append(rcvmsg)
- self.packet_times.append(rcvtime)
- self.packets_pending += 1
- self.packets_total += 1
- self.sync.release()
-
- except socket.timeout:
- print "Socket timeout for " + self.interface_name
except socket.error:
- print "Socket closed for " + self.interface_name
- if self.running:
- self.kill()
+ self.dbg(DEBUG_INFO, "Socket error for " +
+ self.interface_name)
+ continue
+ if len(rcvmsg) == 0:
+ self.dbg(DEBUG_INFO, "Zero len pkt on " + self.interface_name)
+ self.kill()
break
+ rcvtime = time.clock()
+
+ self.sync.acquire()
+ if len(self.packets) >= self.max_pkts:
+ self.packets.pop(0)
+ self.packets_discarded += 1
+ self.packets.append(rcvmsg)
+ self.packet_times.append(rcvtime)
+ self.packets_pending += 1
+ self.packets_total += 1
+ self.sync.release()
+
+ self.dbg(DEBUG_INFO, "Thread exit for " + self.interface_name)
+
+ def kill(self):
+ """
+ Terminate the running thread
+ """
+ self.running = False
+ try:
+ self.socket.close()
+ except:
+ self.dbg(DEBUG_INFO, "Ignoring dataplane soc shutdown error")
+ self.dbg(oft_config.DEBUG_INFO,
+ "Port monitor for " + self.interface_name + " exiting")
+
def dequeue(self):
"""
Get the oldest packet in the queue
@@ -126,6 +143,7 @@
Clear the packet queue
"""
self.sync.acquire()
+ self.packets_discarded += len(self.packets)
self.packets = []
self.packet_times = []
self.packets_pending = 0
@@ -138,6 +156,8 @@
@param packet The packet data to send to the port
@retval The number of bytes sent
"""
+ self.dbg(oft_config.DEBUG_VERBOSE,
+ "port sending " + str(len(packet)) + " bytes")
return self.socket.send(packet)
@@ -152,6 +172,12 @@
"""
pass
+ def show(self, prefix=''):
+ print prefix + "Name: " + self.interface_name
+ print prefix + "Pkts pending: " + str(self.packets_pending)
+ print prefix + "Pkts total: " + str(self.packets_total)
+ print prefix + "socket: " + str(self.socket)
+
class DataPlane:
"""
@@ -160,6 +186,10 @@
"""
def __init__(self):
self.port_list = {}
+ self.debug_level = oft_config.debug_level_default
+
+ def dbg(self, level, string):
+ oft_config.debug_log("DPORT", self.debug_level, level, string)
def port_add(self, interface_name, port_number):
"""
@@ -178,10 +208,13 @@
@param port_number The port to send the data to
@param packet Raw packet data to send to port
"""
+ self.dbg(oft_config.DEBUG_VERBOSE,
+ "Sending %d bytes to port %d" % (len(packet), port_number))
bytes = self.port_list[port_number].send(packet)
if bytes != len(packet):
- print "Unhandled send error, length mismatch %d != %d" % \
- (bytes, len(packet))
+ self.dbg(DEBUG_ERROR,"Unhandled send error, " +
+ "length mismatch %d != %d" %
+ (bytes, len(packet)))
return bytes
def flood(self, packet):
@@ -192,9 +225,9 @@
for port_number in self.port_list.keys():
bytes = self.port_list[port_number].send(packet)
if bytes != len(packet):
- print "Unhandled send error" + \
- ", port %d, length mismatch %d != %d" % \
- (port_number, bytes, len(packet))
+ self.dbg(DEBUG_ERROR, "Unhandled send error" +
+ ", port %d, length mismatch %d != %d" %
+ (port_number, bytes, len(packet)))
def packet_get(self, port_number=None):
"""
@@ -230,8 +263,22 @@
pkt, time = self.port_list[min_port].dequeue()
return min_port, pkt, time
- def kill(self):
+ def kill(self, join_threads=True):
+ """
+ Close all sockets for dataplane
+ @param join_threads If True (default) call join on each thread
+ """
for port_number in self.port_list.keys():
self.port_list[port_number].kill()
+ if join_threads:
+ self.dbg(oft_config.DEBUG_INFO, "Joining ", port_number)
+ self.port_list[port_number].join()
- print "DataPlane shutdown"
+ self.dbg(oft_config.DEBUG_INFO, "DataPlane shutdown")
+
+ def show(self, prefix=''):
+ print prefix + "Dataplane Controller"
+ for pnum, port in self.port_list.items():
+ print prefix + "OpenFlow Port Number " + str(pnum)
+ port.show(prefix + ' ')
+