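Reorganize dataplane port handling: route output through oft_config debug logging, cap the per-port receive queue, and tolerate errors on socket shutdown (lots of reorg and wrestling with sockets)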
diff --git a/src/python/oftest/dataplane.py b/src/python/oftest/dataplane.py
index 0462f5d..89cf073 100644
--- a/src/python/oftest/dataplane.py
+++ b/src/python/oftest/dataplane.py
@@ -21,6 +21,7 @@
import netutils
from threading import Thread
from threading import Lock
+import oft_config
#@todo Move these identifiers into config
ETH_P_ALL = 0x03
@@ -47,14 +48,20 @@
"""
Thread.__init__(self)
self.interface_name = interface_name
+ self.debug_level = oft_config.debug_level_default
self.max_pkts = max_pkts
self.packets_pending = 0
self.packets_total = 0
self.packets = []
self.packet_times = []
+ self.packets_discarded = 0
self.sync = Lock()
self.socket = self.interface_open(interface_name)
- print "Openned port monitor socket " + interface_name
+ self.dbg(oft_config.DEBUG_INFO,
+ "Openned port monitor socket " + interface_name)
+
+ def dbg(self, level, string):
+ oft_config.debug_log("DPLANE", self.debug_level, level, string)
def interface_open(self, interface_name):
"""
@@ -65,18 +72,10 @@
s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
socket.htons(ETH_P_ALL))
s.bind((interface_name, 0))
- promisc.set_promisc(s, interface_name)
+ netutils.set_promisc(s, interface_name)
s.settimeout(RCV_TIMEOUT)
return s
- def kill(self):
- """
- Terminate the running thread
- """
- self.running = False
- self.socket.close()
- print "Port monitor for " + self.interface_name + " exiting"
-
def run(self):
"""
Activity function for class
@@ -85,23 +84,41 @@
while self.running:
try:
rcvmsg = self.socket.recv(RCV_SIZE)
- rcvtime = time.clock()
-
- self.sync.acquire()
- self.packets.append(rcvmsg)
- self.packet_times.append(rcvtime)
- self.packets_pending += 1
- self.packets_total += 1
- self.sync.release()
-
- except socket.timeout:
- print "Socket timeout for " + self.interface_name
except socket.error:
- print "Socket closed for " + self.interface_name
- if self.running:
- self.kill()
+ self.dbg(oft_config.DEBUG_INFO, "Socket error for " +
+ self.interface_name)
+ continue  # loop condition ends the thread once kill() clears self.running
+ if len(rcvmsg) == 0:
+ self.dbg(oft_config.DEBUG_INFO, "Zero len pkt on " + self.interface_name)
+ self.kill()
break
+ rcvtime = time.clock()
+
+ self.sync.acquire()
+ if len(self.packets) >= self.max_pkts:
+ del self.packets[0], self.packet_times[0]  # drop oldest from both parallel lists
+ self.packets_discarded += 1
+ self.packets.append(rcvmsg)
+ self.packet_times.append(rcvtime)
+ self.packets_pending += 1  # NOTE(review): still incremented after a discard; confirm against dequeue()
+ self.packets_total += 1
+ self.sync.release()
+
+ self.dbg(oft_config.DEBUG_INFO, "Thread exit for " + self.interface_name)
+
+ def kill(self):
+ """
+ Stop the receive loop and close this port's socket; close errors are logged, not raised
+ """
+ self.running = False
+ try:
+ self.socket.close()
+ except Exception:  # best-effort close; do not swallow KeyboardInterrupt/SystemExit
+ self.dbg(oft_config.DEBUG_INFO, "Ignoring dataplane soc shutdown error")
+ self.dbg(oft_config.DEBUG_INFO,
+ "Port monitor for " + self.interface_name + " exiting")
+
def dequeue(self):
"""
Get the oldest packet in the queue
@@ -126,6 +143,7 @@
Clear the packet queue
"""
self.sync.acquire()
+ self.packets_discarded += len(self.packets)
self.packets = []
self.packet_times = []
self.packets_pending = 0
@@ -138,6 +156,8 @@
@param packet The packet data to send to the port
@retval The number of bytes sent
"""
+ self.dbg(oft_config.DEBUG_VERBOSE,
+ "port sending " + str(len(packet)) + " bytes")
return self.socket.send(packet)
@@ -152,6 +172,12 @@
"""
pass
+ def show(self, prefix=''):
+ print prefix + "Name: " + self.interface_name
+ print prefix + "Pkts pending: " + str(self.packets_pending)
+ print prefix + "Pkts total: " + str(self.packets_total)
+ print prefix + "socket: " + str(self.socket)
+
class DataPlane:
"""
@@ -160,6 +186,10 @@
"""
def __init__(self):
self.port_list = {}
+ self.debug_level = oft_config.debug_level_default
+
+ def dbg(self, level, string):
+ oft_config.debug_log("DPORT", self.debug_level, level, string)
def port_add(self, interface_name, port_number):
"""
@@ -178,10 +208,13 @@
@param port_number The port to send the data to
@param packet Raw packet data to send to port
"""
+ self.dbg(oft_config.DEBUG_VERBOSE,
+ "Sending %d bytes to port %d" % (len(packet), port_number))
bytes = self.port_list[port_number].send(packet)
if bytes != len(packet):
- print "Unhandled send error, length mismatch %d != %d" % \
- (bytes, len(packet))
+ self.dbg(DEBUG_ERROR,"Unhandled send error, " +
+ "length mismatch %d != %d" %
+ (bytes, len(packet)))
return bytes
def flood(self, packet):
@@ -192,9 +225,9 @@
for port_number in self.port_list.keys():
bytes = self.port_list[port_number].send(packet)
if bytes != len(packet):
- print "Unhandled send error" + \
- ", port %d, length mismatch %d != %d" % \
- (port_number, bytes, len(packet))
+ self.dbg(DEBUG_ERROR, "Unhandled send error" +
+ ", port %d, length mismatch %d != %d" %
+ (port_number, bytes, len(packet)))
def packet_get(self, port_number=None):
"""
@@ -230,8 +263,22 @@
pkt, time = self.port_list[min_port].dequeue()
return min_port, pkt, time
- def kill(self):
+ def kill(self, join_threads=True):
+ """
+ Call kill() on every port in port_list, optionally waiting for its thread
+ @param join_threads If True (default) call join on each thread
+ """
for port_number in self.port_list.keys():
self.port_list[port_number].kill()
+ if join_threads:
+ self.dbg(oft_config.DEBUG_INFO, "Joining " + str(port_number))  # dbg takes a single string
+ self.port_list[port_number].join()
- print "DataPlane shutdown"
+ self.dbg(oft_config.DEBUG_INFO, "DataPlane shutdown")
+
+ def show(self, prefix=''):
+ print prefix + "Dataplane Controller"
+ for pnum, port in self.port_list.items():
+ print prefix + "OpenFlow Port Number " + str(pnum)
+ port.show(prefix + ' ')
+