Mostly changes to socket deployment
Use select for handling sockets; hopefully better cleanup approach
Added connection semaphore for controller
Support message objects as arguments to controller.message_send
Support initial hello from controller when connected to switch
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 779e42c..0b89ee6 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -37,6 +37,9 @@
from message import *
from parse import *
from ofutils import *
+# For some reason, it seems select to be last (or later).
+# Otherwise get an attribute error when calling select.select
+import select
class Controller(Thread):
"""
@@ -58,6 +61,8 @@
@var max_pkts The max size of the receive queue
@var keep_alive If true, listen for echo requests and respond w/
echo replies
+ @var initial_hello If true, will send a hello message immediately
+ upon connecting to the switch
@var host The host to use for connect
@var port The port to connect on
@var packets_total Total number of packets received
@@ -73,6 +78,8 @@
self.listen_socket = None
self.switch_socket = None
self.switch_addr = None
+ self.socs = []
+ self.connect_cv = Condition()
# Counters
self.socket_errors = 0
@@ -82,11 +89,12 @@
self.packets_handled = 0
# State
- self.connected = False
self.packets = []
self.sync = Lock()
self.handlers = {}
self.keep_alive = False
+ self.active = True
+ self.initial_hello = True
# Settings
self.max_pkts = max_pkts
@@ -94,8 +102,8 @@
self.host = controller_host
self.port = controller_port
self.dbg_state = "init"
- # self.debug_level = DEBUG_VERBOSE
self.debug_level = debug_level_default
+ # self.debug_level = DEBUG_VERBOSE
# Transaction variables
# xid_cv: Condition variable (semaphore) for transaction
@@ -108,6 +116,61 @@
def dbg(self, level, string):
debug_log("CTRL", self.debug_level, level, string)
+ def _socket_ready_handle(self, s):
+ """
+ Handle an input-ready socket
+ @param s The socket object that is ready
+ @retval True, reset the switch connection
+ """
+
+ if s == self.listen_socket:
+ if self.switch_socket:
+ self.dbg(DEBUG_ERROR, "Multiple switch cxns not supported")
+ sys.exit(1)
+
+ (self.switch_socket, self.switch_addr) = \
+ self.listen_socket.accept()
+ self.dbg(DEBUG_INFO, "Got cxn to " + str(self.switch_addr))
+ # Notify anyone waiting
+ self.connect_cv.acquire()
+ self.connect_cv.notify()
+ self.connect_cv.release()
+ self.socs.append(self.switch_socket)
+ if self.initial_hello:
+ self.message_send(hello())
+ elif s == self.switch_socket:
+ try:
+ pkt = self.switch_socket.recv(self.rcv_size)
+ except:
+ self.dbg(DEBUG_INFO, "error on switch read")
+ return True
+
+ if not self.active:
+ return False
+
+ if len(pkt) == 0:
+ self.dbg(DEBUG_INFO, "zero-len pkt in")
+ return True
+
+ (handled, msg) = self._pkt_handler_check(pkt)
+ if handled:
+ self.packets_handled += 1
+ return False
+
+ # Not handled, enqueue
+ self.sync.acquire()
+ if len(self.packets) >= self.max_pkts:
+ self.packets.pop(0)
+ self.packets_expired += 1
+ self.packets.append((msg, pkt))
+ self.packets_total += 1
+ self.sync.release()
+ else:
+ self.dbg(DEBUG_ERROR, "Unknown socket ready: " + str(s))
+ return True
+
+ return False
+
def run(self):
"""
Activity function for class
@@ -123,95 +186,87 @@
connection for now.
"""
- if not self.switch_socket:
- self.dbg(DEBUG_ERROR,
- "Error in controller thread, no switch socket")
- self.shutdown()
- return
+ self.dbg_state = "starting"
- # Read and process packets from socket connected to switch
- while 1:
- try:
- pkt = self.switch_socket.recv(self.rcv_size)
- except socket.error:
- self.dbg(DEBUG_ERROR, "Controller socket read error")
- self.socket_errors += 1
- break
-
- if len(pkt) == 0:
- # Considered an error; usually means switch has disconnected
- self.dbg(DEBUG_INFO, "length 0 pkt in")
- self.socket_errors += 1
- break
-
- if not self.connected:
- break
-
- (handled, msg) = self._pkt_handler_check(pkt)
- if handled:
- self.packets_handled += 1
- continue
-
- # Not handled, enqueue
- self.sync.acquire()
- if len(self.packets) >= self.max_pkts:
- self.packets.pop(0)
- self.packets_expired += 1
- self.packets.append((msg, pkt))
- self.packets_total += 1
- self.sync.release()
-
- self.shutdown()
-
- def connect(self, send_hello=True):
- """
- Connect to a switch and start this thread
-
- Create the listening socket, accept and block until a
- connection is made to the switch. Then start the local
- thread and return. Parameters to the call are take from
- member variables.
-
- @param send_hello If True, send the initial hello packet on connection
- @return Boolean where True indicates success
-
- If already connected, returns True
- If the listen socket does not exist, will create it
- """
-
- self.dbg_state = "connecting"
- self.dbg(DEBUG_INFO, "open ctl host: >" + str(self.host) + "< port " +
- str(self.port))
- # Create the listening socket
- if not self.listen_socket:
- self.listen_socket = socket.socket(socket.AF_INET,
- socket.SOCK_STREAM)
- self.listen_socket.setsockopt(socket.SOL_SOCKET,
- socket.SO_REUSEADDR, 1)
- self.listen_socket.bind((self.host, self.port))
+ # Create listen socket
+ self.dbg(DEBUG_INFO, "Create/listen at " + self.host + ":" +
+ str(self.port))
+ self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.listen_socket.setsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR, 1)
+ self.listen_socket.bind((self.host, self.port))
self.dbg_state = "listening"
self.listen_socket.listen(LISTEN_QUEUE_SIZE)
- self.dbg_state = "accepting"
- (self.switch_socket, self.switch_addr) = self.listen_socket.accept()
- if not self.switch_socket:
- self.socket_errors += 1
- self.dbg(DEBUG_WARN, "Failed on accept")
- self.dbg_state = "error"
- self.listen_socket.close()
- return False
- self.connected = True
- self.dbg_state = "connected"
- self.dbg(DEBUG_INFO, "Got connection to " + str(self.switch_addr))
+ self.dbg(DEBUG_INFO, "Waiting for switch connection")
+ self.socs = [self.listen_socket]
+ self.dbg_state = "running"
+ while self.active:
+ reset_switch_cxn = False
+ try:
+ sel_in, sel_out, sel_err = \
+ select.select(self.socs, [], self.socs, 1)
+ except:
+ print sys.exc_info()
+ self.dbg(DEBUG_ERROR, "Select error, exiting")
+ sys.exit(1)
- if send_hello:
- h = hello()
- self.message_send(h.pack())
+ if not self.active:
+ break
- # Start the background thread
- self.start()
+ for s in sel_in:
+ reset_switch_cxn = self._socket_ready_handle(s)
- return True
+ for s in sel_err:
+ self.dbg(DEBUG_ERROR, "Got socket error on: " + str(s))
+ if s == self.switch_socket:
+ reset_switch_cxn = True
+ else:
+ self.dbg(DEBUG_ERROR, "Socket error; exiting")
+ self.active = False
+ break
+
+ if self.active and reset_switch_cxn:
+ self.dbg(DEBUG_INFO, "Closing switch cxn")
+ try:
+ self.switch_socket.close()
+ except:
+ pass
+ self.switch_socket = None
+ self.socs = self.socs[0:1]
+
+ # End of main loop
+ self.dbg_state = "closing"
+ self.dbg(DEBUG_INFO, "Exiting controller thread")
+ self.shutdown()
+
+ def connect(self, timeout=None):
+ """
+ Connect to the switch
+
+ @param timeout If None, block until connected. If 0, return
+ immedidately. Otherwise, block for up to timeout seconds
+ @return Boolean, True if connected
+ """
+
+ if timeout == 0:
+ return self.switch_socket is not None
+ if self.switch_socket is not None:
+ return True
+ self.connect_cv.acquire()
+ self.connect_cv.wait(timeout)
+ self.connect_cv.release()
+
+ return self.switch_socket is not None
+
+ def kill(self):
+ """
+ Force the controller thread to quit
+
+ Just sets the active state variable to false and expects
+ the select timeout to kick in
+ """
+ self.active = False
def shutdown(self):
"""
@@ -219,22 +274,20 @@
@todo Might want to synchronize shutdown with self.sync...
"""
- self.connected = False
- self.dbg_state = "closed"
- if self.switch_socket:
- try:
- self.switch_socket.shutdown(socket.SHUT_RDWR)
- except:
- self.dbg(DEBUG_INFO, "Ignoring switch soc shutdown error")
- self.switch_socket = None
+ self.active = False
+ try:
+ self.switch_socket.shutdown(socket.SHUT_RDWR)
+ except:
+ self.dbg(DEBUG_INFO, "Ignoring switch soc shutdown error")
+ self.switch_socket = None
- if self.listen_socket:
- try:
- self.listen_socket.shutdown(socket.SHUT_RDWR)
- except:
- self.dbg(DEBUG_INFO, "Ignoring listen soc shutdown error")
- self.listen_socket = None
-
+ try:
+ self.listen_socket.shutdown(socket.SHUT_RDWR)
+ except:
+ self.dbg(DEBUG_INFO, "Ignoring listen soc shutdown error")
+ self.listen_socket = None
+ self.dbg_state = "down"
+
def _pkt_handler_check(self, pkt):
"""
Check for packet handling before being enqueued
@@ -279,7 +332,7 @@
rep = echo_reply()
rep.header.xid = hdr.xid
# Ignoring additional data
- self.message_send(rep.pack())
+ self.message_send(rep.pack(), zero_xid=True)
return (True, None)
# Now check for message handlers; preference is given to
@@ -370,16 +423,22 @@
self.xid_cv.wait(timeout)
msg = self.xid_response
self.xid_response = None
+ self.xid = None
self.xid_cv.release()
return msg
- def message_send(self, msg):
+ def message_send(self, msg, zero_xid=False):
"""
Send the message to the switch
- @param msg An OpenFlow message object to be forwarded to the switch.
+ @param msg A string or OpenFlow message object to be forwarded to
+ the switch.
+ @param zero_xid If msg is an OpenFlow object (not a string) and if
+ the XID in the header is 0, then an XID will be generated
+ for the message. Set xero_xid to override this behavior (and keep an
+ existing 0 xid)
- @return None on success
+ @return -1 if error, 0 on success
"""
@@ -387,18 +446,28 @@
# Sending a string indicates the message is ready to go
self.dbg(DEBUG_INFO, "message_send: no socket")
return -1
-
+ #@todo If not string, try to pack
if type(msg) != type(""):
- # Sending a string indicates the message is ready to go
- self.dbg(DEBUG_INFO, "message_send requires packet as string")
- return -1
+ try:
+ if msg.header.xid == 0 and not zero_xid:
+ msg.header.xid = gen_xid()
+ outpkt = msg.pack()
+ except:
+ self.dbg(DEBUG_INFO,
+ "message_send: not an OF message or string?")
+ return -1
+ else:
+ outpkt = msg
- self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(msg)))
- return self.switch_socket.sendall(msg)
+ self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(outpkt)))
+ if self.switch_socket.sendall(outpkt) is None:
+ return 0
+
+ self.dbg(DEBUG_ERROR, "Unknown error on sendall")
+ return -1
def __str__(self):
string = "Controller:\n"
- string += " connected " + str(self.connected) + "\n"
string += " state " + self.dbg_state + "\n"
string += " switch_addr " + str(self.switch_addr) + "\n"
string += " pending pkts " + str(len(self.packets)) + "\n"
diff --git a/src/python/oftest/dataplane.py b/src/python/oftest/dataplane.py
index 89cf073..44ec997 100644
--- a/src/python/oftest/dataplane.py
+++ b/src/python/oftest/dataplane.py
@@ -21,13 +21,29 @@
import netutils
from threading import Thread
from threading import Lock
-import oft_config
+from oft_config import *
+import select
#@todo Move these identifiers into config
ETH_P_ALL = 0x03
RCV_TIMEOUT = 10000
RCV_SIZE = 4096
+# class packet_queue:
+# """
+# Class defining a packet queue across multiple ports
+
+# Items in the queue are stored as a triple (port number, pkt, pkt in time)
+# """
+
+# def __init__(self, max_pkts=1024):
+# self.sync = Lock()
+# self.debug_level = debug_level_default
+# self.packets = []
+# self.max_pkts = max_pkts
+# self.packets_total = 0
+# self.packets_discarded = 0
+
class DataPlanePort(Thread):
"""
Class defining a port monitoring object.
@@ -48,20 +64,18 @@
"""
Thread.__init__(self)
self.interface_name = interface_name
- self.debug_level = oft_config.debug_level_default
+ self.debug_level = debug_level_default
self.max_pkts = max_pkts
- self.packets_pending = 0
self.packets_total = 0
self.packets = []
- self.packet_times = []
self.packets_discarded = 0
self.sync = Lock()
self.socket = self.interface_open(interface_name)
- self.dbg(oft_config.DEBUG_INFO,
- "Openned port monitor socket " + interface_name)
+ self.dbg(DEBUG_INFO, "Openned port monitor socket")
def dbg(self, level, string):
- oft_config.debug_log("DPLANE", self.debug_level, level, string)
+ debug_log("DPLANE", self.debug_level, level,
+ self.interface_name + ": " + string)
def interface_open(self, interface_name):
"""
@@ -81,31 +95,52 @@
Activity function for class
"""
self.running = True
+ self.socs = [self.socket]
+ error_warned = False # Have we warned about error?
while self.running:
try:
+ sel_in, sel_out, sel_err = \
+ select.select(self.socs, [], [], 1)
+ except:
+ print sys.exc_info()
+ self.dbg(DEBUG_ERROR, "Select error, exiting")
+ sys.exit(1)
+
+ #if not sel_err is None:
+ # self.dbg(DEBUG_VERBOSE, "Socket error from select set")
+
+ if not self.running:
+ break
+
+ if sel_in is None:
+ continue
+
+ try:
rcvmsg = self.socket.recv(RCV_SIZE)
except socket.error:
- self.dbg(DEBUG_INFO, "Socket error for " +
- self.interface_name)
+ if not error_warned:
+ self.dbg(DEBUG_INFO, "Socket error on recv")
+ error_warned = True
continue
+
if len(rcvmsg) == 0:
- self.dbg(DEBUG_INFO, "Zero len pkt on " + self.interface_name)
+ self.dbg(DEBUG_INFO, "Zero len pkt rcvd")
self.kill()
break
rcvtime = time.clock()
+ self.dbg(DEBUG_VERBOSE, "Pkt len " + str(len(rcvmsg)) +
+ " in at " + str(rcvtime))
self.sync.acquire()
if len(self.packets) >= self.max_pkts:
self.packets.pop(0)
self.packets_discarded += 1
- self.packets.append(rcvmsg)
- self.packet_times.append(rcvtime)
- self.packets_pending += 1
+ self.packets.append((rcvmsg, rcvtime))
self.packets_total += 1
self.sync.release()
- self.dbg(DEBUG_INFO, "Thread exit for " + self.interface_name)
+ self.dbg(DEBUG_INFO, "Thread exit ")
def kill(self):
"""
@@ -116,17 +151,15 @@
self.socket.close()
except:
self.dbg(DEBUG_INFO, "Ignoring dataplane soc shutdown error")
- self.dbg(oft_config.DEBUG_INFO,
- "Port monitor for " + self.interface_name + " exiting")
+ self.dbg(DEBUG_INFO,
+ "Port monitor exiting")
def dequeue(self):
"""
Get the oldest packet in the queue
"""
self.sync.acquire()
- pkt = self.packets.pop(0)
- pkt_time = self.packet_times.pop(0)
- self.packets_pending -= 1
+ pkt, pkt_time = self.packets.pop(0)
self.sync.release()
return pkt, pkt_time
@@ -134,9 +167,12 @@
"""
Return the timestamp of the head of queue or None if empty
"""
- if self.packets_pending:
- return self.packet_times[0]
- return None
+ rv = None
+ self.sync.acquire()
+ if len(self.packets) > 0:
+ rv = self.packets[0][1]
+ self.sync.release()
+ return rv
def flush(self):
"""
@@ -146,7 +182,6 @@
self.packets_discarded += len(self.packets)
self.packets = []
self.packet_times = []
- self.packets_pending = 0
self.sync.release()
@@ -156,7 +191,7 @@
@param packet The packet data to send to the port
@retval The number of bytes sent
"""
- self.dbg(oft_config.DEBUG_VERBOSE,
+ self.dbg(DEBUG_VERBOSE,
"port sending " + str(len(packet)) + " bytes")
return self.socket.send(packet)
@@ -174,7 +209,7 @@
def show(self, prefix=''):
print prefix + "Name: " + self.interface_name
- print prefix + "Pkts pending: " + str(self.packets_pending)
+ print prefix + "Pkts pending: " + str(len(self.packets))
print prefix + "Pkts total: " + str(self.packets_total)
print prefix + "socket: " + str(self.socket)
@@ -186,10 +221,10 @@
"""
def __init__(self):
self.port_list = {}
- self.debug_level = oft_config.debug_level_default
+ self.debug_level = debug_level_default
def dbg(self, level, string):
- oft_config.debug_log("DPORT", self.debug_level, level, string)
+ debug_log("DPORT", self.debug_level, level, string)
def port_add(self, interface_name, port_number):
"""
@@ -208,7 +243,7 @@
@param port_number The port to send the data to
@param packet Raw packet data to send to port
"""
- self.dbg(oft_config.DEBUG_VERBOSE,
+ self.dbg(DEBUG_VERBOSE,
"Sending %d bytes to port %d" % (len(packet), port_number))
bytes = self.port_list[port_number].send(packet)
if bytes != len(packet):
@@ -232,7 +267,8 @@
def packet_get(self, port_number=None):
"""
Get a packet from the data plane
- If port_number is given, get the packet from that port.
+
+ If port_number is given, get the oldest packet from that port.
Otherwise, find the port with the oldest packet and return
that packet.
@param port_number If set, get packet from this port
@@ -241,13 +277,14 @@
"""
if port_number:
- if self.port_list[port_number].packets_pending != 0:
+ if len(self.port_list[port_number].packets) != 0:
pkt, time = self.port_list[port_number].dequeue()
return port_number, pkt, time
else:
return None, None, None
# Find port with oldest packet
+ #@todo Consider using a single queue for all ports
min_time = 0
min_port = -1
for port_number in self.port_list.keys():
@@ -263,18 +300,18 @@
pkt, time = self.port_list[min_port].dequeue()
return min_port, pkt, time
- def kill(self, join_threads=True):
+ def kill(self, join_threads=False):
"""
Close all sockets for dataplane
- @param join_threads If True (default) call join on each thread
+ @param join_threads If True call join on each thread
"""
for port_number in self.port_list.keys():
self.port_list[port_number].kill()
if join_threads:
- self.dbg(oft_config.DEBUG_INFO, "Joining ", port_number)
+ self.dbg(DEBUG_INFO, "Joining " + str(port_number))
self.port_list[port_number].join()
- self.dbg(oft_config.DEBUG_INFO, "DataPlane shutdown")
+ self.dbg(DEBUG_INFO, "DataPlane shutdown")
def show(self, prefix=''):
print prefix + "Dataplane Controller"
diff --git a/src/python/oftest/oft_config.py b/src/python/oftest/oft_config.py
index e7311e0..264c94d 100644
--- a/src/python/oftest/oft_config.py
+++ b/src/python/oftest/oft_config.py
@@ -101,10 +101,10 @@
elif platform == "bcm_indigo":
interface_ofport_map = {
-# 1 : "eth2",
-# 2 : "eth3",
- 3 : "eth4",
-# 4 : "eth5"
+ 23 : "eth2",
+ 24 : "eth3",
+ 25 : "eth4",
+ 26 : "eth5"
}
# For SSH connections to switch
switch_cxn_type = "ssh"