Moved core code up to oftest directory
diff --git a/src/python/oftest/controller.py b/src/python/oftest/controller.py
index 25e5080..4262ca9 100644
--- a/src/python/oftest/controller.py
+++ b/src/python/oftest/controller.py
@@ -19,9 +19,6 @@
@todo Set up reasonable logging facility
"""
-import sys
-sys.path.append("../protocol")
-sys.path.append("../")
from oft_config import *
import os
import socket
@@ -32,6 +29,7 @@
from message import *
from parse import *
from netutils import *
+from ofutils import *
class Controller(Thread):
"""
@@ -58,7 +56,7 @@
@var packets_total Total number of packets received
@var packets_expired Number of packets popped from queue as queue full
@var packets_handled Number of packets handled by something
- @var state Debug indication of state
+ @var dbg_state Debug indication of state
"""
def __init__(self, max_pkts=1024):
@@ -79,7 +77,7 @@
self.passive = True
self.packets_expired = 0
self.packets_handled = 0
- self.state = "init" # Debug
+ self.dbg_state = "init"
self.listen_socket = None
self.switch_socket = None
self.switch_addr = None
@@ -92,6 +90,9 @@
self.xid = None
self.xid_response = None
+ # Connection condition variable to notify anyone waiting for a cxn
+ self.connect_cv = Condition()
+
def dbg(self, level, string):
debug_log("CTRL", self.debug_level, level, string)
@@ -107,8 +108,8 @@
If already connected, will close the current connection
"""
- oldstate = self.state
- self.state = "connecting"
+ oldstate = self.dbg_state
+ self.dbg_state = "connecting"
if self.connected:
self.dbg(DEBUG_WARN, "Disconnect when already connected")
self.disconnect()
@@ -116,7 +117,7 @@
if not self.passive:
print "Error in controller init: Active cxn not supported"
# raise unsupported
- self.state = oldstate
+ self.dbg_state = oldstate
return False
# FIXME: add error handling; try SocketServer?
@@ -128,17 +129,17 @@
self.listen_socket.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
self.listen_socket.bind((self.host, self.port))
- self.state = "listening"
+ self.dbg_state = "listening"
self.listen_socket.listen(LISTEN_QUEUE_SIZE)
(self.switch_socket, self.switch_addr) = self.listen_socket.accept()
if not self.switch_socket:
self.socket_errors += 1
self.dbg(DEBUG_WARN, "Failed on accept")
- self.state = "error"
+ self.dbg_state = "error"
return False
self.connected = True
- self.state = "connected"
+ self.dbg_state = "connected"
self.dbg(DEBUG_INFO, "Got connection to " + str(self.switch_addr))
return True
@@ -146,12 +147,14 @@
"""
Disconnect the switch socket
"""
- self.state = "disconnected"
+ self.dbg_state = "disconnecting"
if not self.connected:
self.dbg(DEBUG_INFO, "disconnect when not connected")
+ self.dbg_state = "disconnected"
return
self.switch_socket.close()
self.connected = False
+ self.dbg_state = "disconnected"
def _pkt_handler_check(self, pkt):
"""
@@ -186,6 +189,7 @@
if self.xid:
if hdr.xid == self.xid:
self.xid_response = msg
+ self.xid_cv.notify()
self.xid_cv.release()
return (True, None)
self.xid_cv.release()
@@ -225,6 +229,10 @@
self.dbg(DEBUG_ERROR,
"Controller thread error connecting; exit")
break
+ # Notify anyone waiting that a connection is made
+ self.connect_cv.acquire()
+ self.connect_cv.notify()
+ self.connect_cv.release()
try:
pkt = self.switch_socket.recv(self.rcv_size)
if len(pkt) == 0:
@@ -242,7 +250,7 @@
if len(self.packets) >= self.max_pkts:
self.packets.pop(0)
self.packets_expired += 1
- self.packets.append((msg, data))
+ self.packets.append((msg, pkt))
self.packets_total += 1
self.sync.release()
@@ -284,9 +292,10 @@
@param exp_msg If set, return only when this type of message
is received.
- @retval A pair (msg, data) where msg is a message object and data is
- a string of any additional information following the
- parsed message.
+ @retval A pair (msg, pkt) where msg is a message object and pkt
+ the string representing the packet as received from the socket.
+ This allows additional parsing by the receiver if necessary.
+
The data members in the message are in host endian order.
If an error occurs, None is returned
"""
@@ -297,12 +306,12 @@
while len(self.packets) > 0:
self.sync.acquire()
- (msg, data) = self.packets.pop(0)
+ (msg, pkt) = self.packets.pop(0)
self.sync.release()
if not exp_msg or (exp_msg and (msg.header.type == exp_msg)):
- return msg, data
+ return msg, pkt
- def transact(self, msg, timeout=None):
+ def transact(self, msg, timeout=None, zero_xid=False):
"""
Run a message transaction with the switch
@@ -312,11 +321,15 @@
@param msg The message object to send; must not be a string
@param timeout The timeout in seconds (?)
+ @param zero_xid Normally, if the XID is 0 an XID will be generated
+ for the message. Set zero_xid to override this behavior
@return The matching message object or None if unsuccessful
- @todo Implement transact function for controller
"""
+ if not zero_xid and msg.header.xid == 0:
+ msg.header.xid = gen_xid()
+
self.xid_cv.acquire()
if self.xid:
self.xid_cv.release()
@@ -328,10 +341,10 @@
self.xid_response = None
self.message_send(msg.pack())
self.xid_cv.wait(timeout)
- (msg, data) = self.xid_response
+ msg = self.xid_response
self.xid_response = None
self.xid_cv.release()
- return (msg, data)
+ return msg
def message_send(self, msg):
"""
@@ -354,7 +367,7 @@
def __str__(self):
string = "Controller:\n"
string += " connected " + str(self.connected) + "\n"
- string += " state " + self.state + "\n"
+ string += " state " + self.dbg_state + "\n"
string += " running " + str(self.running) + "\n"
string += " switch_addr " + str(self.switch_addr) + "\n"
string += " pending pkts " + str(len(self.packets)) + "\n"
@@ -372,6 +385,34 @@
def show(self):
print str(self)
+    def connect_to_switch(self, timeout=None, send_hello=True):
+        """
+        Connect to switch
+
+        Block until a connection to a switch is established or a
+        timeout occurs.  If the call had to wait and a connection
+        results, optionally sends a hello message to the switch.
+        @param timeout Timeout in seconds; None waits until notified
+        @param send_hello If True, will send hello when connection established
+        @return True if connected when the wait ends, False otherwise
+        """
+        self.connect_cv.acquire()
+        try:
+            if self.connected:
+                # Already connected; nothing to wait for
+                return True
+            self.connect_cv.wait(timeout)
+            is_connected = self.connected
+        finally:
+            # Always drop the lock, even if wait() raises
+            self.connect_cv.release()
+
+        # Only send hello on a live connection; after a timeout there
+        # is no established connection to send it on
+        if is_connected and send_hello:
+            self.message_send(hello().pack())
+        return is_connected
+
def sample_handler(controller, msg, pkt):
"""
Sample message handler
diff --git a/src/python/oftest/dataplane.py b/src/python/oftest/dataplane.py
index 90a5207..0462f5d 100644
--- a/src/python/oftest/dataplane.py
+++ b/src/python/oftest/dataplane.py
@@ -18,7 +18,7 @@
import os
import socket
import time
-import promisc
+import netutils
from threading import Thread
from threading import Lock
diff --git a/src/python/oftest/netutils.py b/src/python/oftest/netutils.py
index f53c2e0..613ac66 100644
--- a/src/python/oftest/netutils.py
+++ b/src/python/oftest/netutils.py
@@ -1,7 +1,6 @@
-#!/usr/bin/env python
"""
-Network utilities for the OpenFlow controller
+Network utilities for the OpenFlow test framework
"""
###########################################################################
diff --git a/src/python/oftest/oft_config.py b/src/python/oftest/oft_config.py
index b6c14d3..af82455 100644
--- a/src/python/oftest/oft_config.py
+++ b/src/python/oftest/oft_config.py
@@ -79,6 +79,9 @@
CONTROLLER_HOST_DEFAULT = ''
CONTROLLER_PORT_DEFAULT = 6633
+# Timeout in seconds for initial connection
+INIT_CONNECT_TIMEOUT = 4
+
# Number of switch connection requests to queue
LISTEN_QUEUE_SIZE = 1