Support for filtering pkt ins, cleanup
Replaced barrier_to with transact_to. Allow filtering of packet-in
messages which is disabled by default. A threshold is set and,
when enabled, if N packet ins are received without other messages
intervening (and without packet-in being expected) subsequent
packet ins are dropped.
With packet storms on OVS, this has detected some overrun errors
on buffering and framing gets messed up.
Cleaned up some termination code; _socket_ready_handle now
returns an error code rather than True/False.
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 70b3aad..9e3354d 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -40,6 +40,26 @@
import select
import logging
+
+FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.'
+ for x in range(256)])
+
+def hex_dump_buffer(src, length=16):
+ """
+ Convert src to a hex dump string and return the string
+ @param src The source buffer
+ @param length The number of bytes shown in each line
+ @returns A string showing the hex dump
+ """
+ result = ["\n"]
+ for i in xrange(0, len(src), length):
+ chars = src[i:i+length]
+ hex = ' '.join(["%02x" % ord(x) for x in chars])
+ printable = ''.join(["%s" % ((ord(x) <= 127 and
+ FILTER[ord(x)]) or '.') for x in chars])
+ result.append("%04x %-*s %s\n" % (i, length*3, hex, printable))
+ return ''.join(result)
+
##@todo Find a better home for these identifiers (controller)
RCV_SIZE_DEFAULT = 32768
LISTEN_QUEUE_SIZE = 1
@@ -63,11 +83,9 @@
@var rcv_size The receive size to use for receive calls
@var max_pkts The max size of the receive queue
@var keep_alive If true, listen for echo requests and respond w/
- @var keep_alive If true, listen for echo requests and respond w/
echo replies
@var initial_hello If true, will send a hello message immediately
upon connecting to the switch
- @var exit_on_reset If true, terminate controller on connection reset
@var host The host to use for connect
@var port The port to connect on
@var packets_total Total number of packets received
@@ -102,7 +120,6 @@
self.keep_alive = False
self.active = True
self.initial_hello = True
- self.exit_on_reset = True
# Settings
self.max_pkts = max_pkts
@@ -111,7 +128,11 @@
self.port = port
self.dbg_state = "init"
self.logger = logging.getLogger("controller")
- self.barrier_to = 15 # Barrier timeout default value; add to config
+ self.filter_packet_in = False # Drop "excessive" packet ins
+ self.pkt_in_run = 0 # Count on run of packet ins
+ self.pkt_in_filter_limit = 50 # Count on run of packet ins
+ self.pkt_in_dropped = 0 # Total dropped packet ins
+ self.transact_to = 15 # Transact timeout default value; add to config
# Transaction and message type waiting variables
# xid_cv: Condition variable (semaphore) for packet waiters
@@ -132,6 +153,37 @@
self.expect_msg_response = None
self.buffered_input = ""
+ def filter_packet(self, rawmsg, hdr):
+ """
+ Check if packet should be filtered
+
+ Currently filters packet in messages
+ @return Boolean, True if packet should be dropped
+ """
+ # Add check for packet in and rate limit
+ if self.filter_packet_in:
+ # If this is a packet in and not expecting a packet in
+ if ((not self.expect_msg_type or
+ (self.expect_msg_type != hdr.type)) and
+ hdr.type == OFPT_PACKET_IN):
+
+ self.pkt_in_run += 1
+ # Check if limit exceeded
+ if self.pkt_in_run > self.pkt_in_filter_limit:
+ self.pkt_in_dropped += 1
+ return True
+
+ else: # Not in filtering mode
+ # If we were dropping packets, report number dropped
+ if self.pkt_in_run > self.pkt_in_filter_limit:
+ self.logger.debug("Dropped %d packet ins (%d total)"
+ % ((self.pkt_in_run -
+ self.pkt_in_filter_limit),
+ self.pkt_in_dropped))
+ self.pkt_in_run = 0
+
+ return False
+
def _pkt_handle(self, pkt):
"""
Check for all packet handling conditions
@@ -157,23 +209,26 @@
while offset < len(pkt):
# Parse the header to get type
hdr = of_header_parse(pkt[offset:])
- if not hdr:
- self.logger.info("Could not parse header, pkt len", len(pkt))
- self.parse_errors += 1
- return
- if hdr.length == 0:
- self.logger.error("Header length is zero; out of sync")
- self.parse_errors += 1
+ if not hdr or hdr.length == 0:
+ self.logger.error("Could not parse header")
+ self.logger.error("pkt len %d." % len(pkt))
+ if hdr:
+ self.logger.error("hdr len %d." % hdr.length)
+ self.logger.error("%s" % hex_dump_buffer(pkt[:200]))
+ self.kill()
return
# Extract the raw message bytes
if (offset + hdr.length) > len(pkt):
break
rawmsg = pkt[offset : offset + hdr.length]
-
- self.logger.debug("Msg in: len %d. offset %d. type %s. hdr.len %d" %
- (len(pkt), offset, ofp_type_map[hdr.type], hdr.length))
offset += hdr.length
+
+ if self.filter_packet(rawmsg, hdr):
+ continue
+
+ self.logger.debug("Msg in: buf len %d. hdr.type %s. hdr.len %d" %
+ (len(pkt), ofp_type_map[hdr.type], hdr.length))
if hdr.version != OFP_VERSION:
self.logger.error("Version %d does not match OFTest version %d"
% (hdr.version, OFP_VERSION))
@@ -181,7 +236,7 @@
(hdr.version, OFP_VERSION)
self.active = False
self.switch_socket = None
- self.kill()
+ return
msg = of_message_parse(rawmsg)
if not msg:
@@ -228,7 +283,8 @@
rep = echo_reply()
rep.header.xid = hdr.xid
# Ignoring additional data
- self.message_send(rep.pack(), zero_xid=True)
+ if self.message_send(rep.pack(), zero_xid=True) < 0:
+ self.logger.error("Error sending echo reply")
continue
# Now check for message handlers; preference is given to
@@ -259,44 +315,53 @@
def _socket_ready_handle(self, s):
"""
Handle an input-ready socket
+
@param s The socket object that is ready
- @retval True, reset the switch connection
+ @returns 0 on success, -1 on error
"""
- if s == self.listen_socket:
+ if s and s == self.listen_socket:
if self.switch_socket:
- return False
+ return 0 # Ignore listen socket while connected to switch
(self.switch_socket, self.switch_addr) = \
self.listen_socket.accept()
self.logger.info("Got cxn to " + str(self.switch_addr))
+ self.socs.append(self.switch_socket)
# Notify anyone waiting
self.connect_cv.acquire()
self.connect_cv.notify()
self.connect_cv.release()
- self.socs.append(self.switch_socket)
if self.initial_hello:
self.message_send(hello())
- elif s == self.switch_socket:
- try:
- pkt = self.switch_socket.recv(self.rcv_size)
- except:
- self.logger.warning("Error on switch read")
- return True
+ ## @fixme Check return code
+ elif s and s == self.switch_socket:
+ for idx in range(3): # debug: try a couple of times
+ try:
+ pkt = self.switch_socket.recv(self.rcv_size)
+ except:
+ self.logger.warning("Error on switch read")
+ return -1
+
+ if not self.active:
+ return 0
+
+ if len(pkt) == 0:
+ self.logger.warning("Zero-length switch read, %d" % idx)
+ else:
+ break
- if not self.active:
- return False
-
- if len(pkt) == 0:
+ if len(pkt) == 0: # Still no packet
self.logger.warning("Zero-length switch read; closing cxn")
- return True
+ self.logger.info(str(self))
+ return -1
self._pkt_handle(pkt)
else:
self.logger.error("Unknown socket ready: " + str(s))
- return True
+ return -1
- return False
+ return 0
def run(self):
"""
@@ -329,42 +394,25 @@
self.socs = [self.listen_socket]
self.dbg_state = "running"
while self.active:
- reset_switch_cxn = False
try:
sel_in, sel_out, sel_err = \
select.select(self.socs, [], self.socs, 1)
except:
print sys.exc_info()
self.logger.error("Select error, exiting")
- sys.exit(1)
-
- if not self.active:
+ self.active = False
break
- for s in sel_in:
- reset_switch_cxn = self._socket_ready_handle(s)
-
for s in sel_err:
self.logger.error("Got socket error on: " + str(s))
- if s == self.switch_socket:
- reset_switch_cxn = True
- else:
- self.logger.error("Socket error; exiting")
+ self.active = False
+ break
+
+ for s in sel_in:
+ if self._socket_ready_handle(s) == -1:
self.active = False
break
- if self.active and reset_switch_cxn:
- if self.exit_on_reset:
- self.kill()
- else:
- self.logger.warning("Closing switch cxn")
- try:
- self.switch_socket.close()
- except:
- pass
- self.switch_socket = None
- self.socs = self.socs[0:1]
-
# End of main loop
self.dbg_state = "closing"
self.logger.info("Exiting controller thread")
@@ -404,6 +452,7 @@
@todo Might want to synchronize shutdown with self.sync...
"""
+
self.active = False
try:
self.switch_socket.shutdown(socket.SHUT_RDWR)
@@ -416,6 +465,16 @@
except:
self.logger.info("Ignoring listen soc shutdown error")
self.listen_socket = None
+
+ # Release condition variables on which controller may be wait
+ self.xid_cv.acquire()
+ self.xid_cv.notifyAll()
+ self.xid_cv.release()
+
+ self.connect_cv.acquire()
+ self.connect_cv.notifyAll()
+ self.connect_cv.release()
+
self.dbg_state = "down"
def register(self, msg_type, handler):
@@ -512,7 +571,8 @@
received message handling.
@param msg The message object to send; must not be a string
- @param timeout The timeout in seconds (?)
+ @param timeout The timeout in seconds; if -1 use default. if None
+ blocks without time out
@param zero_xid Normally, if the XID is 0 an XID will be generated
for the message. Set xero_xid to override this behavior
@return The matching message object or None if unsuccessful
@@ -523,7 +583,7 @@
msg.header.xid = gen_xid()
if timeout == -1:
- timeout = self.barrier_to
+ timeout = self.transact_to
self.logger.debug("Running transaction %d" % msg.header.xid)
self.xid_cv.acquire()
if self.xid:
@@ -533,7 +593,11 @@
self.xid = msg.header.xid
self.xid_response = None
- self.message_send(msg.pack())
+ if self.message_send(msg.pack()) < 0:
+ self.logger.error("Error sending pkt for transaction %d" %
+ msg.header.xid)
+ return (None, None)
+
self.logger.debug("Waiting for transaction %d" % msg.header.xid)
self.xid_cv.wait(timeout)
if self.xid_response:
@@ -600,6 +664,8 @@
string += " host " + str(self.host) + "\n"
string += " port " + str(self.port) + "\n"
string += " keep_alive " + str(self.keep_alive) + "\n"
+ string += " pkt_in_run " + str(self.pkt_in_run) + "\n"
+ string += " pkt_in_dropped " + str(self.pkt_in_dropped) + "\n"
return string
def show(self):