blob: af8bf842f9f60b8f03de7aa1aa1bac8817a33398 [file] [log] [blame]
# Copyright 2015, Big Switch Networks, Inc.
"""
OpenFlow connection class
This class creates a thread which continually parses OpenFlow messages off the
supplied socket and places them in a queue. The class has methods for reading messages
from the RX queue, sending messages, and higher level operations like request-response
and multipart transactions.
"""
import loxi
import loxi.of14
import logging
import time
import socket
import errno
import os
import select
from threading import Condition, Lock, Thread
DEFAULT_TIMEOUT = 1
class TransactionError(Exception):
def __str__(self):
return self.args[0]
@property
def msg(self):
return self.args[1]
class Connection(Thread):
def __init__(self, sock):
Thread.__init__(self)
self.sock = sock
self.logger = logging.getLogger("connection")
self.rx = []
self.rx_cv = Condition()
self.tx_lock = Lock()
self.next_xid = 1
self.wakeup_rd, self.wakeup_wr = os.pipe()
self.finished = False
self.read_buffer = None
def run(self):
while not self.finished:
rd, wr, err = select.select([self.sock, self.wakeup_rd], [], [])
if self.sock in rd:
self.process_read()
if self.wakeup_rd in rd:
os.read(self.wakeup_rd, 1)
self.logger.debug("Exited event loop")
def process_read(self):
recvd = self.sock.recv(4096)
self.logger.debug("Received %d bytes", len(recvd))
buf = self.read_buffer
if buf:
buf += recvd
else:
buf = recvd
offset = 0
while offset < len(buf):
if offset + 8 > len(buf):
# Not enough data for the OpenFlow header
break
# Parse the header to get type
hdr_version, hdr_type, hdr_msglen, hdr_xid = loxi.of14.message.parse_header(buf[offset:])
# Use loxi to resolve ofp of matching version
ofp = loxi.protocol(hdr_version)
# Extract the raw message bytes
if (offset + hdr_msglen) > len(buf):
# Not enough data for the body
break
rawmsg = buf[offset : offset + hdr_msglen]
offset += hdr_msglen
msg = ofp.message.parse_message(rawmsg)
if not msg:
self.logger.warn("Could not parse message")
continue
self.logger.debug("Received message %s.%s xid %d length %d",
type(msg).__module__, type(msg).__name__, hdr_xid, hdr_msglen)
with self.rx_cv:
self.rx.append(msg)
self.rx_cv.notify_all()
if offset == len(buf):
self.read_buffer = None
else:
self.read_buffer = buf[offset:]
self.logger.debug("%d bytes remaining", len(self.read_buffer))
def recv(self, predicate, timeout=DEFAULT_TIMEOUT):
"""
Remove and return the first message in the RX queue for
which 'predicate' returns true
"""
assert self.is_alive()
deadline = time.time() + timeout
while True:
with self.rx_cv:
for i, msg in enumerate(self.rx):
if predicate(msg):
return self.rx.pop(i)
now = time.time()
if now > deadline:
return None
else:
self.rx_cv.wait(deadline - now)
def recv_any(self, timeout=DEFAULT_TIMEOUT):
"""
Return the first message in the RX queue
"""
return self.recv(lambda msg: True, timeout)
def recv_xid(self, xid, timeout=DEFAULT_TIMEOUT):
"""
Return the first message in the RX queue with XID 'xid'
"""
return self.recv(lambda msg: msg.xid == xid, timeout)
def recv_class(self, klass, timeout=DEFAULT_TIMEOUT):
"""
Return the first message in the RX queue which is an instance of 'klass'
"""
return self.recv(lambda msg: isinstance(msg, klass), timeout)
def send_raw(self, buf):
"""
Send raw bytes on the socket
"""
assert self.is_alive()
self.logger.debug("Sending raw message length %d", len(buf))
with self.tx_lock:
if self.sock.sendall(buf) is not None:
raise RuntimeError("failed to send message to switch")
def send(self, msg):
"""
Send a message
"""
assert self.is_alive()
if msg.xid is None:
msg.xid = self._gen_xid()
buf = msg.pack()
self.logger.debug("Sending message %s.%s xid %d length %d",
type(msg).__module__, type(msg).__name__, msg.xid, len(buf))
with self.tx_lock:
if self.sock.sendall(buf) is not None:
raise RuntimeError("failed to send message to switch")
def transact(self, msg, timeout=DEFAULT_TIMEOUT):
"""
Send a message and return the reply
"""
self.send(msg)
reply = self.recv_xid(msg.xid, timeout)
if reply is None:
raise TransactionError("no reply for %s" % type(msg).__name__, None)
elif isinstance(reply, loxi.protocol(reply.version).message.error_msg):
raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
return reply
def transact_multipart_generator(self, msg, timeout=DEFAULT_TIMEOUT):
"""
Send a multipart request and yield each entry from the replies
"""
self.send(msg)
finished = False
while not finished:
reply = self.recv_xid(msg.xid, timeout)
if reply is None:
raise TransactionError("no reply for %s" % type(msg).__name__, None)
elif not isinstance(reply, loxi.protocol(reply.version).message.stats_reply):
raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
for entry in reply.entries:
yield entry
finished = reply.flags & loxi.protocol(reply.version).OFPSF_REPLY_MORE == 0
def transact_multipart(self, msg, timeout=DEFAULT_TIMEOUT):
"""
Send a multipart request and return all entries from the replies
"""
entries = []
for entry in self.transact_multipart_generator(msg, timeout):
entries.append(entry)
return entries
def stop(self):
"""
Signal the thread to exit and wait for it
"""
assert not self.finished
self.logger.debug("Stopping connection")
self.finished = True
os.write(self.wakeup_wr, "x")
self.join()
self.sock.close()
os.close(self.wakeup_rd)
os.close(self.wakeup_wr)
self.logger.debug("Stopped connection")
def _gen_xid(self):
xid = self.next_xid
self.next_xid += 1
return xid
def connect(ip, port=6653, daemon=True, ofp=loxi.of14):
"""
Actively connect to a switch
"""
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
soc.connect((ip, port))
soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
cxn = Connection(soc)
cxn.daemon = daemon
cxn.logger.debug("Connected to %s:%d", ip, port)
cxn.start()
cxn.send(ofp.message.hello())
if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO):
raise Exception("Did not receive HELLO")
return cxn