Aggressively purging Accton switch
Change-Id: Ie523895ecbb0a84b155b621f6b7cb8677a2edc83
diff --git a/ofdpa/flows.py b/ofdpa/flows.py
index 36db256..b2a11be 100755
--- a/ofdpa/flows.py
+++ b/ofdpa/flows.py
@@ -190,13 +190,12 @@
"""
def runTest(self):
-
+ Groups = Queue.LifoQueue()
try:
intf_src_mac=[0x00, 0x00, 0x00, 0xcc, 0xcc, 0xcc]
dst_mac=[0x00, 0x00, 0x00, 0x22, 0x22, 0x00]
dip=0xc0a80001
ports = sorted(config["port_map"].keys())
- Groups = Queue.LifoQueue()
for port in ports:
#add l2 interface group
@@ -241,11 +240,11 @@
"""
def runTest(self):
+ Groups = Queue.LifoQueue()
try:
ports = sorted(config["port_map"].keys())
vlan_id = 1
- Groups = Queue.LifoQueue()
for port in ports:
L2gid, l2msg = add_one_l2_interface_group(self.controller, port,
vlan_id, True, False)
@@ -332,10 +331,11 @@
"""
def runTest(self):
+
+ Groups = Queue.LifoQueue()
try:
ports = sorted(config["port_map"].keys())
vlan_id = 1;
- Groups = Queue.LifoQueue()
for port in ports:
L2gid, l2msg = add_one_l2_interface_group(self.controller, port,
vlan_id, True, False)
@@ -370,10 +370,10 @@
class Mtu1500(base_tests.SimpleDataPlane):
def runTest(self):
+ Groups = Queue.LifoQueue()
try:
ports = sorted(config["port_map"].keys())
vlan_id = 18
- Groups = Queue.LifoQueue()
for port in ports:
L2gid, msg = add_one_l2_interface_group(self.controller, port,
vlan_id, True, False)
@@ -411,6 +411,7 @@
"""
def runTest(self):
+ Groups = Queue.LifoQueue()
try:
test_id = 26
if len(config["port_map"]) < 2:
@@ -421,7 +422,6 @@
dst_mac = [0x00, 0x00, 0x00, 0x22, 0x22, 0x00]
dip = 0xc0a80001
ports = config["port_map"].keys()
- Groups = Queue.LifoQueue()
for port in ports:
# add l2 interface group
vlan_id = port + test_id
@@ -485,6 +485,7 @@
"""
def runTest(self):
+ Groups = Queue.LifoQueue()
try:
if len(config["port_map"]) < 2:
logging.info("Port count less than 2, can't run this case")
@@ -494,7 +495,6 @@
dst_mac = [0x00, 0x00, 0x00, 0x22, 0x22, 0x00]
dip = 0xc0a80001
ports = config["port_map"].keys()
- Groups = Queue.LifoQueue()
for port in ports:
# add l2 interface group
id = port
@@ -566,6 +566,7 @@
"""
def runTest(self):
+ Groups = Queue.LifoQueue()
try:
if len(config["port_map"]) < 2:
logging.info("Port count less than 2, can't run this case")
@@ -575,7 +576,6 @@
dst_mac = [0x00, 0x00, 0x00, 0x22, 0x22, 0x00]
dip = 0xc0a80001
ports = config["port_map"].keys()
- Groups = Queue.LifoQueue()
for port in ports:
# add l2 interface group
id = port
@@ -648,8 +648,8 @@
"""
def runTest(self):
+ Groups = Queue.LifoQueue()
try:
- Groups = Queue.LifoQueue()
if len(config["port_map"]) < 2:
logging.info("Port count less than 2, can't run this case")
return
@@ -725,6 +725,7 @@
"""
def runTest(self):
+ Groups = Queue.LifoQueue()
try:
if len(config["port_map"]) < 2:
logging.info("Port count less than 2, can't run this case")
@@ -734,7 +735,6 @@
dst_mac = [0x00, 0x00, 0x00, 0x22, 0x22, 0x00]
dip = 0xc0a80001
ports = config["port_map"].keys()
- Groups = Queue.LifoQueue()
for port in ports:
# add l2 interface group
id = port
@@ -803,6 +803,7 @@
"""
def runTest(self):
+ Groups = Queue.LifoQueue()
try:
if len(config["port_map"]) < 2:
logging.info("Port count less than 2, can't run this case")
@@ -810,7 +811,6 @@
intf_src_mac = [0x00, 0x00, 0x00, 0xcc, 0xcc, 0xcc]
dst_mac = [0x00, 0x00, 0x00, 0x22, 0x22, 0x00]
dip = 0xc0a80001
- Groups = Queue.LifoQueue()
ports = config["port_map"].keys()
for port in ports:
# add l2 interface group
@@ -888,8 +888,8 @@
"""
def runTest(self):
+ Groups = Queue.LifoQueue()
try:
- Groups = Queue.LifoQueue()
if len(config["port_map"]) < 2:
logging.info("Port count less than 2, can't run this case")
return
@@ -1056,12 +1056,12 @@
"""
port1 (vlan 300)-> All Ports (vlan 300)
"""
+ Groups = Queue.LifoQueue()
try:
if len(config["port_map"]) < 3:
logging.info("Port count less than 3, can't run this case")
assert (False)
return
- Groups = Queue.LifoQueue()
vlan_id = 300
intf_src_mac = [0x00, 0x00, 0x00, 0xcc, 0xcc, 0xcc]
intf_src_mac_str = ':'.join(['%02X' % x for x in intf_src_mac])
@@ -1134,8 +1134,8 @@
"""
port1 (vlan 1)-> port 2 (vlan 2)
"""
+ Groups = Queue.LifoQueue()
try:
- Groups = Queue.LifoQueue()
if len(config["port_map"]) < 3:
logging.info("Port count less than 3, can't run this case")
assert (False)
@@ -1221,8 +1221,8 @@
Receive MPLS packet
"""
def runTest(self):
+ Groups = Queue.LifoQueue()
try:
- Groups = Queue.LifoQueue()
if len(config["port_map"]) < 2:
logging.info("Port count less than 2, can't run this case")
return
@@ -1296,8 +1296,8 @@
Receive MPLS packet
"""
def runTest(self):
+ Groups = Queue.LifoQueue()
try:
- Groups = Queue.LifoQueue()
if len(config["port_map"]) < 2:
logging.info("Port count less than 2, can't run this case")
return
@@ -1365,17 +1365,16 @@
"""
def runTest(self):
+ Groups = Queue.LifoQueue()
try:
test_id = 26
if len(config["port_map"]) < 2:
logging.info("Port count less than 2, can't run this case")
return
-
intf_src_mac = [0x00, 0x00, 0x00, 0xcc, 0xcc, 0xcc]
dst_mac = [0x00, 0x00, 0x00, 0x22, 0x22, 0x00]
dip = 0xc0a80001
ports = config["port_map"].keys()
- Groups = Queue.LifoQueue()
for port in ports:
# add l2 interface group
vlan_id = port + test_id
@@ -1438,6 +1437,7 @@
"""
def runTest(self):
+ Groups = Queue.LifoQueue()
try:
if len(config["port_map"]) < 2:
logging.info("Port count less than 2, can't run this case")
@@ -1447,7 +1447,6 @@
dst_mac = [0x00, 0x00, 0x00, 0x22, 0x22, 0x00]
dip = 0xc0a80001
ports = config["port_map"].keys()
- Groups = Queue.LifoQueue()
for port in ports:
# add l2 interface group
vlan_id = port
@@ -1629,6 +1628,7 @@
"""
def runTest(self):
+ Groups = Queue.LifoQueue()
try:
ports = sorted(config["port_map"].keys())
@@ -1647,5 +1647,4 @@
verify_no_other_packets(self)
finally:
delete_all_flows(self.controller)
- delete_groups(self.controller, Groups)
delete_all_groups(self.controller)
\ No newline at end of file
diff --git a/oft b/oft
index bab5b47..275e3d3 100755
--- a/oft
+++ b/oft
@@ -99,7 +99,7 @@
"random_order" : False,
"dump_packet" : True,
"cicada_poject" : False,
-
+ "force_ofdpa_restart": False,
# Other configuration
"port_map" : {},
}
@@ -214,6 +214,8 @@
help="Minimum allowable packet size on the dataplane.")
group.add_option("--random-seed", type="int",
help="Random number generator seed")
+ group.add_option("--force-ofdpa-restart",
+ help="If set force ofdpa restart on user@switchIP")
group.add_option("--disable-ipv6", action="store_true",
help="Disable IPv6 tests")
group.add_option("--random-order", action="store_true",
diff --git a/src/python/oftest/base_tests.py b/src/python/oftest/base_tests.py
old mode 100644
new mode 100755
index a2b25ac..ec9b351
--- a/src/python/oftest/base_tests.py
+++ b/src/python/oftest/base_tests.py
@@ -14,17 +14,24 @@
import oftest.controller as controller
import oftest.dataplane as dataplane
import ofp
+from ofdpa_utils import *
class BaseTest(unittest.TestCase):
def __str__(self):
return self.id().replace('.runTest', '')
def setUp(self):
+ if config["force_ofdpa_restart"]:
+ logging.info("Restarting OFDPA")
+ forceOfdpaRestart( config["force_ofdpa_restart"]);
oftest.open_logfile(str(self))
logging.info("** START TEST CASE " + str(self))
def tearDown(self):
logging.info("** END TEST CASE " + str(self))
+ if config["force_ofdpa_restart"]:
+ forceOfdpaStop( config["force_ofdpa_restart"]);
+
class SimpleProtocol(BaseTest):
"""
@@ -33,7 +40,6 @@
def setUp(self):
BaseTest.setUp(self)
-
self.controller = controller.Controller(
switch=config["switch_ip"],
host=config["controller_host"],
@@ -47,7 +53,6 @@
# By default, respond to echo requests
self.controller.keep_alive = True
-
if not self.controller.active:
raise Exception("Controller startup failed")
if self.controller.switch_addr is None:
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
old mode 100644
new mode 100755
index acbe3fc..23d031b
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -43,30 +43,34 @@
# Configured openflow version
import ofp as cfg_ofp
-FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.'
- for x in range(256)])
+FILTER = ''.join( [ (len( repr( chr( x ) ) ) == 3) and chr( x ) or '.'
+ for x in range( 256 ) ] )
-def hex_dump_buffer(src, length=16):
+
+def hex_dump_buffer( src, length=16 ):
"""
Convert src to a hex dump string and return the string
@param src The source buffer
@param length The number of bytes shown in each line
@returns A string showing the hex dump
"""
- result = ["\n"]
- for i in xrange(0, len(src), length):
- chars = src[i:i+length]
- hex = ' '.join(["%02x" % ord(x) for x in chars])
- printable = ''.join(["%s" % ((ord(x) <= 127 and
- FILTER[ord(x)]) or '.') for x in chars])
- result.append("%04x %-*s %s\n" % (i, length*3, hex, printable))
- return ''.join(result)
+ result = [ "\n" ]
+ for i in xrange( 0, len( src ), length ):
+ chars = src[ i:i + length ]
+ hex = ' '.join( [ "%02x" % ord( x ) for x in chars ] )
+ printable = ''.join( [ "%s" % ((ord( x ) <= 127 and
+ FILTER[ ord( x ) ]) or '.') for x in
+ chars ] )
+ result.append( "%04x %-*s %s\n" % (i, length * 3, hex, printable) )
+ return ''.join( result )
+
##@todo Find a better home for these identifiers (controller)
RCV_SIZE_DEFAULT = 32768
LISTEN_QUEUE_SIZE = 1
-class Controller(Thread):
+
+class Controller( Thread ):
"""
Class abstracting the control interface to the switch.
@@ -97,19 +101,20 @@
@var dbg_state Debug indication of state
"""
- def __init__(self, switch=None, host='127.0.0.1', port=6653, max_pkts=1024):
- Thread.__init__(self)
+ def __init__( self, switch=None, host='127.0.0.1', port=6653, max_pkts=1024,
+ force=False ):
+ Thread.__init__( self )
# Socket related
self.rcv_size = RCV_SIZE_DEFAULT
self.listen_socket = None
self.switch_socket = None
self.switch_addr = None
- self.connect_cv = Condition()
- self.message_cv = Condition()
- self.tx_lock = Lock()
+ self.connect_cv = Condition( )
+ self.message_cv = Condition( )
+ self.tx_lock = Lock( )
# Used to wake up the event loop from another thread
- self.waker = ofutils.EventDescriptor()
+ self.waker = ofutils.EventDescriptor( )
# Counters
self.socket_errors = 0
@@ -120,37 +125,38 @@
self.poll_discards = 0
# State
- self.sync = Lock()
- self.handlers = {}
+ self.sync = Lock( )
+ self.handlers = { }
self.keep_alive = False
self.active = True
self.initial_hello = True
# OpenFlow message/packet queue
# Protected by the packets_cv lock / condition variable
- self.packets = []
- self.packets_cv = Condition()
+ self.packets = [ ]
+ self.packets_cv = Condition( )
self.packet_in_count = 0
# Settings
self.max_pkts = max_pkts
self.switch = switch
self.passive = not self.switch
+ self.force = force
self.host = host
self.port = port
self.dbg_state = "init"
- self.logger = logging.getLogger("controller")
- self.filter_packet_in = False # Drop "excessive" packet ins
- self.pkt_in_run = 0 # Count on run of packet ins
- self.pkt_in_filter_limit = 50 # Count on run of packet ins
- self.pkt_in_dropped = 0 # Total dropped packet ins
- self.transact_to = 15 # Transact timeout default value; add to config
+ self.logger = logging.getLogger( "controller" )
+ self.filter_packet_in = False # Drop "excessive" packet ins
+ self.pkt_in_run = 0 # Count on run of packet ins
+ self.pkt_in_filter_limit = 50 # Count on run of packet ins
+ self.pkt_in_dropped = 0 # Total dropped packet ins
+ self.transact_to = 15 # Transact timeout default value; add to config
# Transaction and message type waiting variables
# xid_cv: Condition variable (semaphore) for packet waiters
# xid: Transaction ID being waited on
# xid_response: Transaction response message
- self.xid_cv = Condition()
+ self.xid_cv = Condition( )
self.xid = None
self.xid_response = None
@@ -158,19 +164,19 @@
# Create listen socket
if self.passive:
- self.logger.info("Create/listen at " + self.host + ":" +
- str(self.port))
- ai = socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC,
- socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
+ self.logger.info( "Create/listen at " + self.host + ":" +
+ str( self.port ) )
+ ai = socket.getaddrinfo( self.host, self.port, socket.AF_UNSPEC,
+ socket.SOCK_STREAM, 0, socket.AI_PASSIVE )
# Use first returned addrinfo
- (family, socktype, proto, name, sockaddr) = ai[0]
- self.listen_socket = socket.socket(family, socktype)
- self.listen_socket.setsockopt(socket.SOL_SOCKET,
- socket.SO_REUSEADDR, 1)
- self.listen_socket.bind(sockaddr)
- self.listen_socket.listen(LISTEN_QUEUE_SIZE)
+ (family, socktype, proto, name, sockaddr) = ai[ 0 ]
+ self.listen_socket = socket.socket( family, socktype )
+ self.listen_socket.setsockopt( socket.SOL_SOCKET,
+ socket.SO_REUSEADDR, 1 )
+ self.listen_socket.bind( sockaddr )
+ self.listen_socket.listen( LISTEN_QUEUE_SIZE )
- def filter_packet(self, rawmsg, hdr):
+ def filter_packet( self, rawmsg, hdr ):
"""
Check if packet should be filtered
@@ -184,15 +190,15 @@
# If we were dropping packets, report number dropped
# TODO dont drop expected packet ins
if self.pkt_in_run > self.pkt_in_filter_limit:
- self.logger.debug("Dropped %d packet ins (%d total)"
- % ((self.pkt_in_run -
- self.pkt_in_filter_limit),
- self.pkt_in_dropped))
+ self.logger.debug( "Dropped %d packet ins (%d total)"
+ % ((self.pkt_in_run -
+ self.pkt_in_filter_limit),
+ self.pkt_in_dropped) )
self.pkt_in_run = 0
return False
- def _pkt_handle(self, pkt):
+ def _pkt_handle( self, pkt ):
"""
Check for all packet handling conditions
@@ -214,52 +220,55 @@
# Process each of the OF msgs inside the pkt
offset = 0
- while offset < len(pkt):
- if offset + 8 > len(pkt):
+ while offset < len( pkt ):
+ if offset + 8 > len( pkt ):
break
# Parse the header to get type
- hdr_version, hdr_type, hdr_length, hdr_xid = cfg_ofp.message.parse_header(pkt[offset:])
+ hdr_version, hdr_type, hdr_length, hdr_xid = cfg_ofp.message.parse_header(
+ pkt[ offset: ] )
# Use loxi to resolve to ofp of matching version
- ofp = loxi.protocol(hdr_version)
+ ofp = loxi.protocol( hdr_version )
# Extract the raw message bytes
- if (offset + hdr_length) > len(pkt):
+ if (offset + hdr_length) > len( pkt ):
break
- rawmsg = pkt[offset : offset + hdr_length]
+ rawmsg = pkt[ offset: offset + hdr_length ]
offset += hdr_length
- #if self.filter_packet(rawmsg, hdr):
+ # if self.filter_packet(rawmsg, hdr):
# continue
- msg = ofp.message.parse_message(rawmsg)
+ msg = ofp.message.parse_message( rawmsg )
if not msg:
self.parse_errors += 1
- self.logger.warn("Could not parse message")
+ self.logger.warn( "Could not parse message" )
continue
- self.logger.debug("Msg in: version %d class %s len %d xid %d",
- hdr_version, type(msg).__name__, hdr_length, hdr_xid)
+ self.logger.debug( "Msg in: version %d class %s len %d xid %d",
+ hdr_version, type( msg ).__name__, hdr_length,
+ hdr_xid )
with self.sync:
# Check if transaction is waiting
with self.xid_cv:
if self.xid and hdr_xid == self.xid:
- self.logger.debug("Matched expected XID " + str(hdr_xid))
+ self.logger.debug(
+ "Matched expected XID " + str( hdr_xid ) )
self.xid_response = (msg, rawmsg)
self.xid = None
- self.xid_cv.notify()
+ self.xid_cv.notify( )
continue
# Check if keep alive is set; if so, respond to echo requests
if self.keep_alive:
if hdr_type == ofp.OFPT_ECHO_REQUEST:
- self.logger.debug("Responding to echo request")
- rep = ofp.message.echo_reply()
+ self.logger.debug( "Responding to echo request" )
+ rep = ofp.message.echo_reply( )
rep.xid = hdr_xid
# Ignoring additional data
- self.message_send(rep)
+ self.message_send( rep )
continue
# Generalize to counters for all packet types?
@@ -267,10 +276,10 @@
self.packet_in_count += 1
# Log error messages
- if isinstance(msg, ofp.message.error_msg):
- #pylint: disable=E1103
+ if isinstance( msg, ofp.message.error_msg ):
+ # pylint: disable=E1103
if msg.err_type in ofp.ofp_error_type_map:
- type_str = ofp.ofp_error_type_map[msg.err_type]
+ type_str = ofp.ofp_error_type_map[ msg.err_type ]
if msg.err_type == ofp.OFPET_HELLO_FAILED:
code_map = ofp.ofp_hello_failed_code_map
elif msg.err_type == ofp.OFPET_BAD_REQUEST:
@@ -287,41 +296,43 @@
code_map = None
if code_map and msg.code in code_map:
- code_str = code_map[msg.code]
+ code_str = code_map[ msg.code ]
else:
code_str = "unknown"
else:
type_str = "unknown"
code_str = "unknown"
- self.logger.warn("Received error message: xid=%d type=%s (%d) code=%s (%d)",
- hdr_xid, type_str, msg.err_type, code_str, msg.code)
+ self.logger.warn(
+ "Received error message: xid=%d type=%s (%d) code=%s (%d)",
+ hdr_xid, type_str, msg.err_type, code_str,
+ msg.code )
# Now check for message handlers; preference is given to
# handlers for a specific packet
handled = False
- if hdr_type in self.handlers.keys():
- handled = self.handlers[hdr_type](self, msg, rawmsg)
- if not handled and ("all" in self.handlers.keys()):
- handled = self.handlers["all"](self, msg, rawmsg)
+ if hdr_type in self.handlers.keys( ):
+ handled = self.handlers[ hdr_type ]( self, msg, rawmsg )
+ if not handled and ("all" in self.handlers.keys( )):
+ handled = self.handlers[ "all" ]( self, msg, rawmsg )
- if not handled: # Not handled, enqueue
+ if not handled: # Not handled, enqueue
with self.packets_cv:
- if len(self.packets) >= self.max_pkts:
- self.packets.pop(0)
+ if len( self.packets ) >= self.max_pkts:
+ self.packets.pop( 0 )
self.packets_expired += 1
- self.packets.append((msg, rawmsg))
- self.packets_cv.notify_all()
+ self.packets.append( (msg, rawmsg) )
+ self.packets_cv.notify_all( )
self.packets_total += 1
else:
self.packets_handled += 1
- self.logger.debug("Message handled by callback")
+ self.logger.debug( "Message handled by callback" )
# end of 'while offset < len(pkt)'
# note that if offset = len(pkt), this is
# appends a harmless empty string
- self.buffered_input += pkt[offset:]
+ self.buffered_input += pkt[ offset: ]
- def _socket_ready_handle(self, s):
+ def _socket_ready_handle( self, s ):
"""
Handle an input-ready socket
@@ -331,91 +342,93 @@
if self.passive and s and s == self.listen_socket:
if self.switch_socket:
- self.logger.warning("Ignoring incoming connection; already connected to switch")
- (sock, addr) = self.listen_socket.accept()
- sock.close()
+ self.logger.warning(
+ "Ignoring incoming connection; already connected to switch" )
+ (sock, addr) = self.listen_socket.accept( )
+ sock.close( )
return 0
try:
- (sock, addr) = self.listen_socket.accept()
+ (sock, addr) = self.listen_socket.accept( )
except:
- self.logger.warning("Error on listen socket accept")
+ self.logger.warning( "Error on listen socket accept" )
return -1
- self.logger.info(self.host+":"+str(self.port)+": Incoming connection from "+str(addr))
+ self.logger.info( self.host + ":" + str(
+ self.port ) + ": Incoming connection from " + str( addr ) )
with self.connect_cv:
(self.switch_socket, self.switch_addr) = (sock, addr)
- self.switch_socket.setsockopt(socket.IPPROTO_TCP,
- socket.TCP_NODELAY, True)
+ self.switch_socket.setsockopt( socket.IPPROTO_TCP,
+ socket.TCP_NODELAY, True )
if self.initial_hello:
- self.message_send(cfg_ofp.message.hello())
- self.connect_cv.notify() # Notify anyone waiting
+ self.message_send( cfg_ofp.message.hello( ) )
+ self.connect_cv.notify( ) # Notify anyone waiting
# Prevent further connections
- self.listen_socket.close()
+ self.listen_socket.close( )
self.listen_socket = None
elif s and s == self.switch_socket:
- for idx in range(3): # debug: try a couple of times
+ for idx in range( 3 ): # debug: try a couple of times
try:
- pkt = self.switch_socket.recv(self.rcv_size)
+ pkt = self.switch_socket.recv( self.rcv_size )
except:
- self.logger.warning("Error on switch read")
+ self.logger.warning( "Error on switch read" )
return -1
-
+
if not self.active:
return 0
-
- if len(pkt) == 0:
- self.logger.warning("Zero-length switch read, %d" % idx)
+
+ if len( pkt ) == 0:
+ self.logger.warning( "Zero-length switch read, %d" % idx )
else:
break
- if len(pkt) == 0: # Still no packet
- self.logger.warning("Zero-length switch read; closing cxn")
- self.logger.info(str(self))
+ if len( pkt ) == 0: # Still no packet
+ self.logger.warning( "Zero-length switch read; closing cxn" )
+ self.logger.info( str( self ) )
return -1
- self._pkt_handle(pkt)
+ self._pkt_handle( pkt )
elif s and s == self.waker:
- self.waker.wait()
+ self.waker.wait( )
else:
- self.logger.error("Unknown socket ready: " + str(s))
+ self.logger.error( "Unknown socket ready: " + str( s ) )
return -1
return 0
- def active_connect(self):
+ def active_connect( self ):
"""
Actively connect to a switch IP addr
"""
try:
- self.logger.info("Trying active connection to %s" % self.switch)
- soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- soc.connect((self.switch, self.port))
- self.logger.info("Connected to " + self.switch + " on " +
- str(self.port))
- soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
+ self.logger.info( "Trying active connection to %s" % self.switch )
+ soc = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
+ soc.connect( (self.switch, self.port) )
+ self.logger.info( "Connected to " + self.switch + " on " +
+ str( self.port ) )
+ soc.setsockopt( socket.IPPROTO_TCP, socket.TCP_NODELAY, True )
self.switch_addr = (self.switch, self.port)
return soc
except (StandardError, socket.error), e:
- self.logger.error("Could not connect to %s at %d:: %s" %
- (self.switch, self.port, str(e)))
+ self.logger.error( "Could not connect to %s at %d:: %s" %
+ (self.switch, self.port, str( e )) )
return None
- def wakeup(self):
+ def wakeup( self ):
"""
Wake up the event loop, presumably from another thread.
"""
- self.waker.notify()
+ self.waker.notify( )
- def sockets(self):
+ def sockets( self ):
"""
Return list of sockets to select on.
"""
- socs = [self.listen_socket, self.switch_socket, self.waker]
- return [x for x in socs if x]
+ socs = [ self.listen_socket, self.switch_socket, self.waker ]
+ return [ x for x in socs if x ]
- def run(self):
+ def run( self ):
"""
Activity function for class
@@ -435,26 +448,27 @@
while self.active:
try:
sel_in, sel_out, sel_err = \
- select.select(self.sockets(), [], self.sockets(), 1)
+ select.select( self.sockets( ), [ ], self.sockets( ), 1 )
except:
- print sys.exc_info()
- self.logger.error("Select error, disconnecting")
- self.disconnect()
+ print sys.exc_info( )
+ self.logger.error( "Select error, disconnecting" )
+ self.disconnect( )
for s in sel_err:
- self.logger.error("Got socket error on: " + str(s) + ", disconnecting")
- self.disconnect()
+ self.logger.error(
+ "Got socket error on: " + str( s ) + ", disconnecting" )
+ self.disconnect( )
for s in sel_in:
- if self._socket_ready_handle(s) == -1:
- self.disconnect()
+ if self._socket_ready_handle( s ) == -1:
+ self.disconnect( )
# End of main loop
self.dbg_state = "closing"
- self.logger.info("Exiting controller thread")
- self.shutdown()
+ self.logger.info( "Exiting controller thread" )
+ self.shutdown( )
- def connect(self, timeout=-1):
+ def connect( self, timeout=-1 ):
"""
Connect to the switch
@@ -463,63 +477,63 @@
"""
if not self.passive: # Do active connection now
- self.logger.info("Attempting to connect to %s on port %s" %
- (self.switch, str(self.port)))
- soc = self.active_connect()
+ self.logger.info( "Attempting to connect to %s on port %s" %
+ (self.switch, str( self.port )) )
+ soc = self.active_connect( )
if soc:
- self.logger.info("Connected to %s", self.switch)
+ self.logger.info( "Connected to %s", self.switch )
self.dbg_state = "running"
self.switch_socket = soc
- self.wakeup()
+ self.wakeup( )
with self.connect_cv:
if self.initial_hello:
- self.message_send(cfg_ofp.message.hello())
- self.connect_cv.notify() # Notify anyone waiting
+ self.message_send( cfg_ofp.message.hello( ) )
+ self.connect_cv.notify( ) # Notify anyone waiting
else:
- self.logger.error("Could not actively connect to switch %s",
- self.switch)
+ self.logger.error( "Could not actively connect to switch %s",
+ self.switch )
self.active = False
else:
with self.connect_cv:
- ofutils.timed_wait(self.connect_cv, lambda: self.switch_socket,
- timeout=timeout)
+ ofutils.timed_wait( self.connect_cv, lambda: self.switch_socket,
+ timeout=timeout )
return self.switch_socket is not None
-
- def disconnect(self, timeout=-1):
+
+ def disconnect( self, timeout=-1 ):
"""
If connected to a switch, disconnect.
"""
if self.switch_socket:
- self.switch_socket.close()
+ self.switch_socket.close( )
self.switch_socket = None
self.switch_addr = None
with self.packets_cv:
- self.packets = []
+ self.packets = [ ]
with self.connect_cv:
- self.connect_cv.notifyAll()
+ self.connect_cv.notifyAll( )
- def wait_disconnected(self, timeout=-1):
+ def wait_disconnected( self, timeout=-1 ):
"""
@param timeout Block for up to timeout seconds. Pass -1 for the default.
@return Boolean, True if disconnected
"""
with self.connect_cv:
- ofutils.timed_wait(self.connect_cv,
- lambda: True if not self.switch_socket else None,
- timeout=timeout)
+ ofutils.timed_wait( self.connect_cv,
+ lambda: True if not self.switch_socket else None,
+ timeout=timeout )
return self.switch_socket is None
-
- def kill(self):
+
+ def kill( self ):
"""
Force the controller thread to quit
"""
self.active = False
- self.wakeup()
- self.join()
+ self.wakeup( )
+ self.join( )
- def shutdown(self):
+ def shutdown( self ):
"""
Shutdown the controller closing all sockets
@@ -528,28 +542,28 @@
self.active = False
try:
- self.switch_socket.shutdown(socket.SHUT_RDWR)
+ self.switch_socket.shutdown( socket.SHUT_RDWR )
except:
- self.logger.info("Ignoring switch soc shutdown error")
+ self.logger.info( "Ignoring switch soc shutdown error" )
self.switch_socket = None
try:
- self.listen_socket.shutdown(socket.SHUT_RDWR)
+ self.listen_socket.shutdown( socket.SHUT_RDWR )
except:
- self.logger.info("Ignoring listen soc shutdown error")
+ self.logger.info( "Ignoring listen soc shutdown error" )
self.listen_socket = None
# Wakeup condition variables on which controller may be wait
with self.xid_cv:
- self.xid_cv.notifyAll()
+ self.xid_cv.notifyAll( )
with self.connect_cv:
- self.connect_cv.notifyAll()
+ self.connect_cv.notifyAll( )
- self.wakeup()
+ self.wakeup( )
self.dbg_state = "down"
- def register(self, msg_type, handler):
+ def register( self, msg_type, handler ):
"""
Register a callback to receive a specific message type.
@@ -565,12 +579,12 @@
type is received.
"""
# Should check type is valid
- if not handler and msg_type in self.handlers.keys():
- del self.handlers[msg_type]
+ if not handler and msg_type in self.handlers.keys( ):
+ del self.handlers[ msg_type ]
return
- self.handlers[msg_type] = handler
+ self.handlers[ msg_type ] = handler
- def poll(self, exp_msg=None, timeout=-1):
+ def poll( self, exp_msg=None, timeout=-1 ):
"""
Wait for the next OF message received from the switch.
@@ -589,29 +603,30 @@
"""
if exp_msg is None:
- self.logger.warn("DEPRECATED polling for any message class")
+ self.logger.warn( "DEPRECATED polling for any message class" )
klass = None
- elif isinstance(exp_msg, int):
- klass = cfg_ofp.message.message.subtypes[exp_msg]
- elif issubclass(exp_msg, loxi.OFObject):
+ elif isinstance( exp_msg, int ):
+ klass = cfg_ofp.message.message.subtypes[ exp_msg ]
+ elif issubclass( exp_msg, loxi.OFObject ):
klass = exp_msg
else:
- raise ValueError("Unexpected exp_msg argument %r" % exp_msg)
+ raise ValueError( "Unexpected exp_msg argument %r" % exp_msg )
- self.logger.debug("Polling for %s", klass.__name__)
+ self.logger.debug( "Polling for %s", klass.__name__ )
# Take the packet from the queue
- def grab():
- for i, (msg, pkt) in enumerate(self.packets):
- if klass is None or isinstance(msg, klass):
- self.logger.debug("Got %s message", msg.__class__.__name__)
- return self.packets.pop(i)
+ def grab( ):
+ for i, (msg, pkt) in enumerate( self.packets ):
+ if klass is None or isinstance( msg, klass ):
+ self.logger.debug( "Got %s message",
+ msg.__class__.__name__ )
+ return self.packets.pop( i )
# Not found
- self.logger.debug("%s message not in queue", klass.__name__)
+ self.logger.debug( "%s message not in queue", klass.__name__ )
return None
with self.packets_cv:
- ret = ofutils.timed_wait(self.packets_cv, grab, timeout=timeout)
+ ret = ofutils.timed_wait( self.packets_cv, grab, timeout=timeout )
if ret != None:
(msg, pkt) = ret
@@ -619,7 +634,7 @@
else:
return (None, None)
- def transact(self, msg, timeout=-1):
+ def transact( self, msg, timeout=-1 ):
"""
Run a message transaction with the switch
@@ -632,21 +647,22 @@
"""
if msg.xid == None:
- msg.xid = ofutils.gen_xid()
+ msg.xid = ofutils.gen_xid( )
- self.logger.debug("Running transaction %d" % msg.xid)
+ self.logger.debug( "Running transaction %d" % msg.xid )
with self.xid_cv:
if self.xid:
- self.logger.error("Can only run one transaction at a time")
+ self.logger.error( "Can only run one transaction at a time" )
return (None, None)
self.xid = msg.xid
self.xid_response = None
- self.message_send(msg)
+ self.message_send( msg )
- self.logger.debug("Waiting for transaction %d" % msg.xid)
- ofutils.timed_wait(self.xid_cv, lambda: self.xid_response, timeout=timeout)
+ self.logger.debug( "Waiting for transaction %d" % msg.xid )
+ ofutils.timed_wait( self.xid_cv, lambda: self.xid_response,
+ timeout=timeout )
if self.xid_response:
(resp, pkt) = self.xid_response
@@ -655,10 +671,10 @@
(resp, pkt) = (None, None)
if resp is None:
- self.logger.warning("No response for xid " + str(self.xid))
+ self.logger.warning( "No response for xid " + str( self.xid ) )
return (resp, pkt)
- def message_send(self, msg):
+ def message_send( self, msg ):
"""
Send the message to the switch
@@ -668,56 +684,58 @@
if not self.switch_socket:
# Sending a string indicates the message is ready to go
- raise Exception("no socket")
+ raise Exception( "no socket" )
if msg.xid == None:
- msg.xid = ofutils.gen_xid()
+ msg.xid = ofutils.gen_xid( )
- outpkt = msg.pack()
+ outpkt = msg.pack( )
- self.logger.debug("Msg out: version %d class %s len %d xid %d",
- msg.version, type(msg).__name__, len(outpkt), msg.xid)
+ self.logger.debug( "Msg out: version %d class %s len %d xid %d",
+ msg.version, type( msg ).__name__, len( outpkt ),
+ msg.xid )
with self.tx_lock:
- if self.switch_socket.sendall(outpkt) is not None:
- raise AssertionError("failed to send message to switch")
+ if self.switch_socket.sendall( outpkt ) is not None:
+ raise AssertionError( "failed to send message to switch" )
- return 0 # for backwards compatibility
+ return 0 # for backwards compatibility
- def clear_queue(self):
+ def clear_queue( self ):
"""
Clear the input queue and report the number of messages
that were in it
"""
- enqueued_pkt_count = len(self.packets)
+ enqueued_pkt_count = len( self.packets )
with self.packets_cv:
- self.packets = []
+ self.packets = [ ]
return enqueued_pkt_count
- def __str__(self):
+ def __str__( self ):
string = "Controller:\n"
string += " state " + self.dbg_state + "\n"
- string += " switch_addr " + str(self.switch_addr) + "\n"
- string += " pending pkts " + str(len(self.packets)) + "\n"
- string += " total pkts " + str(self.packets_total) + "\n"
- string += " expired pkts " + str(self.packets_expired) + "\n"
- string += " handled pkts " + str(self.packets_handled) + "\n"
- string += " poll discards " + str(self.poll_discards) + "\n"
- string += " parse errors " + str(self.parse_errors) + "\n"
- string += " sock errrors " + str(self.socket_errors) + "\n"
- string += " max pkts " + str(self.max_pkts) + "\n"
- string += " target switch " + str(self.switch) + "\n"
- string += " host " + str(self.host) + "\n"
- string += " port " + str(self.port) + "\n"
- string += " keep_alive " + str(self.keep_alive) + "\n"
- string += " pkt_in_run " + str(self.pkt_in_run) + "\n"
- string += " pkt_in_dropped " + str(self.pkt_in_dropped) + "\n"
+ string += " switch_addr " + str( self.switch_addr ) + "\n"
+ string += " pending pkts " + str( len( self.packets ) ) + "\n"
+ string += " total pkts " + str( self.packets_total ) + "\n"
+ string += " expired pkts " + str( self.packets_expired ) + "\n"
+ string += " handled pkts " + str( self.packets_handled ) + "\n"
+ string += " poll discards " + str( self.poll_discards ) + "\n"
+ string += " parse errors " + str( self.parse_errors ) + "\n"
+ string += " sock errrors " + str( self.socket_errors ) + "\n"
+ string += " max pkts " + str( self.max_pkts ) + "\n"
+ string += " target switch " + str( self.switch ) + "\n"
+ string += " host " + str( self.host ) + "\n"
+ string += " port " + str( self.port ) + "\n"
+ string += " keep_alive " + str( self.keep_alive ) + "\n"
+ string += " pkt_in_run " + str( self.pkt_in_run ) + "\n"
+ string += " pkt_in_dropped " + str( self.pkt_in_dropped ) + "\n"
return string
- def show(self):
- print str(self)
+ def show( self ):
+ print str( self )
-def sample_handler(controller, msg, pkt):
+
+def sample_handler( controller, msg, pkt ):
"""
Sample message handler
diff --git a/ofdpa/ofdpa_utils.py b/src/python/oftest/ofdpa_utils.py
old mode 100644
new mode 100755
similarity index 88%
rename from ofdpa/ofdpa_utils.py
rename to src/python/oftest/ofdpa_utils.py
index 4fa4d0a..ebbc834
--- a/ofdpa/ofdpa_utils.py
+++ b/src/python/oftest/ofdpa_utils.py
@@ -5,12 +5,24 @@
"""
-import logging
+import logging, subprocess, time
import ofp
from oftest import config
from oftest.testutils import *
+def forceOfdpaRestart( user ):
+ output = 1;
+ credential = user;
+ test = subprocess.Popen(["ssh", credential, "service ofdpa restart &> /dev/null"]);
+ time.sleep(1);
+ while output < 10:
+ output = int(subprocess.check_output(["ssh", credential, "client_cfg_purge | wc -l"]));
+ time.sleep(1);
+ subprocess.Popen(["ssh", credential, "brcm-indigo-ofdpa-ofagent -t 10.128.0.220 &> /dev/null"], stdout=subprocess.PIPE);
+
+def forceOfdpaStop( user ):
+ subprocess.Popen(["ssh", user, "ps ax | grep 'brcm-indigo-ofdpa-ofagent' | awk '{print $1}' | xargs sudo kill"], stdout=subprocess.PIPE);
class table(object):