Merge branch 'master' of github.com:floodlight/oftest
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 70b3aad..9e3354d 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -40,6 +40,26 @@
 import select
 import logging
 
+
+FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' 
+                for x in range(256)])
+
+def hex_dump_buffer(src, length=16):
+    """
+    Convert src to a hex dump string and return the string
+    @param src The source buffer
+    @param length The number of bytes shown in each line
+    @returns A string showing the hex dump
+    """
+    result = ["\n"]
+    for i in xrange(0, len(src), length):
+       chars = src[i:i+length]
+       hex = ' '.join(["%02x" % ord(x) for x in chars])
+       printable = ''.join(["%s" % ((ord(x) <= 127 and
+                                     FILTER[ord(x)]) or '.') for x in chars])
+       result.append("%04x  %-*s  %s\n" % (i, length*3, hex, printable))
+    return ''.join(result)
+
 ##@todo Find a better home for these identifiers (controller)
 RCV_SIZE_DEFAULT = 32768
 LISTEN_QUEUE_SIZE = 1
@@ -63,11 +83,9 @@
     @var rcv_size The receive size to use for receive calls
     @var max_pkts The max size of the receive queue
     @var keep_alive If true, listen for echo requests and respond w/
-    @var keep_alive If true, listen for echo requests and respond w/
     echo replies
     @var initial_hello If true, will send a hello message immediately
     upon connecting to the switch
-    @var exit_on_reset If true, terminate controller on connection reset
     @var host The host to use for connect
     @var port The port to connect on 
     @var packets_total Total number of packets received
@@ -102,7 +120,6 @@
         self.keep_alive = False
         self.active = True
         self.initial_hello = True
-        self.exit_on_reset = True
 
         # Settings
         self.max_pkts = max_pkts
@@ -111,7 +128,11 @@
         self.port = port
         self.dbg_state = "init"
         self.logger = logging.getLogger("controller")
-        self.barrier_to = 15 # Barrier timeout default value; add to config
+        self.filter_packet_in = False # Drop "excessive" packet ins
+        self.pkt_in_run = 0 # Length of current run of packet ins
+        self.pkt_in_filter_limit = 50 # Max run of packet ins before dropping
+        self.pkt_in_dropped = 0 # Total dropped packet ins
+        self.transact_to = 15 # Transact timeout default value; add to config
 
         # Transaction and message type waiting variables 
         #   xid_cv: Condition variable (semaphore) for packet waiters
@@ -132,6 +153,37 @@
         self.expect_msg_response = None
         self.buffered_input = ""
 
+    def filter_packet(self, rawmsg, hdr):
+        """
+        Check if packet should be filtered
+
+        Currently filters packet in messages
+        @return Boolean, True if packet should be dropped
+        """
+        # Add check for packet in and rate limit
+        if self.filter_packet_in:
+            # If this is a packet in and not expecting a packet in
+            if ((not self.expect_msg_type or
+                 (self.expect_msg_type != hdr.type)) and
+                hdr.type == OFPT_PACKET_IN):
+
+                self.pkt_in_run += 1
+                # Check if limit exceeded
+                if self.pkt_in_run > self.pkt_in_filter_limit:
+                    self.pkt_in_dropped += 1
+                    return True
+
+            else: # Not an unexpected packet in; the run has ended
+                # If we were dropping packets, report number dropped
+                if self.pkt_in_run > self.pkt_in_filter_limit:
+                    self.logger.debug("Dropped %d packet ins (%d total)"
+                             % ((self.pkt_in_run - 
+                                 self.pkt_in_filter_limit),
+                                 self.pkt_in_dropped))
+                self.pkt_in_run = 0
+
+        return False
+
     def _pkt_handle(self, pkt):
         """
         Check for all packet handling conditions
@@ -157,23 +209,26 @@
         while offset < len(pkt):
             # Parse the header to get type
             hdr = of_header_parse(pkt[offset:])
-            if not hdr:
-                self.logger.info("Could not parse header, pkt len", len(pkt))
-                self.parse_errors += 1
-                return
-            if hdr.length == 0:
-                self.logger.error("Header length is zero; out of sync")
-                self.parse_errors += 1
+            if not hdr or hdr.length == 0:
+                self.logger.error("Could not parse header")
+                self.logger.error("pkt len %d." % len(pkt))
+                if hdr:
+                    self.logger.error("hdr len %d." % hdr.length)
+                self.logger.error("%s" % hex_dump_buffer(pkt[:200]))
+                self.kill()
                 return
 
             # Extract the raw message bytes
             if (offset + hdr.length) > len(pkt):
                 break
             rawmsg = pkt[offset : offset + hdr.length]
-
-            self.logger.debug("Msg in: len %d. offset %d. type %s. hdr.len %d" %
-                (len(pkt), offset, ofp_type_map[hdr.type], hdr.length))
             offset += hdr.length
+
+            if self.filter_packet(rawmsg, hdr):
+                continue
+
+            self.logger.debug("Msg in: buf len %d. hdr.type %s. hdr.len %d" %
+                              (len(pkt), ofp_type_map[hdr.type], hdr.length))
             if hdr.version != OFP_VERSION:
                 self.logger.error("Version %d does not match OFTest version %d"
                                   % (hdr.version, OFP_VERSION))
@@ -181,7 +236,7 @@
                     (hdr.version, OFP_VERSION)
                 self.active = False
                 self.switch_socket = None
-                self.kill()
+                return
 
             msg = of_message_parse(rawmsg)
             if not msg:
@@ -228,7 +283,8 @@
                     rep = echo_reply()
                     rep.header.xid = hdr.xid
                     # Ignoring additional data
-                    self.message_send(rep.pack(), zero_xid=True)
+                    if self.message_send(rep.pack(), zero_xid=True) < 0:
+                        self.logger.error("Error sending echo reply")
                     continue
 
             # Now check for message handlers; preference is given to
@@ -259,44 +315,53 @@
     def _socket_ready_handle(self, s):
         """
         Handle an input-ready socket
+
         @param s The socket object that is ready
-        @retval True, reset the switch connection
+        @returns 0 on success, -1 on error
         """
 
-        if s == self.listen_socket:
+        if s and s == self.listen_socket:
             if self.switch_socket:
-                return False
+                return 0 # Ignore listen socket while connected to switch
 
             (self.switch_socket, self.switch_addr) = \
                 self.listen_socket.accept()
             self.logger.info("Got cxn to " + str(self.switch_addr))
+            self.socs.append(self.switch_socket)
             # Notify anyone waiting
             self.connect_cv.acquire()
             self.connect_cv.notify()
             self.connect_cv.release()
-            self.socs.append(self.switch_socket)
             if self.initial_hello:
                 self.message_send(hello())
-        elif s == self.switch_socket:
-            try:
-                pkt = self.switch_socket.recv(self.rcv_size)
-            except:
-                self.logger.warning("Error on switch read")
-                return True
+                ## @fixme Check return code
+        elif s and s == self.switch_socket:
+            for idx in range(3): # debug: try a couple of times
+                try:
+                    pkt = self.switch_socket.recv(self.rcv_size)
+                except:
+                    self.logger.warning("Error on switch read")
+                    return -1
+      
+                if not self.active:
+                    return 0
+      
+                if len(pkt) == 0:
+                    self.logger.warning("Zero-length switch read, %d" % idx)
+                else:
+                    break
 
-            if not self.active:
-                return False
-
-            if len(pkt) == 0:
+            if len(pkt) == 0: # Still no packet
                 self.logger.warning("Zero-length switch read; closing cxn")
-                return True
+                self.logger.info(str(self))
+                return -1
 
             self._pkt_handle(pkt)
         else:
             self.logger.error("Unknown socket ready: " + str(s))
-            return True
+            return -1
 
-        return False
+        return 0
 
     def run(self):
         """
@@ -329,42 +394,25 @@
         self.socs = [self.listen_socket]
         self.dbg_state = "running"
         while self.active:
-            reset_switch_cxn = False
             try:
                 sel_in, sel_out, sel_err = \
                     select.select(self.socs, [], self.socs, 1)
             except:
                 print sys.exc_info()
                 self.logger.error("Select error, exiting")
-                sys.exit(1)
-
-            if not self.active:
+                self.active = False
                 break
 
-            for s in sel_in:
-                reset_switch_cxn = self._socket_ready_handle(s)
-
             for s in sel_err:
                 self.logger.error("Got socket error on: " + str(s))
-                if s == self.switch_socket:
-                    reset_switch_cxn = True
-                else:
-                    self.logger.error("Socket error; exiting")
+                self.active = False
+                break
+
+            for s in sel_in:
+                if self._socket_ready_handle(s) == -1:
                     self.active = False
                     break
 
-            if self.active and reset_switch_cxn:
-                if self.exit_on_reset:
-                    self.kill()
-                else:
-                    self.logger.warning("Closing switch cxn")
-                    try:
-                        self.switch_socket.close()
-                    except:
-                        pass
-                    self.switch_socket = None
-                    self.socs = self.socs[0:1]
-
         # End of main loop
         self.dbg_state = "closing"
         self.logger.info("Exiting controller thread")
@@ -404,6 +452,7 @@
 
         @todo Might want to synchronize shutdown with self.sync...
         """
+
         self.active = False
         try:
             self.switch_socket.shutdown(socket.SHUT_RDWR)
@@ -416,6 +465,16 @@
         except:
             self.logger.info("Ignoring listen soc shutdown error")
         self.listen_socket = None
+
+        # Release condition variables on which the controller may be waiting
+        self.xid_cv.acquire()
+        self.xid_cv.notifyAll()
+        self.xid_cv.release()
+
+        self.connect_cv.acquire()
+        self.connect_cv.notifyAll()
+        self.connect_cv.release()
+
         self.dbg_state = "down"
 
     def register(self, msg_type, handler):
@@ -512,7 +571,8 @@
         received message handling.
 
         @param msg The message object to send; must not be a string
-        @param timeout The timeout in seconds (?)
+        @param timeout The timeout in seconds; -1 uses the default, None
+        blocks without timing out
         @param zero_xid Normally, if the XID is 0 an XID will be generated
         for the message.  Set xero_xid to override this behavior
         @return The matching message object or None if unsuccessful
@@ -523,7 +583,7 @@
             msg.header.xid = gen_xid()
 
         if timeout == -1:
-            timeout = self.barrier_to
+            timeout = self.transact_to
         self.logger.debug("Running transaction %d" % msg.header.xid)
         self.xid_cv.acquire()
         if self.xid:
@@ -533,7 +593,11 @@
 
         self.xid = msg.header.xid
         self.xid_response = None
-        self.message_send(msg.pack())
+        if self.message_send(msg.pack()) < 0:
+            self.logger.error("Error sending pkt for transaction %d" %
+                              msg.header.xid)
+            return (None, None)
+
         self.logger.debug("Waiting for transaction %d" % msg.header.xid)
         self.xid_cv.wait(timeout)
         if self.xid_response:
@@ -600,6 +664,8 @@
         string += "  host            " + str(self.host) + "\n"
         string += "  port            " + str(self.port) + "\n"
         string += "  keep_alive      " + str(self.keep_alive) + "\n"
+        string += "  pkt_in_run      " + str(self.pkt_in_run) + "\n"
+        string += "  pkt_in_dropped  " + str(self.pkt_in_dropped) + "\n"
         return string
 
     def show(self):
diff --git a/tests/load.py b/tests/load.py
new file mode 100644
index 0000000..d168a17
--- /dev/null
+++ b/tests/load.py
@@ -0,0 +1,134 @@
+"""
+Prototype test cases related to operation under load
+
+It is recommended that these definitions be kept in their own
+namespace as different groups of tests will likely define 
+similar identifiers.
+
+  The function test_set_init is called with a complete configuration
+dictionary prior to the invocation of any tests from this file.
+
+  The switch is actively attempting to contact the controller at the address
+indicated in oft_config.
+
+In general these test cases make some assumption about the external
+configuration of the switch under test.  For now, the assumption is
+that the first two OF ports are connected by a loopback cable.
+"""
+
+import copy
+
+import logging
+
+import unittest
+
+import oftest.controller as controller
+import oftest.cstruct as ofp
+import oftest.message as message
+import oftest.dataplane as dataplane
+import oftest.action as action
+import oftest.parse as parse
+import basic
+import time
+
+from testutils import *
+
+#@var load_port_map Local copy of the configuration map from OF port
+# numbers to OS interfaces
+load_port_map = None
+#@var load_logger Local logger object
+load_logger = None
+#@var load_config Local copy of global configuration data
+load_config = None
+
+# For test priority
+#@var test_prio Set test priority for local tests
+test_prio = {}
+
+
+def test_set_init(config):
+    """
+    Set up function for load test classes
+
+    @param config The configuration dictionary; see oft
+    """
+
+    global load_port_map
+    global load_logger
+    global load_config
+
+    load_logger = logging.getLogger("load")
+    load_logger.info("Initializing test set")
+    load_port_map = config["port_map"]
+    load_config = config
+
+class LoadBarrier(basic.SimpleProtocol):
+    """
+    Test barrier under load with loopback
+
+    This test assumes there is a pair of ports on the switch with
+    a loopback cable connected and that spanning tree is disabled.
+    A flow is installed to cause a storm of packet-in messages
+    when a packet is sent to the loopbacked interface.  After causing 
+    this storm, a barrier request is sent.
+
+    The test succeeds if the barrier response is received.  Otherwise
+    the test fails.
+    """
+    def runTest(self):
+        # Set up flow to send from port 1 to port 2 and copy to CPU
+        # Test parameter gives LB port base (assumes consecutive)
+        lb_port = test_param_get(self.config, 'lb_port', default=1)
+        barrier_count = test_param_get(self.config, 'barrier_count', 
+                                       default=10)
+
+        # Set controller to filter packet ins
+        self.controller.filter_packet_in = True
+        self.controller.pkt_in_filter_limit = 10
+
+        pkt = simple_tcp_packet()
+        match = parse.packet_to_flow_match(pkt)
+        match.wildcards &= ~ofp.OFPFW_IN_PORT
+        match.in_port = lb_port
+        act = action.action_output()
+        act.port = lb_port + 1
+
+        request = message.flow_mod()
+        request.match = match
+        request.hard_timeout = 2 * barrier_count
+
+        request.buffer_id = 0xffffffff
+        self.assertTrue(request.actions.add(act), "Could not add action")
+
+        act = action.action_output()
+        act.port = ofp.OFPP_CONTROLLER
+        self.assertTrue(request.actions.add(act), "Could not add action")
+
+        rv = self.controller.message_send(request)
+        self.assertTrue(rv != -1, "Error installing flow mod")
+        self.assertEqual(do_barrier(self.controller), 0, "Barrier failed")
+
+        # Create packet out and send to port lb_port + 1
+        msg = message.packet_out()
+        msg.data = str(pkt)
+        act = action.action_output()
+        act.port = lb_port + 1
+        self.assertTrue(msg.actions.add(act), 'Could not add action to msg')
+        load_logger.info("Sleeping before starting storm")
+        time.sleep(1) # Root causing issue with fast disconnects
+        load_logger.info("Sending packet out to %d" % (lb_port + 1))
+        rv = self.controller.message_send(msg)
+        self.assertTrue(rv == 0, "Error sending out message")
+
+        for idx in range(0, barrier_count):
+            self.assertEqual(do_barrier(self.controller), 0, "Barrier failed")
+            # To do:  Add some interesting functionality here
+            load_logger.info("Barrier %d completed" % idx)
+
+        # Clear the flow table when done
+        load_logger.debug("Deleting all flows from switch")
+        rc = delete_all_flows(self.controller, load_logger)
+        self.assertEqual(rc, 0, "Failed to delete all flows")
+
+# Do not run by default; mysterious disconnects still occur often
+test_prio["LoadBarrier"] = -1
diff --git a/tests/oft b/tests/oft
index 828eefa..c834b8e 100755
--- a/tests/oft
+++ b/tests/oft
@@ -417,11 +417,14 @@
 # Check if test list is requested; display and exit if so
 if config["list"]:
     did_print = False
+    mod_count = 0
+    test_count = 0
     print "\nTest List:"
     for mod in config["all_tests"].keys():
         if config["test_spec"] != "all" and \
                 config["test_spec"] != mod.__name__:
             continue
+        mod_count += 1
         did_print = True
         desc = mod.__doc__.strip()
         desc = desc.split('\n')[0]
@@ -440,10 +443,14 @@
             if len(start_str) > 22:
                 desc = "\n" + _space_to(22, "") + desc
             print start_str + _space_to(22, start_str) + desc
+            test_count += 1
         print
     if not did_print:
         print "No tests found for " + config["test_spec"]
     else:
+        print "%d modules shown with a total of %d tests" % \
+            (mod_count, test_count)
+        print
         print "Tests preceded by * are not run by default"
     print "Tests marked (TP1) after name take --test-params including:"
     print "    'vid=N;strip_vlan=bool;add_vlan=bool'"
diff --git a/tests/pktact.py b/tests/pktact.py
index 4598262..a686dbd 100644
--- a/tests/pktact.py
+++ b/tests/pktact.py
@@ -874,9 +874,10 @@
 
 
 
-    def installFlow(self, prio, inp, egp):
+    def installFlow(self, prio, inp, egp, 
+                    wildcards=ofp.OFPFW_DL_SRC):
         request = flow_msg_create(self, self.pkt, ing_port=inp, 
-                                  wildcards=ofp.OFPFW_DL_SRC, 
+                                  wildcards=wildcards,
                                   egr_ports=egp)
         request.priority = prio
         flow_msg_install(self, request, clear_table_override=False)
@@ -895,13 +896,16 @@
             raise Exception("Not initialized")
 
 
-    def verifyFlow(self, inp, egp):
+    def verifyFlow(self, inp, egp, pkt=None):
+        if pkt == None:
+            pkt = self.pkt
+
         self.logger.info("Pkt match test: " + str(inp) + 
                          " to " + str(egp))
         self.logger.debug("Send packet: " + str(inp) + " to " 
                             + str(egp))
-        self.dataplane.send(inp, str(self.pkt))
-        receive_pkt_verify(self, egp, self.pkt, inp)
+        self.dataplane.send(inp, str(pkt))
+        receive_pkt_verify(self, egp, pkt, inp)
 
 
        
@@ -930,7 +934,26 @@
 
 
 
+class WildcardPriority(SingleWildcardMatchPriority):
+
+    def runTest(self):
         
+        self._Init()
+
+        of_ports = pa_port_map.keys()
+        of_ports.sort()
+
+        self._ClearTable()
+        # Install a flow with no wildcards for our packet:
+        self.installFlow(1000, of_ports[0], of_ports[1], wildcards=0)
+        self.verifyFlow(of_ports[0], of_ports[1])
+        # Install a flow with wildcards for our packet with higher
+        # priority. 
+        self.installFlow(1001, of_ports[0], of_ports[2])
+        self.verifyFlow(of_ports[0], of_ports[2])
+        
+
+
 class SingleWildcardMatch(BaseMatchCase):
     """
     Exercise wildcard matching for all ports
diff --git a/tests/testutils.py b/tests/testutils.py
index 9c7b81f..68ee343 100644
--- a/tests/testutils.py
+++ b/tests/testutils.py
@@ -174,8 +174,8 @@
     Return 0 on success, -1 on error
     """
     b = message.barrier_request()
-    (resp, pkt) = ctrl.transact(b, timeout=ctrl.barrier_to)
-    # We'll trust the transaction processing in the controller
+    (resp, pkt) = ctrl.transact(b)
+    # We'll trust the transaction processing in the controller that xid matched
     if not resp:
         return -1
     return 0
@@ -838,7 +838,7 @@
     @param length The number of bytes shown in each line
     @returns A string showing the hex dump
     """
-    result = []
+    result = ["\n"]
     for i in xrange(0, len(src), length):
        chars = src[i:i+length]
        hex = ' '.join(["%02x" % ord(x) for x in chars])
diff --git a/tools/ovs-ctl/ovs-ctl.py b/tools/ovs-ctl/ovs-ctl.py
index 5df57d9..4d4f100 100755
--- a/tools/ovs-ctl/ovs-ctl.py
+++ b/tools/ovs-ctl/ovs-ctl.py
@@ -178,8 +178,19 @@
 use this option""", 
                      action='store_true', default=False)
 
+gParser.add_argument('-lb', '--loopback',
+                     help="Create a loopback pair.  The port numbers are port_count+1 and port_count+2.", 
+                     default=False, action='store_true')
 
 
+gParser.add_argument("--cli", 
+                     help="Run the ovs-ctl cli after initialization", 
+                     action='store_true', default=False)
+
+gParser.add_argument("--teardown", 
+                     help="Kill OVS instance after CLI exits", 
+                     action='store_true', default=False)
+
 #
 # Reset defaults based on config files and override
 #
@@ -374,22 +385,26 @@
     print gServerPid
     vsctl(["del-br", gArgs.bridge])
 
-# Kill existing DB/vswitchd
-killp(gSwitchPid)
-killp(gServerPid)
-killp(gLogPid)
 
-# Remove old logpid file, since this does not happen automagically
-if os.path.exists(gLogPid):
-    os.remove(gLogPid)
+def killall():
+    # Kill existing DB/vswitchd
+    killp(gSwitchPid)
+    killp(gServerPid)
+    killp(gLogPid)
 
-if gArgs.keep_veths == False:
-    lcall(['/sbin/rmmod', 'veth'])
-    lcall(['/sbin/modprobe', 'veth'])
+    # Remove old logpid file, since this does not happen automagically
+    if os.path.exists(gLogPid):
+        os.remove(gLogPid)
 
-# Remove kmod
-lcall(['/sbin/rmmod', gArgs.ovs_kmod])
+    if gArgs.keep_veths == False:
+        lcall(['/sbin/rmmod', 'veth'])
+        lcall(['/sbin/modprobe', 'veth'])
 
+    # Remove kmod
+    lcall(['/sbin/rmmod', gArgs.ovs_kmod])
+
+
+killall()
 if gArgs.kill == True:
     # Don't do anything else
     sys.exit()
@@ -400,8 +415,11 @@
 # Insert openvswitch module
 lcall(['/sbin/insmod', gArgs.ovs_kmod])
 
-createVeths(gArgs.port_count)
-vethsUp(gArgs.port_count)
+port_count = gArgs.port_count
+if gArgs.loopback:
+    port_count += 1
+createVeths(port_count)
+vethsUp(port_count)
 
 if not os.path.exists(gArgs.ovs_db_file) or gArgs.keep_db == False:
     print "Initializing DB @ %s" % (gArgs.ovs_db_file)
@@ -450,6 +468,11 @@
 for idx in range(0, gArgs.port_count):
     vsctl(["add-port", gArgs.bridge, "veth%s" % (idx*2)])
 
+# Add the loopback port pair to the bridge if requested
+if gArgs.loopback:
+    lb_idx = gArgs.port_count * 2
+    vsctl(["add-port", gArgs.bridge, "veth%d" % (lb_idx)])
+    vsctl(["add-port", gArgs.bridge, "veth%d" % (lb_idx+1)])
 
 # Set controller
 vsctl(["set-controller", gArgs.bridge, "tcp:%s:%s" % (
@@ -463,3 +486,30 @@
 ofctl(["show", gArgs.bridge])
 
 
+if gArgs.cli:
+    while True:
+        cmd = raw_input("[%s] ovs-ctl> " % gConfigSection)
+        if cmd and cmd != "":
+            args = cmd.split(" ")
+            if args[0] == "vsctl" or args[0] == "ovs-vsctl":
+                vsctl(args[1:])
+            elif args[0] == "ofctl" or args[0] == "ovs-ofctl":
+                ofctl(args[1:])
+            elif args[0] == "exit" or args[0] == "quit":
+                break; 
+            elif args[0] == "kill":
+                gArgs.teardown = True
+                break
+            else:
+                print "unknown command '%s'" % args[0]
+            
+
+if gArgs.teardown:
+    print "Killing OVS"
+    killall()
+
+
+
+
+                        
+