Initial set of Fabric switch test cases

Change-Id: I86fd2b67d3b773aa496f5ef61f1e1fdf51fd9925
diff --git a/Fabric/Utilities/src/python/oftest/ b/Fabric/Utilities/src/python/oftest/
new file mode 100755
index 0000000..e52bfbe
--- /dev/null
+++ b/Fabric/Utilities/src/python/oftest/
@@ -0,0 +1,767 @@
+# Copyright 2017-present Open Networking Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+OpenFlow Test Framework
+Controller class
+Provide the interface to the control channel to the switch under test.  
+Class inherits from thread so as to run in background allowing
+asynchronous callbacks (if needed, not required).  Also supports
+The controller thread maintains a queue.  Incoming messages that
+are not handled by a callback function are placed in this queue for 
+poll calls.  
+Callbacks and polling support specifying the message type
+@todo Support transaction semantics via xid
+@todo Support select and listen on an administrative socket (or
+use a timeout to support clean shutdown).
+Currently only one connection is accepted during the life of
+the controller.   There seems
+to be no clean way to interrupt an accept call.  Using select that also listens
+on an administrative socket and can shut down the socket might work.
+import sys
+import os
+import socket
+import time
+import struct
+import select
+import logging
+from threading import Thread
+from threading import Lock
+from threading import Condition
+import ofutils
+import loxi
+# Configured openflow version
+import ofp as cfg_ofp
+FILTER = ''.join( [ (len( repr( chr( x ) ) ) == 3) and chr( x ) or '.'
+                    for x in range( 256 ) ] )
+def hex_dump_buffer( src, length=16 ):
+    """
+    Convert src to a hex dump string and return the string
+    @param src The source buffer
+    @param length The number of bytes shown in each line
+    @returns A string showing the hex dump
+    """
+    result = [ "\n" ]
+    for i in xrange( 0, len( src ), length ):
+        chars = src[ i:i + length ]
+        hex = ' '.join( [ "%02x" % ord( x ) for x in chars ] )
+        printable = ''.join( [ "%s" % ((ord( x ) <= 127 and
+                                        FILTER[ ord( x ) ]) or '.') for x in
+                               chars ] )
+        result.append( "%04x  %-*s  %s\n" % (i, length * 3, hex, printable) )
+    return ''.join( result )
+##@todo Find a better home for these identifiers (controller)
+class Controller( Thread ):
+    """
+    Class abstracting the control interface to the switch.  
+    For receiving messages, two mechanism will be implemented.  First,
+    query the interface with poll.  Second, register to have a
+    function called by message type.  The callback is passed the
+    message type as well as the raw packet (or message object)
+    One of the main purposes of this object is to translate between network 
+    and host byte order.  'Above' this object, things should be in host
+    byte order.
+    @todo Consider using SocketServer for listening socket
+    @todo Test transaction code
+    @var rcv_size The receive size to use for receive calls
+    @var max_pkts The max size of the receive queue
+    @var keep_alive If true, listen for echo requests and respond w/
+    echo replies
+    @var initial_hello If true, will send a hello message immediately
+    upon connecting to the switch
+    @var switch If not None, do an active connection to the switch
+    @var host The host to use for connect
+    @var port The port to connect on 
+    @var packets_total Total number of packets received
+    @var packets_expired Number of packets popped from queue as queue full
+    @var packets_handled Number of packets handled by something
+    @var dbg_state Debug indication of state
+    """
+    def __init__( self, switch=None, host='', port=6653, max_pkts=1024,
+                  force=False ):
+        Thread.__init__( self )
+        # Socket related
+        self.rcv_size = RCV_SIZE_DEFAULT
+        self.listen_socket = None
+        self.switch_socket = None
+        self.switch_addr = None
+        self.connect_cv = Condition( )
+        self.message_cv = Condition( )
+        self.tx_lock = Lock( )
+        # Used to wake up the event loop from another thread
+        self.waker = ofutils.EventDescriptor( )
+        # Counters
+        self.socket_errors = 0
+        self.parse_errors = 0
+        self.packets_total = 0
+        self.packets_expired = 0
+        self.packets_handled = 0
+        self.poll_discards = 0
+        # State
+        self.sync = Lock( )
+        self.handlers = { }
+        self.keep_alive = False
+ = True
+        self.initial_hello = True
+        # OpenFlow message/packet queue
+        # Protected by the packets_cv lock / condition variable
+        self.packets = [ ]
+        self.packets_cv = Condition( )
+        self.packet_in_count = 0
+        # Settings
+        self.max_pkts = max_pkts
+        self.switch = switch
+        self.passive = not self.switch
+        self.force = force
+ = host
+        self.port = port
+        self.dbg_state = "init"
+        self.logger = logging.getLogger( "controller" )
+        self.filter_packet_in = False  # Drop "excessive" packet ins
+        self.pkt_in_run = 0  # Count on run of packet ins
+        self.pkt_in_filter_limit = 50  # Count on run of packet ins
+        self.pkt_in_dropped = 0  # Total dropped packet ins
+        self.transact_to = 15  # Transact timeout default value; add to config
+        # Transaction and message type waiting variables 
+        #   xid_cv: Condition variable (semaphore) for packet waiters
+        #   xid: Transaction ID being waited on
+        #   xid_response: Transaction response message
+        self.xid_cv = Condition( )
+        self.xid = None
+        self.xid_response = None
+        self.buffered_input = ""
+        # Create listen socket
+        if self.passive:
+   "Create/listen at " + + ":" +
+                              str( self.port ) )
+            ai = socket.getaddrinfo(, self.port, socket.AF_UNSPEC,
+                                     socket.SOCK_STREAM, 0, socket.AI_PASSIVE )
+            # Use first returned addrinfo
+            (family, socktype, proto, name, sockaddr) = ai[ 0 ]
+            self.listen_socket = socket.socket( family, socktype )
+            self.listen_socket.setsockopt( socket.SOL_SOCKET,
+                                           socket.SO_REUSEADDR, 1 )
+            self.listen_socket.bind( sockaddr )
+            self.listen_socket.listen( LISTEN_QUEUE_SIZE )
+    def filter_packet( self, rawmsg, hdr ):
+        """
+        Check if packet should be filtered
+        Currently filters packet in messages
+        @return Boolean, True if packet should be dropped
+        """
+        # XXX didn't actually check for packet-in...
+        return False
+        # Add check for packet in and rate limit
+        if self.filter_packet_in:
+            # If we were dropping packets, report number dropped
+            # TODO dont drop expected packet ins
+            if self.pkt_in_run > self.pkt_in_filter_limit:
+                self.logger.debug( "Dropped %d packet ins (%d total)"
+                                   % ((self.pkt_in_run -
+                                       self.pkt_in_filter_limit),
+                                      self.pkt_in_dropped) )
+            self.pkt_in_run = 0
+        return False
+    def _pkt_handle( self, pkt ):
+        """
+        Check for all packet handling conditions
+        Parse and verify message 
+        Check if XID matches something waiting
+        Check if message is being expected for a poll operation
+        Check if keep alive is on and message is an echo request
+        Check if any registered handler wants the packet
+        Enqueue if none of those conditions is met
+        an echo request in case keep_alive is true, followed by
+        registered message handlers.
+        @param pkt The raw packet (string) which may contain multiple OF msgs
+        """
+        # snag any left over data from last read()
+        pkt = self.buffered_input + pkt
+        self.buffered_input = ""
+        # Process each of the OF msgs inside the pkt
+        offset = 0
+        while offset < len( pkt ):
+            if offset + 8 > len( pkt ):
+                break
+            # Parse the header to get type
+            hdr_version, hdr_type, hdr_length, hdr_xid = cfg_ofp.message.parse_header(
+                    pkt[ offset: ] )
+            # Use loxi to resolve to ofp of matching version
+            ofp = loxi.protocol( hdr_version )
+            # Extract the raw message bytes
+            if (offset + hdr_length) > len( pkt ):
+                break
+            rawmsg = pkt[ offset: offset + hdr_length ]
+            offset += hdr_length
+            # if self.filter_packet(rawmsg, hdr):
+            #    continue
+            msg = ofp.message.parse_message( rawmsg )
+            if not msg:
+                self.parse_errors += 1
+                self.logger.warn( "Could not parse message" )
+                continue
+            self.logger.debug( "Msg in: version %d class %s len %d xid %d",
+                               hdr_version, type( msg ).__name__, hdr_length,
+                               hdr_xid )
+            with self.sync:
+                # Check if transaction is waiting
+                with self.xid_cv:
+                    if self.xid and hdr_xid == self.xid:
+                        self.logger.debug(
+                                "Matched expected XID " + str( hdr_xid ) )
+                        self.xid_response = (msg, rawmsg)
+                        self.xid = None
+                        self.xid_cv.notify( )
+                        continue
+                # Check if keep alive is set; if so, respond to echo requests
+                if self.keep_alive:
+                    if hdr_type == ofp.OFPT_ECHO_REQUEST:
+                        self.logger.debug( "Responding to echo request" )
+                        rep = ofp.message.echo_reply( )
+                        rep.xid = hdr_xid
+                        # Ignoring additional data
+                        self.message_send( rep )
+                        continue
+                # Generalize to counters for all packet types?
+                if msg.type == ofp.OFPT_PACKET_IN:
+                    self.packet_in_count += 1
+                # Log error messages
+                if isinstance( msg, ofp.message.error_msg ):
+                    # pylint: disable=E1103
+                    if msg.err_type in ofp.ofp_error_type_map:
+                        type_str = ofp.ofp_error_type_map[ msg.err_type ]
+                        if msg.err_type == ofp.OFPET_HELLO_FAILED:
+                            code_map = ofp.ofp_hello_failed_code_map
+                        elif msg.err_type == ofp.OFPET_BAD_REQUEST:
+                            code_map = ofp.ofp_bad_request_code_map
+                        elif msg.err_type == ofp.OFPET_BAD_ACTION:
+                            code_map = ofp.ofp_bad_action_code_map
+                        elif msg.err_type == ofp.OFPET_FLOW_MOD_FAILED:
+                            code_map = ofp.ofp_flow_mod_failed_code_map
+                        elif msg.err_type == ofp.OFPET_PORT_MOD_FAILED:
+                            code_map = ofp.ofp_port_mod_failed_code_map
+                        elif msg.err_type == ofp.OFPET_QUEUE_OP_FAILED:
+                            code_map = ofp.ofp_queue_op_failed_code_map
+                        else:
+                            code_map = None
+                        if code_map and msg.code in code_map:
+                            code_str = code_map[ msg.code ]
+                        else:
+                            code_str = "unknown"
+                    else:
+                        type_str = "unknown"
+                        code_str = "unknown"
+                    self.logger.warn(
+                            "Received error message: xid=%d type=%s (%d) code=%s (%d)",
+                            hdr_xid, type_str, msg.err_type, code_str,
+                            msg.code )
+                # Now check for message handlers; preference is given to
+                # handlers for a specific packet
+                handled = False
+                if hdr_type in self.handlers.keys( ):
+                    handled = self.handlers[ hdr_type ]( self, msg, rawmsg )
+                if not handled and ("all" in self.handlers.keys( )):
+                    handled = self.handlers[ "all" ]( self, msg, rawmsg )
+                if not handled:  # Not handled, enqueue
+                    with self.packets_cv:
+                        if len( self.packets ) >= self.max_pkts:
+                            self.packets.pop( 0 )
+                            self.packets_expired += 1
+                        self.packets.append( (msg, rawmsg) )
+                        self.packets_cv.notify_all( )
+                    self.packets_total += 1
+                else:
+                    self.packets_handled += 1
+                    self.logger.debug( "Message handled by callback" )
+        # end of 'while offset < len(pkt)'
+        #   note that if offset = len(pkt), this is
+        #   appends a harmless empty string
+        self.buffered_input += pkt[ offset: ]
+    def _socket_ready_handle( self, s ):
+        """
+        Handle an input-ready socket
+        @param s The socket object that is ready
+        @returns 0 on success, -1 on error
+        """
+        if self.passive and s and s == self.listen_socket:
+            if self.switch_socket:
+                self.logger.warning(
+                        "Ignoring incoming connection; already connected to switch" )
+                (sock, addr) = self.listen_socket.accept( )
+                sock.close( )
+                return 0
+            try:
+                (sock, addr) = self.listen_socket.accept( )
+            except:
+                self.logger.warning( "Error on listen socket accept" )
+                return -1
+   + ":" + str(
+                    self.port ) + ": Incoming connection from " + str( addr ) )
+            with self.connect_cv:
+                (self.switch_socket, self.switch_addr) = (sock, addr)
+                self.switch_socket.setsockopt( socket.IPPROTO_TCP,
+                                               socket.TCP_NODELAY, True )
+                if self.initial_hello:
+                    self.message_send( cfg_ofp.message.hello( ) )
+                self.connect_cv.notify( )  # Notify anyone waiting
+            # Prevent further connections
+            self.listen_socket.close( )
+            self.listen_socket = None
+        elif s and s == self.switch_socket:
+            for idx in range( 3 ):  # debug: try a couple of times
+                try:
+                    pkt = self.switch_socket.recv( self.rcv_size )
+                except:
+                    self.logger.warning( "Error on switch read" )
+                    return -1
+                if not
+                    return 0
+                if len( pkt ) == 0:
+                    self.logger.warning( "Zero-length switch read, %d" % idx )
+                else:
+                    break
+            if len( pkt ) == 0:  # Still no packet
+                self.logger.warning( "Zero-length switch read; closing cxn" )
+       str( self ) )
+                return -1
+            self._pkt_handle( pkt )
+        elif s and s == self.waker:
+            self.waker.wait( )
+        else:
+            self.logger.error( "Unknown socket ready: " + str( s ) )
+            return -1
+        return 0
+    def active_connect( self ):
+        """
+        Actively connect to a switch IP addr
+        """
+        try:
+   "Trying active connection to %s" % self.switch )
+            soc = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
+            soc.connect( (self.switch, self.port) )
+   "Connected to " + self.switch + " on " +
+                              str( self.port ) )
+            soc.setsockopt( socket.IPPROTO_TCP, socket.TCP_NODELAY, True )
+            self.switch_addr = (self.switch, self.port)
+            return soc
+        except (StandardError, socket.error), e:
+            self.logger.error( "Could not connect to %s at %d:: %s" %
+                               (self.switch, self.port, str( e )) )
+        return None
+    def wakeup( self ):
+        """
+        Wake up the event loop, presumably from another thread.
+        """
+        self.waker.notify( )
+    def sockets( self ):
+        """
+        Return list of sockets to select on.
+        """
+        socs = [ self.listen_socket, self.switch_socket, self.waker ]
+        return [ x for x in socs if x ]
+    def run( self ):
+        """
+        Activity function for class
+        Assumes connection to switch already exists.  Listens on
+        switch_socket for messages until an error (or zero len pkt)
+        occurs.
+        When there is a message on the socket, check for handlers; queue the
+        packet if no one handles the packet.
+        See note for controller describing the limitation of a single
+        connection for now.
+        """
+        self.dbg_state = "running"
+        while
+            try:
+                sel_in, sel_out, sel_err = \
+           self.sockets( ), [ ], self.sockets( ), 1 )
+            except:
+                print sys.exc_info( )
+                self.logger.error( "Select error, disconnecting" )
+                self.disconnect( )
+            for s in sel_err:
+                self.logger.error(
+                        "Got socket error on: " + str( s ) + ", disconnecting" )
+                self.disconnect( )
+            for s in sel_in:
+                if self._socket_ready_handle( s ) == -1:
+                    self.disconnect( )
+        # End of main loop
+        self.dbg_state = "closing"
+ "Exiting controller thread" )
+        self.shutdown( )
+    def connect( self, timeout=-1 ):
+        """
+        Connect to the switch
+        @param timeout Block for up to timeout seconds. Pass -1 for the default.
+        @return Boolean, True if connected
+        """
+        if not self.passive:  # Do active connection now
+   "Attempting to connect to %s on port %s" %
+                                  (self.switch, str( self.port )) )
+            soc = self.active_connect( )
+            if soc:
+       "Connected to %s", self.switch )
+                self.dbg_state = "running"
+                self.switch_socket = soc
+                self.wakeup( )
+                with self.connect_cv:
+                    if self.initial_hello:
+                        self.message_send( cfg_ofp.message.hello( ) )
+                    self.connect_cv.notify( )  # Notify anyone waiting
+            else:
+                self.logger.error( "Could not actively connect to switch %s",
+                                   self.switch )
+       = False
+        else:
+            with self.connect_cv:
+                ofutils.timed_wait( self.connect_cv, lambda: self.switch_socket,
+                                    timeout=timeout )
+        return self.switch_socket is not None
+    def disconnect( self, timeout=-1 ):
+        """
+        If connected to a switch, disconnect.
+        """
+        if self.switch_socket:
+            self.switch_socket.close( )
+            self.switch_socket = None
+            self.switch_addr = None
+            with self.packets_cv:
+                self.packets = [ ]
+            with self.connect_cv:
+                self.connect_cv.notifyAll( )
+    def wait_disconnected( self, timeout=-1 ):
+        """
+        @param timeout Block for up to timeout seconds. Pass -1 for the default.
+        @return Boolean, True if disconnected
+        """
+        with self.connect_cv:
+            ofutils.timed_wait( self.connect_cv,
+                                lambda: True if not self.switch_socket else None,
+                                timeout=timeout )
+        return self.switch_socket is None
+    def kill( self ):
+        """
+        Force the controller thread to quit
+        """
+ = False
+        self.wakeup( )
+        self.join( )
+    def shutdown( self ):
+        """
+        Shutdown the controller closing all sockets
+        @todo Might want to synchronize shutdown with self.sync...
+        """
+ = False
+        try:
+            self.switch_socket.shutdown( socket.SHUT_RDWR )
+        except:
+   "Ignoring switch soc shutdown error" )
+        self.switch_socket = None
+        try:
+            self.listen_socket.shutdown( socket.SHUT_RDWR )
+        except:
+   "Ignoring listen soc shutdown error" )
+        self.listen_socket = None
+        # Wakeup condition variables on which controller may be wait
+        with self.xid_cv:
+            self.xid_cv.notifyAll( )
+        with self.connect_cv:
+            self.connect_cv.notifyAll( )
+        self.wakeup( )
+        self.dbg_state = "down"
+    def register( self, msg_type, handler ):
+        """
+        Register a callback to receive a specific message type.
+        Only one handler may be registered for a given message type.
+        WARNING:  A lock is held during the handler call back, so 
+        the handler should not make any blocking calls
+        @param msg_type The type of message to receive.  May be DEFAULT 
+        for all non-handled packets.  The special type, the string "all"
+        will send all packets to the handler.
+        @param handler The function to call when a message of the given 
+        type is received.
+        """
+        # Should check type is valid
+        if not handler and msg_type in self.handlers.keys( ):
+            del self.handlers[ msg_type ]
+            return
+        self.handlers[ msg_type ] = handler
+    def poll( self, exp_msg=None, timeout=-1 ):
+        """
+        Wait for the next OF message received from the switch.
+        @param exp_msg If set, return only when this type of message 
+        is received (unless timeout occurs).
+        @param timeout Maximum number of seconds to wait for the message.
+        Pass -1 for the default timeout.
+        @retval A pair (msg, pkt) where msg is a message object and pkt
+        the string representing the packet as received from the socket.
+        This allows additional parsing by the receiver if necessary.
+        The data members in the message are in host endian order.
+        If an error occurs, (None, None) is returned
+        """
+        if exp_msg is None:
+            self.logger.warn( "DEPRECATED polling for any message class" )
+            klass = None
+        elif isinstance( exp_msg, int ):
+            klass = cfg_ofp.message.message.subtypes[ exp_msg ]
+        elif issubclass( exp_msg, loxi.OFObject ):
+            klass = exp_msg
+        else:
+            raise ValueError( "Unexpected exp_msg argument %r" % exp_msg )
+        self.logger.debug( "Polling for %s", klass.__name__ )
+        # Take the packet from the queue
+        def grab( ):
+            for i, (msg, pkt) in enumerate( self.packets ):
+                if klass is None or isinstance( msg, klass ):
+                    self.logger.debug( "Got %s message",
+                                       msg.__class__.__name__ )
+                    return self.packets.pop( i )
+            # Not found
+            self.logger.debug( "%s message not in queue", klass.__name__ )
+            return None
+        with self.packets_cv:
+            ret = ofutils.timed_wait( self.packets_cv, grab, timeout=timeout )
+        if ret != None:
+            (msg, pkt) = ret
+            return (msg, pkt)
+        else:
+            return (None, None)
+    def transact( self, msg, timeout=-1 ):
+        """
+        Run a message transaction with the switch
+        Send the message in msg and wait for a reply with a matching
+        transaction id.  Transactions have the highest priority in
+        received message handling.
+        @param msg The message object to send; must not be a string
+        @param timeout The timeout in seconds; if -1 use default.
+        """
+        if msg.xid == None:
+            msg.xid = ofutils.gen_xid( )
+        self.logger.debug( "Running transaction %d" % msg.xid )
+        with self.xid_cv:
+            if self.xid:
+                self.logger.error( "Can only run one transaction at a time" )
+                return (None, None)
+            self.xid = msg.xid
+            self.xid_response = None
+            self.message_send( msg )
+            self.logger.debug( "Waiting for transaction %d" % msg.xid )
+            ofutils.timed_wait( self.xid_cv, lambda: self.xid_response,
+                                timeout=timeout )
+            if self.xid_response:
+                (resp, pkt) = self.xid_response
+                self.xid_response = None
+            else:
+                (resp, pkt) = (None, None)
+        if resp is None:
+            self.logger.warning( "No response for xid " + str( self.xid ) )
+        return (resp, pkt)
+    def message_send( self, msg ):
+        """
+        Send the message to the switch
+        @param msg A string or OpenFlow message object to be forwarded to
+        the switch.
+        """
+        if not self.switch_socket:
+            # Sending a string indicates the message is ready to go
+            raise Exception( "no socket" )
+        if msg.xid == None:
+            msg.xid = ofutils.gen_xid( )
+        outpkt = msg.pack( )
+        self.logger.debug( "Msg out: version %d class %s len %d xid %d",
+                           msg.version, type( msg ).__name__, len( outpkt ),
+                           msg.xid )
+        with self.tx_lock:
+            if self.switch_socket.sendall( outpkt ) is not None:
+                raise AssertionError( "failed to send message to switch" )
+        return 0  # for backwards compatibility
+    def clear_queue( self ):
+        """
+        Clear the input queue and report the number of messages
+        that were in it
+        """
+        enqueued_pkt_count = len( self.packets )
+        with self.packets_cv:
+            self.packets = [ ]
+        return enqueued_pkt_count
+    def __str__( self ):
+        string = "Controller:\n"
+        string += "  state           " + self.dbg_state + "\n"
+        string += "  switch_addr     " + str( self.switch_addr ) + "\n"
+        string += "  pending pkts    " + str( len( self.packets ) ) + "\n"
+        string += "  total pkts      " + str( self.packets_total ) + "\n"
+        string += "  expired pkts    " + str( self.packets_expired ) + "\n"
+        string += "  handled pkts    " + str( self.packets_handled ) + "\n"
+        string += "  poll discards   " + str( self.poll_discards ) + "\n"
+        string += "  parse errors    " + str( self.parse_errors ) + "\n"
+        string += "  sock errrors    " + str( self.socket_errors ) + "\n"
+        string += "  max pkts        " + str( self.max_pkts ) + "\n"
+        string += "  target switch   " + str( self.switch ) + "\n"
+        string += "  host            " + str( ) + "\n"
+        string += "  port            " + str( self.port ) + "\n"
+        string += "  keep_alive      " + str( self.keep_alive ) + "\n"
+        string += "  pkt_in_run      " + str( self.pkt_in_run ) + "\n"
+        string += "  pkt_in_dropped  " + str( self.pkt_in_dropped ) + "\n"
+        return string
+    def show( self ):
+        print str( self )
+def sample_handler( controller, msg, pkt ):
+    """
+    Sample message handler
+    This is the prototype for functions registered with the controller
+    class for packet reception
+    @param controller The controller calling the handler
+    @param msg The parsed message object
+    @param pkt The raw packet that was received on the socket.  This is
+    in case the packet contains extra unparsed data.
+    @returns Boolean value indicating if the packet was handled.  If
+    not handled, the packet is placed in the queue for pollers to received
+    """
+    pass