blob: af8bf842f9f60b8f03de7aa1aa1bac8817a33398 [file] [log] [blame]
Rich Lane51168312015-06-25 13:13:54 -07001# Copyright 2015, Big Switch Networks, Inc.
2
3"""
4OpenFlow connection class
5
6This class creates a thread which continually parses OpenFlow messages off the
7supplied socket and places them in a queue. The class has methods for reading messages
8from the RX queue, sending messages, and higher level operations like request-response
9and multipart transactions.
10"""
11
12import loxi
13import loxi.of14
14import logging
15import time
16import socket
17import errno
18import os
19import select
20from threading import Condition, Lock, Thread
21
22DEFAULT_TIMEOUT = 1
23
24class TransactionError(Exception):
25 def __str__(self):
26 return self.args[0]
27
28 @property
29 def msg(self):
30 return self.args[1]
31
32class Connection(Thread):
33 def __init__(self, sock):
34 Thread.__init__(self)
35 self.sock = sock
36 self.logger = logging.getLogger("connection")
37 self.rx = []
38 self.rx_cv = Condition()
39 self.tx_lock = Lock()
40 self.next_xid = 1
41 self.wakeup_rd, self.wakeup_wr = os.pipe()
42 self.finished = False
43 self.read_buffer = None
44
45 def run(self):
46 while not self.finished:
47 rd, wr, err = select.select([self.sock, self.wakeup_rd], [], [])
48 if self.sock in rd:
49 self.process_read()
50 if self.wakeup_rd in rd:
51 os.read(self.wakeup_rd, 1)
52 self.logger.debug("Exited event loop")
53
54 def process_read(self):
55 recvd = self.sock.recv(4096)
56
57 self.logger.debug("Received %d bytes", len(recvd))
58
59 buf = self.read_buffer
60 if buf:
61 buf += recvd
62 else:
63 buf = recvd
64
65 offset = 0
66 while offset < len(buf):
67 if offset + 8 > len(buf):
68 # Not enough data for the OpenFlow header
69 break
70
71 # Parse the header to get type
72 hdr_version, hdr_type, hdr_msglen, hdr_xid = loxi.of14.message.parse_header(buf[offset:])
73
74 # Use loxi to resolve ofp of matching version
75 ofp = loxi.protocol(hdr_version)
76
77 # Extract the raw message bytes
78 if (offset + hdr_msglen) > len(buf):
79 # Not enough data for the body
80 break
81 rawmsg = buf[offset : offset + hdr_msglen]
82 offset += hdr_msglen
83
84 msg = ofp.message.parse_message(rawmsg)
85 if not msg:
86 self.logger.warn("Could not parse message")
87 continue
88
89 self.logger.debug("Received message %s.%s xid %d length %d",
90 type(msg).__module__, type(msg).__name__, hdr_xid, hdr_msglen)
91
92 with self.rx_cv:
93 self.rx.append(msg)
94 self.rx_cv.notify_all()
95
96 if offset == len(buf):
97 self.read_buffer = None
98 else:
99 self.read_buffer = buf[offset:]
100 self.logger.debug("%d bytes remaining", len(self.read_buffer))
101
102 def recv(self, predicate, timeout=DEFAULT_TIMEOUT):
103 """
104 Remove and return the first message in the RX queue for
105 which 'predicate' returns true
106 """
107 assert self.is_alive()
108
109 deadline = time.time() + timeout
110 while True:
111 with self.rx_cv:
112 for i, msg in enumerate(self.rx):
113 if predicate(msg):
114 return self.rx.pop(i)
115
116 now = time.time()
117 if now > deadline:
118 return None
119 else:
120 self.rx_cv.wait(deadline - now)
121
122 def recv_any(self, timeout=DEFAULT_TIMEOUT):
123 """
124 Return the first message in the RX queue
125 """
126 return self.recv(lambda msg: True, timeout)
127
128 def recv_xid(self, xid, timeout=DEFAULT_TIMEOUT):
129 """
130 Return the first message in the RX queue with XID 'xid'
131 """
132 return self.recv(lambda msg: msg.xid == xid, timeout)
133
134 def recv_class(self, klass, timeout=DEFAULT_TIMEOUT):
135 """
136 Return the first message in the RX queue which is an instance of 'klass'
137 """
138 return self.recv(lambda msg: isinstance(msg, klass), timeout)
139
140 def send_raw(self, buf):
141 """
142 Send raw bytes on the socket
143 """
144 assert self.is_alive()
145 self.logger.debug("Sending raw message length %d", len(buf))
146 with self.tx_lock:
147 if self.sock.sendall(buf) is not None:
148 raise RuntimeError("failed to send message to switch")
149
150 def send(self, msg):
151 """
152 Send a message
153 """
154 assert self.is_alive()
155
156 if msg.xid is None:
157 msg.xid = self._gen_xid()
158 buf = msg.pack()
159 self.logger.debug("Sending message %s.%s xid %d length %d",
160 type(msg).__module__, type(msg).__name__, msg.xid, len(buf))
161 with self.tx_lock:
162 if self.sock.sendall(buf) is not None:
163 raise RuntimeError("failed to send message to switch")
164
165 def transact(self, msg, timeout=DEFAULT_TIMEOUT):
166 """
167 Send a message and return the reply
168 """
169 self.send(msg)
170 reply = self.recv_xid(msg.xid, timeout)
171 if reply is None:
172 raise TransactionError("no reply for %s" % type(msg).__name__, None)
173 elif isinstance(reply, loxi.protocol(reply.version).message.error_msg):
174 raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
175 return reply
176
177 def transact_multipart_generator(self, msg, timeout=DEFAULT_TIMEOUT):
178 """
179 Send a multipart request and yield each entry from the replies
180 """
181 self.send(msg)
182 finished = False
183 while not finished:
184 reply = self.recv_xid(msg.xid, timeout)
185 if reply is None:
186 raise TransactionError("no reply for %s" % type(msg).__name__, None)
187 elif not isinstance(reply, loxi.protocol(reply.version).message.stats_reply):
188 raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
189 for entry in reply.entries:
190 yield entry
191 finished = reply.flags & loxi.protocol(reply.version).OFPSF_REPLY_MORE == 0
192
193 def transact_multipart(self, msg, timeout=DEFAULT_TIMEOUT):
194 """
195 Send a multipart request and return all entries from the replies
196 """
197 entries = []
198 for entry in self.transact_multipart_generator(msg, timeout):
199 entries.append(entry)
200 return entries
201
202 def stop(self):
203 """
204 Signal the thread to exit and wait for it
205 """
206 assert not self.finished
207 self.logger.debug("Stopping connection")
208 self.finished = True
209 os.write(self.wakeup_wr, "x")
210 self.join()
211 self.sock.close()
212 os.close(self.wakeup_rd)
213 os.close(self.wakeup_wr)
214 self.logger.debug("Stopped connection")
215
216 def _gen_xid(self):
217 xid = self.next_xid
218 self.next_xid += 1
219 return xid
220
221def connect(ip, port=6653, daemon=True, ofp=loxi.of14):
222 """
223 Actively connect to a switch
224 """
225 soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
226 soc.connect((ip, port))
227 soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
228 cxn = Connection(soc)
229 cxn.daemon = daemon
230 cxn.logger.debug("Connected to %s:%d", ip, port)
231 cxn.start()
232
233 cxn.send(ofp.message.hello())
234 if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO):
235 raise Exception("Did not receive HELLO")
236
237 return cxn