Merge pull request #37 from blp/master
ovs-dummy fix
diff --git a/platforms/ovs-dummy.py b/platforms/ovs-dummy.py
index 2dad1a5..a2f1790 100644
--- a/platforms/ovs-dummy.py
+++ b/platforms/ovs-dummy.py
@@ -17,30 +17,25 @@
RCV_TIMEOUT = 10000
RUN_DIR = os.environ.get("OVS_RUNDIR", "/var/run/openvswitch")
-class DataPlanePortOVSDummy(Thread):
+class DataPlanePortOVSDummy:
"""
Class defining a port monitoring object that uses Unix domain
sockets for ports, intended for connecting to Open vSwitch "dummy"
netdevs.
"""
- def __init__(self, interface_name, port_number, parent, max_pkts=1024):
+ def __init__(self, interface_name, port_number, max_pkts=1024):
"""
Set up a port monitor object
@param interface_name The name of the physical interface like eth1
@param port_number The port number associated with this port
- @param parent The controlling dataplane object; for pkt wait CV
@param max_pkts Maximum number of pkts to keep in queue
"""
- Thread.__init__(self)
self.interface_name = interface_name
self.max_pkts = max_pkts
- self.packets_total = 0
- self.packets = []
- self.packets_discarded = 0
self.port_number = port_number
self.txq = []
- self.txq_lock = Lock()
+ self.rxbuf = ""
logname = "dp-" + interface_name
self.logger = logging.getLogger(logname)
try:
@@ -49,7 +44,6 @@
self.logger.info("Could not open socket")
raise
self.logger.info("Opened port monitor (class %s)", type(self).__name__)
- self.parent = parent
@staticmethod
def interface_open(interface_name):
@@ -64,133 +58,65 @@
s.connect("%s/%s" % (RUN_DIR, interface_name))
return s
- def run(self):
- """
- Activity function for class
- """
- self.running = True
- rxbuf = ""
- while self.running:
- try:
- self.txq_lock.acquire()
- if self.txq:
- wlist = [self.socket]
- else:
- wlist = []
- self.txq_lock.release()
-
- rout, wout, eout = select.select([self.socket], wlist, [], 1)
- except:
- print sys.exc_info()
- self.logger.error("Select error, exiting")
- break
-
- if not self.running:
- break
-
- if wout:
- self.txq_lock.acquire()
- if self.txq:
- retval = self.socket.send(self.txq[0])
- if retval > 0:
- self.txq[0] = self.txq[0][retval:]
- if len(self.txq[0]) == 0:
- self.txq = self.txq[1:]
- self.txq_lock.release()
-
- if rout:
- if len(rxbuf) < 2:
- n = 2 - len(rxbuf)
- else:
- frame_len = struct.unpack('>h', rxbuf[:2])[0]
- n = (2 + frame_len) - len(rxbuf)
-
- data = self.socket.recv(n)
- rxbuf += data
- if len(data) == n and len(rxbuf) > 2:
- rcvtime = time.time()
- self.logger.debug("Pkt len " + str(len(rxbuf)) +
- " in at " + str(rcvtime) + " on port " +
- str(self.port_number))
-
- # Enqueue packet
- with self.parent.pkt_sync:
- if len(self.packets) >= self.max_pkts:
- # Queue full, throw away oldest
- self.packets.pop(0)
- self.packets_discarded += 1
- self.logger.debug("Discarding oldest packet to make room")
- self.packets.append((rxbuf[2:], rcvtime))
- self.packets_total += 1
- self.parent.pkt_sync.notify_all()
-
- rxbuf = ''
-
- self.logger.info("Thread exit")
-
- def kill(self):
- """
- Terminate the running thread
- """
- self.logger.debug("Port monitor kill")
- self.running = False
- try:
+ def __del__(self):
+ if self.socket:
self.socket.close()
- except:
- self.logger.info("Ignoring dataplane soc shutdown error")
- def timestamp_head(self):
+ def fileno(self):
"""
- Return the timestamp of the head of queue or None if empty
+ Return an integer file descriptor that can be passed to select(2).
"""
- rv = None
- try:
- rv = self.packets[0][1]
- except:
- rv = None
- return rv
+ return self.socket.fileno()
- def flush(self):
- """
- Clear the packet queue
- """
- with self.parent.pkt_sync:
- self.packets_discarded += len(self.packets)
- self.packets = []
+ def recv(self):
+ while True:
+ rout, wout, eout = select.select([self.socket], [], [], 0)
+ if not rout:
+ return
+
+ if len(self.rxbuf) < 2:
+ n = 2 - len(self.rxbuf)
+ else:
+ frame_len = struct.unpack('>h', self.rxbuf[:2])[0]
+ n = (2 + frame_len) - len(self.rxbuf)
+
+ data = self.socket.recv(n)
+ self.rxbuf += data
+ if len(data) == n and len(self.rxbuf) > 2:
+ rcvtime = time.time()
+ packet = self.rxbuf[2:]
+ self.logger.debug("Pkt len " + str(len(packet)) +
+ " in at " + str(rcvtime) + " on port " +
+ str(self.port_number))
+ self.rxbuf = ""
+ return (packet, rcvtime)
def send(self, packet):
- """
- Send a packet to the dataplane port
- @param packet The packet data to send to the port
- @retval The number of bytes sent
- """
-
- self.txq_lock.acquire()
if len(self.txq) < self.max_pkts:
self.txq.append(struct.pack('>h', len(packet)) + packet)
retval = len(packet)
else:
retval = 0
- self.txq_lock.release()
-
+ self.__run_tx()
return retval
- def register(self, handler):
- """
- Register a callback function to receive packets from this
- port. The callback will be passed the packet, the
- interface name and the port number (if set) on which the
- packet was received.
+ def __run_tx(self):
+ while self.txq:
+ rout, wout, eout = select.select([], [self.socket], [], 0)
+ if not wout:
+ return
- To be implemented
- """
+ retval = self.socket.send(self.txq[0])
+ if retval > 0:
+ self.txq[0] = self.txq[0][retval:]
+ if len(self.txq[0]) == 0:
+ self.txq = self.txq[1:]
+
+ def down(self):
pass
- def show(self, prefix=''):
- print prefix + "Name: " + self.interface_name
- print prefix + "Pkts pending: " + str(len(self.packets))
- print prefix + "Pkts total: " + str(self.packets_total)
- print prefix + "socket: " + str(self.socket)
+ def up(self):
+ pass
# Update this dictionary to suit your environment.
dummy_port_map = {