Mostly changes to socket deployment
Use select for handling sockets; hopefully a better approach to cleanup
Added connection semaphore for controller
Support message objects as arguments to controller.message_send
Support initial hello from controller when connected to switch
diff --git a/src/python/oftest/dataplane.py b/src/python/oftest/dataplane.py
index 89cf073..44ec997 100644
--- a/src/python/oftest/dataplane.py
+++ b/src/python/oftest/dataplane.py
@@ -21,13 +21,29 @@
import netutils
from threading import Thread
from threading import Lock
-import oft_config
+from oft_config import *
+import select
#@todo Move these identifiers into config
ETH_P_ALL = 0x03
RCV_TIMEOUT = 10000
RCV_SIZE = 4096
+# class packet_queue:
+# """
+# Class defining a packet queue across multiple ports
+
+# Items in the queue are stored as a triple (port number, pkt, pkt in time)
+# """
+
+# def __init__(self, max_pkts=1024):
+# self.sync = Lock()
+# self.debug_level = debug_level_default
+# self.packets = []
+# self.max_pkts = max_pkts
+# self.packets_total = 0
+# self.packets_discarded = 0
+
class DataPlanePort(Thread):
"""
Class defining a port monitoring object.
@@ -48,20 +64,18 @@
"""
Thread.__init__(self)
self.interface_name = interface_name
- self.debug_level = oft_config.debug_level_default
+ self.debug_level = debug_level_default
self.max_pkts = max_pkts
- self.packets_pending = 0
self.packets_total = 0
self.packets = []
- self.packet_times = []
self.packets_discarded = 0
self.sync = Lock()
self.socket = self.interface_open(interface_name)
- self.dbg(oft_config.DEBUG_INFO,
- "Openned port monitor socket " + interface_name)
+ self.dbg(DEBUG_INFO, "Openned port monitor socket")
def dbg(self, level, string):
- oft_config.debug_log("DPLANE", self.debug_level, level, string)
+ debug_log("DPLANE", self.debug_level, level,
+ self.interface_name + ": " + string)
def interface_open(self, interface_name):
"""
@@ -81,31 +95,52 @@
Activity function for class
"""
self.running = True
+ self.socs = [self.socket]
+ error_warned = False # Have we warned about error?
while self.running:
try:
+ sel_in, sel_out, sel_err = \
+ select.select(self.socs, [], [], 1)
+ except:
+ print sys.exc_info()
+ self.dbg(DEBUG_ERROR, "Select error, exiting")
+ sys.exit(1)
+
+ #if not sel_err is None:
+ # self.dbg(DEBUG_VERBOSE, "Socket error from select set")
+
+ if not self.running:
+ break
+
+ if sel_in is None:
+ continue
+
+ try:
rcvmsg = self.socket.recv(RCV_SIZE)
except socket.error:
- self.dbg(DEBUG_INFO, "Socket error for " +
- self.interface_name)
+ if not error_warned:
+ self.dbg(DEBUG_INFO, "Socket error on recv")
+ error_warned = True
continue
+
if len(rcvmsg) == 0:
- self.dbg(DEBUG_INFO, "Zero len pkt on " + self.interface_name)
+ self.dbg(DEBUG_INFO, "Zero len pkt rcvd")
self.kill()
break
rcvtime = time.clock()
+ self.dbg(DEBUG_VERBOSE, "Pkt len " + str(len(rcvmsg)) +
+ " in at " + str(rcvtime))
self.sync.acquire()
if len(self.packets) >= self.max_pkts:
self.packets.pop(0)
self.packets_discarded += 1
- self.packets.append(rcvmsg)
- self.packet_times.append(rcvtime)
- self.packets_pending += 1
+ self.packets.append((rcvmsg, rcvtime))
self.packets_total += 1
self.sync.release()
- self.dbg(DEBUG_INFO, "Thread exit for " + self.interface_name)
+ self.dbg(DEBUG_INFO, "Thread exit ")
def kill(self):
"""
@@ -116,17 +151,15 @@
self.socket.close()
except:
self.dbg(DEBUG_INFO, "Ignoring dataplane soc shutdown error")
- self.dbg(oft_config.DEBUG_INFO,
- "Port monitor for " + self.interface_name + " exiting")
+ self.dbg(DEBUG_INFO,
+ "Port monitor exiting")
def dequeue(self):
"""
Get the oldest packet in the queue
"""
self.sync.acquire()
- pkt = self.packets.pop(0)
- pkt_time = self.packet_times.pop(0)
- self.packets_pending -= 1
+ pkt, pkt_time = self.packets.pop(0)
self.sync.release()
return pkt, pkt_time
@@ -134,9 +167,12 @@
"""
Return the timestamp of the head of queue or None if empty
"""
- if self.packets_pending:
- return self.packet_times[0]
- return None
+ rv = None
+ self.sync.acquire()
+ if len(self.packets) > 0:
+ rv = self.packets[0][1]
+ self.sync.release()
+ return rv
def flush(self):
"""
@@ -146,7 +182,6 @@
self.packets_discarded += len(self.packets)
self.packets = []
self.packet_times = []
- self.packets_pending = 0
self.sync.release()
@@ -156,7 +191,7 @@
@param packet The packet data to send to the port
@retval The number of bytes sent
"""
- self.dbg(oft_config.DEBUG_VERBOSE,
+ self.dbg(DEBUG_VERBOSE,
"port sending " + str(len(packet)) + " bytes")
return self.socket.send(packet)
@@ -174,7 +209,7 @@
def show(self, prefix=''):
print prefix + "Name: " + self.interface_name
- print prefix + "Pkts pending: " + str(self.packets_pending)
+ print prefix + "Pkts pending: " + str(len(self.packets))
print prefix + "Pkts total: " + str(self.packets_total)
print prefix + "socket: " + str(self.socket)
@@ -186,10 +221,10 @@
"""
def __init__(self):
self.port_list = {}
- self.debug_level = oft_config.debug_level_default
+ self.debug_level = debug_level_default
def dbg(self, level, string):
- oft_config.debug_log("DPORT", self.debug_level, level, string)
+ debug_log("DPORT", self.debug_level, level, string)
def port_add(self, interface_name, port_number):
"""
@@ -208,7 +243,7 @@
@param port_number The port to send the data to
@param packet Raw packet data to send to port
"""
- self.dbg(oft_config.DEBUG_VERBOSE,
+ self.dbg(DEBUG_VERBOSE,
"Sending %d bytes to port %d" % (len(packet), port_number))
bytes = self.port_list[port_number].send(packet)
if bytes != len(packet):
@@ -232,7 +267,8 @@
def packet_get(self, port_number=None):
"""
Get a packet from the data plane
- If port_number is given, get the packet from that port.
+
+ If port_number is given, get the oldest packet from that port.
Otherwise, find the port with the oldest packet and return
that packet.
@param port_number If set, get packet from this port
@@ -241,13 +277,14 @@
"""
if port_number:
- if self.port_list[port_number].packets_pending != 0:
+ if len(self.port_list[port_number].packets) != 0:
pkt, time = self.port_list[port_number].dequeue()
return port_number, pkt, time
else:
return None, None, None
# Find port with oldest packet
+ #@todo Consider using a single queue for all ports
min_time = 0
min_port = -1
for port_number in self.port_list.keys():
@@ -263,18 +300,18 @@
pkt, time = self.port_list[min_port].dequeue()
return min_port, pkt, time
- def kill(self, join_threads=True):
+ def kill(self, join_threads=False):
"""
Close all sockets for dataplane
- @param join_threads If True (default) call join on each thread
+ @param join_threads If True call join on each thread
"""
for port_number in self.port_list.keys():
self.port_list[port_number].kill()
if join_threads:
- self.dbg(oft_config.DEBUG_INFO, "Joining ", port_number)
+ self.dbg(DEBUG_INFO, "Joining " + str(port_number))
self.port_list[port_number].join()
- self.dbg(oft_config.DEBUG_INFO, "DataPlane shutdown")
+ self.dbg(DEBUG_INFO, "DataPlane shutdown")
def show(self, prefix=''):
print prefix + "Dataplane Controller"