Merge pull request #25 from InCNTRE/master

Counters.py & detailed testing methodology
diff --git a/platforms/ovs-dummy.py b/platforms/ovs-dummy.py
new file mode 100644
index 0000000..bc6f862
--- /dev/null
+++ b/platforms/ovs-dummy.py
@@ -0,0 +1,213 @@
+"""
+Dummy platform
+
+This platform uses Open vSwitch dummy interfaces.
+"""
+
+import logging
+import os
+import select
+import socket
+import struct
+import sys
+import time
+from threading import Thread
+from threading import Lock
+
+RCV_TIMEOUT = 10000
+RUN_DIR = os.environ.get("OVS_RUNDIR", "/var/run/openvswitch")
+
+class DataPlanePortOVSDummy(Thread):
+    """
+    Class defining a port monitoring object that uses Unix domain
+    sockets for ports, intended for connecting to Open vSwitch "dummy"
+    netdevs.  Every frame on the socket is prefixed with its length as
+    a two-byte big-endian unsigned integer.
+    """
+
+    def __init__(self, interface_name, port_number, parent, max_pkts=1024):
+        """
+        Set up a port monitor object
+        @param interface_name Name of the port's socket file under RUN_DIR
+        @param port_number The port number associated with this port
+        @param parent The controlling dataplane object; for pkt wait CV
+        @param max_pkts Maximum number of pkts to keep in each queue
+        """
+        Thread.__init__(self)
+        self.interface_name = interface_name
+        self.max_pkts = max_pkts
+        self.packets_total = 0
+        self.packets = []
+        self.packets_discarded = 0
+        self.port_number = port_number
+        self.txq = []
+        self.txq_lock = Lock()
+        logname = "dp-" + interface_name
+        self.logger = logging.getLogger(logname)
+        try:
+            self.socket = DataPlanePortOVSDummy.interface_open(interface_name)
+        except:
+            self.logger.info("Could not open socket")
+            raise
+        self.logger.info("Opened port monitor (class %s)", type(self).__name__)
+        self.parent = parent
+
+    @staticmethod
+    def interface_open(interface_name):
+        """
+        Open a Unix domain socket interface.
+        @param interface_name port name as a string such as 'eth1'
+        @retval s socket
+        """
+        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        s.settimeout(RCV_TIMEOUT)
+        s.setblocking(0)  # non-blocking; this supersedes the timeout above
+        s.connect("%s/%s" % (RUN_DIR, interface_name))
+        return s
+
+    def run(self):
+        """
+        Activity function for class: transmit queued frames and collect
+        received ones until kill() is called or the socket fails
+        """
+        self.running = True
+        rxbuf = ""
+        while self.running:
+            try:
+                with self.txq_lock:
+                    # Only ask select() about writability when data is queued
+                    wlist = [self.socket] if self.txq else []
+
+                rout, wout, eout = select.select([self.socket], wlist, [], 1)
+            except:
+                print sys.exc_info()
+                self.logger.error("Select error, exiting")
+                break
+
+            if not self.running:
+                break
+
+            if wout:
+                # "with" releases the lock even if send() raises
+                with self.txq_lock:
+                    if self.txq:
+                        retval = self.socket.send(self.txq[0])
+                        if retval > 0:
+                            self.txq[0] = self.txq[0][retval:]
+                            if len(self.txq[0]) == 0:
+                                self.txq = self.txq[1:]
+
+            if rout:
+                if len(rxbuf) < 2:
+                    n = 2 - len(rxbuf)
+                else:
+                    frame_len = struct.unpack('>H', rxbuf[:2])[0]
+                    n = (2 + frame_len) - len(rxbuf)
+
+                data = self.socket.recv(n)
+                if n > 0 and not data:
+                    # Readable but nothing to read: the peer closed the socket
+                    self.logger.error("Socket closed by peer, exiting")
+                    break
+                rxbuf += data
+                if len(data) == n and len(rxbuf) > 2:
+                    rcvtime = time.clock()
+                    self.logger.debug("Pkt len " + str(len(rxbuf)) +
+                             " in at " + str(rcvtime) + " on port " +
+                             str(self.port_number))
+
+                    # Enqueue packet
+                    with self.parent.pkt_sync:
+                        if len(self.packets) >= self.max_pkts:
+                            # Queue full, throw away oldest
+                            self.packets.pop(0)
+                            self.packets_discarded += 1
+                            self.logger.debug("Discarding oldest packet to make room")
+                        self.packets.append((rxbuf[2:], rcvtime))
+                        self.packets_total += 1
+                        self.parent.pkt_sync.notify_all()
+
+                    rxbuf = ''
+
+        self.logger.info("Thread exit")
+
+    def kill(self):
+        """
+        Terminate the running thread
+        """
+        self.logger.debug("Port monitor kill")
+        self.running = False
+        try:
+            self.socket.close()
+        except:
+            self.logger.info("Ignoring dataplane soc shutdown error")
+
+    def timestamp_head(self):
+        """
+        Return the timestamp of the head of queue or None if empty
+        """
+        try:
+            return self.packets[0][1]
+        except IndexError:
+            return None
+
+    def flush(self):
+        """
+        Clear the packet queue
+        """
+        with self.parent.pkt_sync:
+            self.packets_discarded += len(self.packets)
+            self.packets = []
+
+    def send(self, packet):
+        """
+        Send a packet to the dataplane port
+        @param packet The packet data to send to the port
+        @retval len(packet) if the packet was queued, 0 if the queue is full
+        """
+
+        with self.txq_lock:
+            if len(self.txq) < self.max_pkts:
+                self.txq.append(struct.pack('>H', len(packet)) + packet)
+                retval = len(packet)
+            else:
+                retval = 0
+
+        return retval
+
+    def register(self, handler):
+        """
+        Register a callback function to receive packets from this
+        port.  The callback will be passed the packet, the
+        interface name and the port number (if set) on which the
+        packet was received.
+
+        To be implemented
+        """
+        pass
+
+    def show(self, prefix=''):
+        print prefix + "Name:          " + self.interface_name
+        print prefix + "Pkts pending:  " + str(len(self.packets))
+        print prefix + "Pkts total:    " + str(self.packets_total)
+        print prefix + "socket:        " + str(self.socket)
+
+# Copied into config["port_map"]; update this dictionary to suit your environment.
+dummy_port_map = {  # presumably port number -> dummy netdev name; TODO confirm
+    1 : "p1",
+    2 : "p2",
+    3 : "p3",
+    4 : "p4"
+}
+
+def platform_config_update(config):
+    """
+    Install the dummy platform's settings into the test configuration
+
+    @param config The configuration dictionary to use/update
+    """
+    config.update({
+        "port_map": dummy_port_map.copy(),
+        "caps_table_idx": 0,
+        "dataplane": {"portclass": DataPlanePortOVSDummy},
+        "allow_user": True})
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index cfe2eda..fff8534 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -295,7 +295,7 @@
                 self.logger.warning("Error on listen socket accept")
                 return -1
             self.socs.append(sock)
-            self.logger.info("Incoming connection from %s" % str(addr))
+            self.logger.info(self.host+":"+str(self.port)+": Incoming connection from "+str(addr))
 
             with self.connect_cv:
                 (self.switch_socket, self.switch_addr) = (sock, addr)
@@ -496,7 +496,7 @@
         If an error occurs, (None, None) is returned
         """
 
-        if exp_msg:
+        if exp_msg is not None:
             self.logger.debug("Poll for %s" % ofp_type_map[exp_msg])
         else:
             self.logger.debug("Poll for any OF message")
@@ -504,7 +504,7 @@
         # Take the packet from the queue
         def grab():
             if len(self.packets) > 0:
-                if not exp_msg:
+                if exp_msg is None:
                     self.logger.debug("Looking for any packet")
                     (msg, pkt) = self.packets.pop(0)
                     return (msg, pkt)
diff --git a/src/python/oftest/testutils.py b/src/python/oftest/testutils.py
index 53c18fe..8b0b3b6 100644
--- a/src/python/oftest/testutils.py
+++ b/src/python/oftest/testutils.py
@@ -109,9 +109,64 @@
                 scapy.TCP(sport=tcp_sport, dport=tcp_dport)
 
     pkt = pkt/("D" * (pktlen - len(pkt)))
-    
-    #print pkt.show()
-    #print scapy.Ether(str(pkt)).show()
+
+    return pkt
+
+def simple_udp_packet(pktlen=100,
+                      dl_dst='00:01:02:03:04:05',
+                      dl_src='00:06:07:08:09:0a',
+                      dl_vlan_enable=False,
+                      dl_vlan=0,
+                      dl_vlan_pcp=0,
+                      dl_vlan_cfi=0,
+                      ip_src='192.168.0.1',
+                      ip_dst='192.168.0.2',
+                      ip_tos=0,
+                      udp_sport=1234,
+                      udp_dport=80,
+                      ip_ihl=None,
+                      ip_options=False
+                      ):
+    """
+    Return a simple dataplane UDP packet
+
+    Supports a few parameters:
+    @param pktlen Length of packet in bytes w/o CRC (at least MINSIZE)
+    @param dl_dst Destination MAC
+    @param dl_src Source MAC
+    @param dl_vlan_enable True if the packet is with vlan, False otherwise
+    @param dl_vlan VLAN ID
+    @param dl_vlan_pcp VLAN priority
+    @param dl_vlan_cfi VLAN CFI bit
+    @param ip_src IP source
+    @param ip_dst IP destination
+    @param ip_tos IP ToS
+    @param udp_dport UDP destination port
+    @param udp_sport UDP source port
+    @param ip_ihl Value passed to scapy as the IP header-length field
+    @param ip_options IP options passed to scapy, or False for none
+
+    Generates a simple UDP packet. Users shouldn't assume anything about
+    this packet other than that it is a valid ethernet/IP/UDP frame.
+    """
+
+    if MINSIZE > pktlen:
+        pktlen = MINSIZE
+
+    # Build the IP arguments once so the tagged and untagged paths share them
+    ip_args = dict(src=ip_src, dst=ip_dst, tos=ip_tos, ihl=ip_ihl)
+    if ip_options:
+        # Set options with or without a VLAN tag
+        ip_args['options'] = ip_options
+
+    pkt = scapy.Ether(dst=dl_dst, src=dl_src)
+    if dl_vlan_enable:
+        # Note Dot1Q.id is really CFI
+        pkt = pkt/scapy.Dot1Q(prio=dl_vlan_pcp, id=dl_vlan_cfi, vlan=dl_vlan)
+    pkt = pkt/scapy.IP(**ip_args)/scapy.UDP(sport=udp_sport, dport=udp_dport)
+
+    # Pad the frame with filler bytes up to pktlen
+    pkt = pkt/("D" * (pktlen - len(pkt)))
 
     return pkt
 
@@ -316,6 +371,8 @@
                                 + str(exp_pkt).encode('hex'))
             logging.debug("Received len " + str(len(rcv_pkt)) + ": "
                                 + str(rcv_pkt).encode('hex'))
+            logging.debug("Expected packet: " + inspect_packet(scapy.Ether(str(exp_pkt))))
+            logging.debug("Received packet: " + inspect_packet(scapy.Ether(str(rcv_pkt))))
         parent.assertEqual(str(exp_pkt), str(rcv_pkt),
                            "Packet match error on port " + str(check_port))
 
@@ -639,13 +696,19 @@
     elif field_to_mod == 'tcp_dport':
         act = action.action_set_tp_dst()
         act.tp_port = mod_field_vals['tcp_dport']
+    elif field_to_mod == 'udp_sport':
+        act = action.action_set_tp_src()
+        act.tp_port = mod_field_vals['udp_sport']
+    elif field_to_mod == 'udp_dport':
+        act = action.action_set_tp_dst()
+        act.tp_port = mod_field_vals['udp_dport']
     else:
         parent.assertTrue(0, "Unknown field to modify: " + str(field_to_mod))
 
     return act
 
 def pkt_action_setup(parent, start_field_vals={}, mod_field_vals={}, 
-                     mod_fields=[], check_test_params=False):
+                     mod_fields=[], tp="tcp", check_test_params=False):
     """
     Set up the ingress and expected packet and action list for a test
 
@@ -671,8 +734,12 @@
     base_pkt_params['ip_src'] = '192.168.0.1'
     base_pkt_params['ip_dst'] = '192.168.0.2'
     base_pkt_params['ip_tos'] = 0
-    base_pkt_params['tcp_sport'] = 1234
-    base_pkt_params['tcp_dport'] = 80
+    if tp == "tcp":
+        base_pkt_params['tcp_sport'] = 1234
+        base_pkt_params['tcp_dport'] = 80
+    elif tp == "udp":
+        base_pkt_params['udp_sport'] = 1234
+        base_pkt_params['udp_dport'] = 80
     for keyname in start_field_vals.keys():
         base_pkt_params[keyname] = start_field_vals[keyname]
 
@@ -686,8 +753,12 @@
     mod_pkt_params['ip_src'] = '10.20.30.40'
     mod_pkt_params['ip_dst'] = '50.60.70.80'
     mod_pkt_params['ip_tos'] = 0xf0
-    mod_pkt_params['tcp_sport'] = 4321
-    mod_pkt_params['tcp_dport'] = 8765
+    if tp == "tcp":
+        mod_pkt_params['tcp_sport'] = 4321
+        mod_pkt_params['tcp_dport'] = 8765
+    elif tp == "udp":
+        mod_pkt_params['udp_sport'] = 4321
+        mod_pkt_params['udp_dport'] = 8765
     for keyname in mod_field_vals.keys():
         mod_pkt_params[keyname] = mod_field_vals[keyname]
 
@@ -722,8 +793,15 @@
             mod_fields.append('dl_vlan_enable')
             mod_fields.append('pktlen')
 
+    if tp == "tcp":
+        packet_builder = simple_tcp_packet
+    elif tp == "udp":
+        packet_builder = simple_udp_packet
+    else:
+        raise NotImplementedError("unknown transport protocol %s" % tp)
+
     # Build the ingress packet
-    ingress_pkt = simple_tcp_packet(**base_pkt_params)
+    ingress_pkt = packet_builder(**base_pkt_params)
 
     # Build the expected packet, modifying the indicated fields
     for item in mod_fields:
@@ -732,7 +810,7 @@
         if act:
             new_actions.append(act)
 
-    expected_pkt = simple_tcp_packet(**base_pkt_params)
+    expected_pkt = packet_builder(**base_pkt_params)
 
     return (ingress_pkt, expected_pkt, new_actions)
 
@@ -823,3 +901,20 @@
 def format_packet(pkt):
     return "Packet length %d \n%s" % (len(str(pkt)), 
                                       hex_dump_buffer(str(pkt)))
+
+def inspect_packet(pkt):
+    """
+    Capture the text that scapy's show2() method prints for a packet.
+    @param pkt The scapy packet to dissect
+    @returns A string showing the dissected packet.
+    """
+    from cStringIO import StringIO
+    saved_stdout = sys.stdout
+    capture = StringIO()
+    # Swap a buffer in for sys.stdout while show2() runs
+    sys.stdout = capture
+    try:
+        pkt.show2()
+        return capture.getvalue()
+    finally:
+        sys.stdout = saved_stdout
diff --git a/tests/cxn.py b/tests/cxn.py
index 5524434..e9059cc 100644
--- a/tests/cxn.py
+++ b/tests/cxn.py
@@ -112,55 +112,84 @@
         self.num_controllers = test_param_get('num_controllers', default=1)
         self.controller_timeout = test_param_get('controller_timeout',
                                                  default=-1)
+        self.hello_timeout = test_param_get('hello_timeout',
+                                            default=5)
+        self.features_req_timeout = test_param_get('features_req_timeout',
+                                                   default=5)
 
         for i in range(self.num_controllers):
             self.controllerSetup(config["controller_host"],
                                  config["controller_port"]+i)
         for i in range(self.num_controllers):
-            self.controllers[i].handshake_done = False
+            self.controllers[i].cstate = 0
+            self.controllers[i].keep_alive = True
+        tick = 0.1  # time period in seconds at which controllers are handled
 
-        # try to maintain switch connections for specified timeout
-        # -1 means forever
         while True:
             for con in self.controllers:
-                if con.switch_socket and con.handshake_done:
-                    if (self.controller_timeout < 0 or
-                        con.count < self.controller_timeout):
-                        logging.info(con.host + ":" + str(con.port) + 
-                                     ": maintaining connection to " +
-                                     str(con.switch_addr))
-                        con.count = con.count + 1
-                    else:
-                        logging.info(con.host + ":" + str(con.port) + 
-                                     ": disconnecting from " +
-                                     str(con.switch_addr))
-                        con.disconnect()
-                        con.handshake_done = False
-                        con.count = 0
-                    time.sleep(1)
-                else:
-                    #@todo Add an option to wait for a pkt transaction to 
-                    # ensure version compatibilty?
-                    con.connect(self.default_timeout)
-                    if not con.switch_socket:
-                        logging.info("Did not connect to switch")
-                        continue
-                    logging.info("TCP Connected " + str(con.switch_addr))
-                    logging.info("Sending hello")
-                    con.message_send(message.hello())
-                    request = message.features_request()
-                    reply, pkt = con.transact(request, 
-                                              timeout=self.default_timeout)
-                    if reply:
-                        logging.info("Handshake complete with " + 
-                                    str(con.switch_addr))
-                        con.handshake_done = True
-                        con.keep_alive = True
-                        con.count = 0
-                    else:
-                        logging.info("Did not complete features_request " +
-                                     "for handshake")
-                        con.disconnect()
-                        con.handshake_done = False
-                        con.count = 0
+                condesc = con.host + ":" + str(con.port) + ": "
+                logging.debug("Checking " + condesc)
 
+                if con.switch_socket: 
+                    if con.cstate == 0:
+                        logging.info(condesc + "Sending hello to " +
+                                     str(con.switch_addr))
+                        con.message_send(message.hello())
+                        con.cstate = 1
+                        con.count = 0
+                    elif con.cstate == 1:
+                        reply, pkt = con.poll(exp_msg=ofp.OFPT_HELLO,
+                                              timeout=0)
+                        if reply is not None:
+                            logging.info(condesc + 
+                                         "Hello received from " +
+                                         str(con.switch_addr))
+                            con.cstate = 2
+                        else:
+                            con.count = con.count + 1
+                            # fall back to previous state on timeout
+                            if con.count >= self.hello_timeout/tick:
+                                logging.info(condesc + 
+                                             "Timeout hello from " +
+                                             str(con.switch_addr))
+                                con.cstate = 0
+                    elif con.cstate == 2:
+                        logging.info(condesc + "Sending features request to " +
+                                     str(con.switch_addr))
+                        con.message_send(message.features_request())
+                        con.cstate = 3
+                        con.count = 0
+                    elif con.cstate == 3:
+                        reply, pkt = con.poll(exp_msg=ofp.OFPT_FEATURES_REPLY,
+                                              timeout=0)
+                        if reply is not None:
+                            logging.info(condesc + 
+                                         "Features request received from " +
+                                         str(con.switch_addr))
+                            con.cstate = 4
+                            con.count = 0
+                        else:
+                            con.count = con.count + 1
+                            # fall back to previous state on timeout
+                            if con.count >= self.features_req_timeout/tick:
+                                logging.info(condesc +
+                                             "Timeout features request from " +
+                                             str(con.switch_addr))
+                                con.cstate = 2
+                    elif con.cstate == 4:
+                        if (self.controller_timeout < 0 or
+                            con.count < self.controller_timeout/tick):
+                            logging.debug(condesc +
+                                          "Maintaining connection to " +
+                                          str(con.switch_addr))
+                            con.count = con.count + 1
+                        else:
+                            logging.info(condesc + 
+                                         "Disconnecting from " +
+                                         str(con.switch_addr))
+                            con.disconnect()
+                            con.cstate = 0
+                else:
+                    con.cstate = 0
+
+            time.sleep(tick)
diff --git a/tests/pktact.py b/tests/pktact.py
index 34654d1..25ea9cd 100644
--- a/tests/pktact.py
+++ b/tests/pktact.py
@@ -1546,6 +1546,21 @@
         flow_match_test(self, config["port_map"], pkt=pkt, exp_pkt=exp_pkt, 
                         action_list=acts, max_test=2)
 
+class ModifyL4SrcUdp(BaseMatchCase):
+    """
+    Modify the source UDP port of a UDP packet; skipped unless the
+    switch advertises the set-transport-source action
+    """
+    def runTest(self):
+        sup_acts = self.supported_actions
+        if not (sup_acts & 1 << ofp.OFPAT_SET_TP_SRC):
+            skip_message_emit(self, "ModifyL4SrcUdp test")
+            return
+        (pkt, exp_pkt, acts) = pkt_action_setup(self, mod_fields=['udp_sport'],
+                                                check_test_params=True, tp="udp")
+        flow_match_test(self, config["port_map"], pkt=pkt, exp_pkt=exp_pkt,
+                        action_list=acts, max_test=2)
+
 class ModifyL4Dst(BaseMatchCase):
     """
     Modify the dest TCP port of a TCP packet (TP1)
@@ -1561,6 +1576,21 @@
         flow_match_test(self, config["port_map"], pkt=pkt, exp_pkt=exp_pkt, 
                         action_list=acts, max_test=2)
 
+class ModifyL4DstUdp(BaseMatchCase):
+    """
+    Rewrite the destination UDP port of a UDP packet
+    """
+    def runTest(self):
+        needed = 1 << ofp.OFPAT_SET_TP_DST
+        if (self.supported_actions & needed) == 0:
+            skip_message_emit(self, "ModifyL4DstUdp test")
+            return
+        setup = pkt_action_setup(self, mod_fields=['udp_dport'], tp="udp",
+                                 check_test_params=True)
+        (pkt, exp_pkt, acts) = setup
+        flow_match_test(self, config["port_map"], action_list=acts, max_test=2,
+                        pkt=pkt, exp_pkt=exp_pkt)
+
 class ModifyTOS(BaseMatchCase):
     """
     Modify the IP type of service of an IP packet (TP1)
@@ -1961,16 +1991,6 @@
         # - VLAN?
         # - action
 
-    def pktToStr(self, pkt):
-        from cStringIO import StringIO
-        backup = sys.stdout
-        sys.stdout = StringIO()
-        pkt.show2()
-        out = sys.stdout.getvalue() 
-        sys.stdout.close() 
-        sys.stdout = backup
-        return out
-
     def createMatch(self, **kwargs):
         match = ofp.ofp_match()
         match.wildcards = ofp.OFPFW_ALL
@@ -2014,7 +2034,7 @@
         logging.info("Ingress %s to egress %s" % 
                        (str(ingress_port), str(egress_port)))
         logging.info("Packet:")
-        logging.info(self.pktToStr(pkt))
+        logging.info(inspect_packet(pkt))
 
         match.in_port = ingress_port
 
@@ -2078,7 +2098,7 @@
             if str_pkt != str_rcv_pkt:
                 logging.error("Response packet does not match send packet")
                 logging.info("Response:")
-                logging.info(self.pktToStr(scapy.Ether(rcv_pkt)))
+                logging.info(inspect_packet(scapy.Ether(rcv_pkt)))
             self.assertEqual(str_pkt, str_rcv_pkt,
                              'Response packet does not match send packet')
         elif expected_result == self.RESULT_NOMATCH:
diff --git a/tests/port_stats.py b/tests/port_stats.py
index 670e31f..89b2cf6 100644
--- a/tests/port_stats.py
+++ b/tests/port_stats.py
@@ -78,6 +78,21 @@
     logging.info("Port %d stats count: tx %d rx %d" % (port, packet_sent, packet_recv))
     return packet_sent, packet_recv
 
+def getAllStats(obj):
+    """
+    Request stats for all ports (port_no OFPP_NONE) in one transaction.
+    @returns dict mapping port_no to a (tx_packets, rx_packets) tuple
+    """
+    request = message.port_stats_request()
+    request.port_no = ofp.OFPP_NONE
+    logging.info("Sending all port stats request")
+    reply, _pkt = obj.controller.transact(request, timeout=2)
+    obj.assertTrue(reply is not None, "No response to stats request")
+    obj.assertTrue(len(reply.stats) >= 3,
+                   "Did not receive all port stats reply")
+    return dict((entry.port_no, (entry.tx_packets, entry.rx_packets))
+                for entry in reply.stats)
+
 def verifyStats(obj, port, test_timeout, packet_sent, packet_recv):
     stat_req = message.port_stats_request()
     stat_req.port_no = port
@@ -179,7 +194,6 @@
         verifyStats(self, ingress_port, test_timeout, initTxInPort, initRxInPort + num_sends)
         verifyStats(self, egress_port, test_timeout, initTxOutPort + num_sends, initRxOutPort)
 
-
 class MultiFlowStats(base_tests.SimpleDataPlane):
     """
     Verify flow stats are properly retrieved.
@@ -260,3 +274,80 @@
                     initTxOutPort1 + num_pkt1s, initRxOutPort1)
         verifyStats(self, egress_port2, test_timeout,
                     initTxOutPort2 + num_pkt2s, initRxOutPort2)
+
+class AllPortStats(base_tests.SimpleDataPlane):
+    """
+    Verify all port stats are properly retrieved.
+
+    First, get stats from each port. Then get all port stats, verify
+    consistency with single port stats.
+    """
+
+    # TODO: This is copied from MultiFlowStats. Need to combine.
+    def buildFlowModMsg(self, pkt, ingress_port, egress_port):
+        """Build a flow_mod sending pkt's flow from ingress_port to egress_port"""
+        match = packet_to_flow_match(self, pkt)
+        # Validate the match before it is dereferenced below
+        self.assertTrue(match is not None,
+                        "Could not generate flow match from pkt")
+        match.wildcards &= ~ofp.OFPFW_IN_PORT
+        match.in_port = ingress_port
+
+        flow_mod_msg = message.flow_mod()
+        flow_mod_msg.match = match
+        flow_mod_msg.cookie = random.randint(0,9007199254740992)
+        flow_mod_msg.buffer_id = 0xffffffff
+        flow_mod_msg.idle_timeout = 0
+        flow_mod_msg.hard_timeout = 0
+        act = action.action_output()
+        act.port = egress_port
+        self.assertTrue(flow_mod_msg.actions.add(act), "Could not add action")
+        logging.info("Ingress " + str(ingress_port) +
+                       " to egress " + str(egress_port))
+        return flow_mod_msg
+
+    def runTest(self):
+        # TODO: set from command-line parameter
+        test_timeout = 60
+
+        of_ports = sorted(config["port_map"].keys())
+        self.assertTrue(len(of_ports) >= 3, "Not enough ports for test")
+        # port0 is the ingress port; port1 and port2 are the egress ports
+        port0 = of_ports[0]
+        port1 = of_ports[1]
+        port2 = of_ports[2]
+
+        # construct some packets and flows, send to switch
+        pkt1 = simple_tcp_packet()
+        flow_mod_msg1 = self.buildFlowModMsg(pkt1, port0, port1)
+
+        pkt2 = simple_tcp_packet(dl_src='0:7:7:7:7:7')
+        flow_mod_msg2 = self.buildFlowModMsg(pkt2, port0, port2)
+
+        logging.info("Inserting flow1")
+        rv = self.controller.message_send(flow_mod_msg1)
+        self.assertTrue(rv != -1, "Error installing flow mod")
+        logging.info("Inserting flow2")
+        rv = self.controller.message_send(flow_mod_msg2)
+        self.assertTrue(rv != -1, "Error installing flow mod")
+        self.assertEqual(do_barrier(self.controller), 0, "Barrier failed")
+
+        num_pkt1s = random.randint(5,10)
+        logging.info("Sending " + str(num_pkt1s) + " pkt1s")
+        num_pkt2s = random.randint(10,15)
+        logging.info("Sending " + str(num_pkt2s) + " pkt2s")
+        for i in range(0,num_pkt1s):
+            sendPacket(self, pkt1, port0, port1, test_timeout)
+        for i in range(0,num_pkt2s):
+            sendPacket(self, pkt2, port0, port2, test_timeout)
+
+        # get individual port stats count
+        port_stats = {}
+        port_stats[ port0 ] = getStats(self, port0)
+        port_stats[ port1 ] = getStats(self, port1)
+        port_stats[ port2 ] = getStats(self, port2)
+
+        all_stats = getAllStats(self)
+        self.assertEqual(port_stats[ port0 ], all_stats[ port0 ])
+        self.assertEqual(port_stats[ port1 ], all_stats[ port1 ])
+        self.assertEqual(port_stats[ port2 ], all_stats[ port2 ])