Support for filtering packet-ins; cleanup

Replaced barrier_to with transact_to.  Allow filtering of packet-in
messages; filtering is disabled by default.  When enabled, once more
than a threshold (pkt_in_filter_limit) of packet-ins are received back
to back, with no other message intervening and no packet-in expected,
subsequent packet-ins are dropped until some other message arrives.

With packet storms on OVS, this has exposed some buffer overrun
errors, after which message framing gets out of sync.

Cleaned up some termination code; _socket_ready_handle now
returns 0 on success and -1 on error rather than True/False.
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 70b3aad..9e3354d 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -40,6 +40,26 @@
 import select
 import logging
 
+# FILTER[b] is chr(b) if repr() shows it as one char, else '.'
+FILTER = ''.join([chr(x) if len(repr(chr(x))) == 3 else '.'
+                  for x in range(256)])
+
+def hex_dump_buffer(src, length=16):
+    """
+    Convert src to a hex dump string and return the string
+    @param src The source buffer (a byte string)
+    @param length The number of bytes shown in each line
+    @returns A string showing the hex dump; it starts with a newline
+    """
+    result = ["\n"]
+    for offset in xrange(0, len(src), length):
+        chunk = src[offset:offset + length]
+        hex_str = ' '.join(["%02x" % ord(c) for c in chunk])
+        text = ''.join([(FILTER[ord(c)] if ord(c) <= 127 else '.')
+                        for c in chunk])
+        result.append("%04x  %-*s  %s\n" % (offset, length * 3, hex_str, text))
+    return ''.join(result)
+
 ##@todo Find a better home for these identifiers (controller)
 RCV_SIZE_DEFAULT = 32768
 LISTEN_QUEUE_SIZE = 1
@@ -63,11 +83,9 @@
     @var rcv_size The receive size to use for receive calls
     @var max_pkts The max size of the receive queue
     @var keep_alive If true, listen for echo requests and respond w/
-    @var keep_alive If true, listen for echo requests and respond w/
     echo replies
     @var initial_hello If true, will send a hello message immediately
     upon connecting to the switch
-    @var exit_on_reset If true, terminate controller on connection reset
     @var host The host to use for connect
     @var port The port to connect on 
     @var packets_total Total number of packets received
@@ -102,7 +120,6 @@
         self.keep_alive = False
         self.active = True
         self.initial_hello = True
-        self.exit_on_reset = True
 
         # Settings
         self.max_pkts = max_pkts
@@ -111,7 +128,11 @@
         self.port = port
         self.dbg_state = "init"
         self.logger = logging.getLogger("controller")
-        self.barrier_to = 15 # Barrier timeout default value; add to config
+        self.filter_packet_in = False # Drop "excessive" packet ins
+        self.pkt_in_run = 0 # Current run of unexpected packet ins
+        self.pkt_in_filter_limit = 50 # Drop packet ins past this run length
+        self.pkt_in_dropped = 0 # Total dropped packet ins
+        self.transact_to = 15 # Transact timeout default value; add to config
 
         # Transaction and message type waiting variables 
@@ -132,6 +153,40 @@
         self.expect_msg_response = None
         self.buffered_input = ""
 
+    def filter_packet(self, rawmsg, hdr):
+        """
+        Check if packet should be filtered
+
+        Currently filters packet in messages, and only when
+        self.filter_packet_in is set.  A packet in counts toward the
+        current run unless a packet in is expected (expect_msg_type); once
+        the run exceeds pkt_in_filter_limit, further packet ins are dropped.
+        Any other message, or an expected packet in, ends the run.
+
+        @param rawmsg The raw message bytes (not used currently)
+        @param hdr The parsed header of the message
+        @return Boolean, True if packet should be dropped
+        """
+        if not self.filter_packet_in:
+            return False
+
+        if (hdr.type == OFPT_PACKET_IN and
+                self.expect_msg_type != OFPT_PACKET_IN):
+            self.pkt_in_run += 1
+            if self.pkt_in_run > self.pkt_in_filter_limit:
+                self.pkt_in_dropped += 1
+                return True
+            return False
+
+        # Run ended; report how many packet ins were dropped from it
+        if self.pkt_in_run > self.pkt_in_filter_limit:
+            self.logger.debug("Dropped %d packet ins (%d total)"
+                              % ((self.pkt_in_run -
+                                  self.pkt_in_filter_limit),
+                                 self.pkt_in_dropped))
+        self.pkt_in_run = 0
+        return False
+
     def _pkt_handle(self, pkt):
         """
         Check for all packet handling conditions
@@ -157,23 +209,26 @@
         while offset < len(pkt):
             # Parse the header to get type
             hdr = of_header_parse(pkt[offset:])
-            if not hdr:
-                self.logger.info("Could not parse header, pkt len", len(pkt))
-                self.parse_errors += 1
-                return
-            if hdr.length == 0:
-                self.logger.error("Header length is zero; out of sync")
-                self.parse_errors += 1
+            if not hdr or hdr.length == 0: # Framing lost; give up on cxn
+                self.logger.error("Could not parse header")
+                self.logger.error("pkt len %d." % len(pkt))
+                if hdr:
+                    self.logger.error("hdr len %d." % hdr.length)
+                self.logger.error("%s" % hex_dump_buffer(pkt[:200]))
+                self.kill()
                 return
 
             # Extract the raw message bytes
             if (offset + hdr.length) > len(pkt):
                 break
             rawmsg = pkt[offset : offset + hdr.length]
-
-            self.logger.debug("Msg in: len %d. offset %d. type %s. hdr.len %d" %
-                (len(pkt), offset, ofp_type_map[hdr.type], hdr.length))
             offset += hdr.length
+
+            if self.filter_packet(rawmsg, hdr): # Dropped by packet-in filter
+                continue
+
+            self.logger.debug("Msg in: buf len %d. hdr.type %s. hdr.len %d" %
+                              (len(pkt), ofp_type_map[hdr.type], hdr.length))
             if hdr.version != OFP_VERSION:
                 self.logger.error("Version %d does not match OFTest version %d"
                                   % (hdr.version, OFP_VERSION))
@@ -181,7 +236,7 @@
                     (hdr.version, OFP_VERSION)
                 self.active = False
                 self.switch_socket = None
-                self.kill()
+                return # Discard rest of buffer; active is now False
 
             msg = of_message_parse(rawmsg)
             if not msg:
@@ -228,7 +283,8 @@
                     rep = echo_reply()
                     rep.header.xid = hdr.xid
                     # Ignoring additional data
-                    self.message_send(rep.pack(), zero_xid=True)
+                    if self.message_send(rep.pack(), zero_xid=True) < 0:
+                        self.logger.error("Error sending echo reply")
                     continue
 
             # Now check for message handlers; preference is given to
@@ -259,44 +315,53 @@
     def _socket_ready_handle(self, s):
         """
         Handle an input-ready socket
+
         @param s The socket object that is ready
-        @retval True, reset the switch connection
+        @returns 0 on success, -1 on error
         """
 
-        if s == self.listen_socket:
+        if s and s == self.listen_socket:
             if self.switch_socket:
-                return False
+                return 0 # Ignore listen socket while connected to switch
 
             (self.switch_socket, self.switch_addr) = \
                 self.listen_socket.accept()
             self.logger.info("Got cxn to " + str(self.switch_addr))
+            self.socs.append(self.switch_socket)
             # Notify anyone waiting
             self.connect_cv.acquire()
             self.connect_cv.notify()
             self.connect_cv.release()
-            self.socs.append(self.switch_socket)
             if self.initial_hello:
                 self.message_send(hello())
-        elif s == self.switch_socket:
-            try:
-                pkt = self.switch_socket.recv(self.rcv_size)
-            except:
-                self.logger.warning("Error on switch read")
-                return True
+                ## @fixme Check return code of message_send (< 0 on error)
+        elif s and s == self.switch_socket:
+            for idx in range(3): # debug: try a couple of times
+                try:
+                    pkt = self.switch_socket.recv(self.rcv_size)
+                except:
+                    self.logger.warning("Error on switch read")
+                    return -1
+                # Controller is shutting down (active cleared); not an error
+                if not self.active:
+                    return 0
+                # recv() returns no data when the peer has closed the cxn
+                if len(pkt) == 0:
+                    self.logger.warning("Zero-length switch read, %d" % idx)
+                else:
+                    break
 
-            if not self.active:
-                return False
-
-            if len(pkt) == 0:
+            if len(pkt) == 0: # Still no packet
                 self.logger.warning("Zero-length switch read; closing cxn")
-                return True
+                self.logger.info(str(self))
+                return -1
 
             self._pkt_handle(pkt)
         else:
             self.logger.error("Unknown socket ready: " + str(s))
-            return True
+            return -1
 
-        return False
+        return 0
 
     def run(self):
         """
@@ -329,42 +394,32 @@
         self.socs = [self.listen_socket]
         self.dbg_state = "running"
         while self.active:
-            reset_switch_cxn = False
             try:
                 sel_in, sel_out, sel_err = \
                     select.select(self.socs, [], self.socs, 1)
             except:
                 print sys.exc_info()
                 self.logger.error("Select error, exiting")
-                sys.exit(1)
-
-            if not self.active:
+                self.active = False
                 break
 
-            for s in sel_in:
-                reset_switch_cxn = self._socket_ready_handle(s)
-
+            # kill() may have run while we were blocked in select
+            if not self.active:
+                break
+
             for s in sel_err:
                 self.logger.error("Got socket error on: " + str(s))
-                if s == self.switch_socket:
-                    reset_switch_cxn = True
-                else:
-                    self.logger.error("Socket error; exiting")
+                self.active = False
+                break
+
+            if not self.active: # Socket error reported above
+                break
+
+            for s in sel_in:
+                if self._socket_ready_handle(s) == -1:
                     self.active = False
                     break
 
-            if self.active and reset_switch_cxn:
-                if self.exit_on_reset:
-                    self.kill()
-                else:
-                    self.logger.warning("Closing switch cxn")
-                    try:
-                        self.switch_socket.close()
-                    except:
-                        pass
-                    self.switch_socket = None
-                    self.socs = self.socs[0:1]
-
         # End of main loop
         self.dbg_state = "closing"
         self.logger.info("Exiting controller thread")
@@ -404,6 +452,7 @@
 
         @todo Might want to synchronize shutdown with self.sync...
         """
+
         self.active = False
         try:
             self.switch_socket.shutdown(socket.SHUT_RDWR)
@@ -416,6 +465,16 @@
         except:
             self.logger.info("Ignoring listen soc shutdown error")
         self.listen_socket = None
+
+        # Wake any threads waiting on xid_cv (e.g. transact) or connect_cv
+        self.xid_cv.acquire()
+        self.xid_cv.notifyAll()
+        self.xid_cv.release()
+
+        self.connect_cv.acquire()
+        self.connect_cv.notifyAll()
+        self.connect_cv.release()
+
         self.dbg_state = "down"
 
     def register(self, msg_type, handler):
@@ -512,7 +571,8 @@
         received message handling.
 
         @param msg The message object to send; must not be a string
-        @param timeout The timeout in seconds (?)
+        @param timeout The timeout in seconds; if -1, use the default
+        (self.transact_to); if None, block with no timeout
         @param zero_xid Normally, if the XID is 0 an XID will be generated
         for the message.  Set xero_xid to override this behavior
         @return The matching message object or None if unsuccessful
@@ -523,7 +583,7 @@
             msg.header.xid = gen_xid()
 
         if timeout == -1:
-            timeout = self.barrier_to
+            timeout = self.transact_to
         self.logger.debug("Running transaction %d" % msg.header.xid)
         self.xid_cv.acquire()
         if self.xid:
@@ -533,7 +593,14 @@

         self.xid = msg.header.xid
         self.xid_response = None
-        self.message_send(msg.pack())
+        if self.message_send(msg.pack()) < 0:
+            self.logger.error("Error sending pkt for transaction %d" %
+                              msg.header.xid)
+            # No transaction is pending; clear xid and release xid_cv
+            self.xid = None
+            self.xid_cv.release()
+            return (None, None)
+
         self.logger.debug("Waiting for transaction %d" % msg.header.xid)
         self.xid_cv.wait(timeout)
         if self.xid_response:
@@ -600,6 +664,8 @@
         string += "  host            " + str(self.host) + "\n"
         string += "  port            " + str(self.port) + "\n"
         string += "  keep_alive      " + str(self.keep_alive) + "\n"
+        string += "  pkt_in_run      " + str(self.pkt_in_run) + "\n" # Packet-in filter state
+        string += "  pkt_in_dropped  " + str(self.pkt_in_dropped) + "\n"
         return string
 
     def show(self):