blob: f74ff206186aa5ce7f487f214c4cf2ca24a9d663 [file] [log] [blame]
Nathan Knuth418fdc82016-09-16 22:51:15 -07001# Copyright 2015, Big Switch Networks, Inc.
2
3"""
OpenFlow connection class

This class creates a thread which continually parses OpenFlow messages off the
supplied socket and places them in a queue. The class has methods for reading messages
from the RX queue, sending messages, and higher-level operations like request-response
and multipart transactions.
10"""
11
12import loxi
13import loxi.of14
14import logging
15import time
16import socket
17import errno
18import os
19import select
20from threading import Condition, Lock, Thread
21
# Default time, in seconds, that the receive and transaction helpers wait
# for a matching message before giving up.
DEFAULT_TIMEOUT = 1
23
class TransactionError(Exception):
    """
    Raised when a transaction gets no reply or an unexpected reply.

    Constructed as TransactionError(description, msg), where 'description'
    is the human-readable text returned by str() and 'msg' is the offending
    reply message, or None when no reply was received.
    """

    def __str__(self):
        # Tolerate being constructed with no arguments or a non-string
        # description instead of failing while formatting the error.
        return str(self.args[0]) if self.args else ""

    @property
    def msg(self):
        """
        The reply that triggered the error, or None if there was none
        """
        return self.args[1] if len(self.args) > 1 else None
31
class ConnectionClosed(Exception):
    """
    Raised when a read on the socket shows the other end closed the connection
    """
    pass
34
class Connection(Thread):
    """
    OpenFlow connection backed by a reader thread.

    Once start() is called, the thread waits on the socket, parses complete
    OpenFlow messages out of the received bytes with loxi and appends them
    to the RX queue. recv() and its variants remove messages from that
    queue; send() and send_raw() write to the socket; transact() and the
    multipart helpers combine the two into request/reply exchanges.
    """

    def __init__(self, sock):
        """
        Wrap the connected socket 'sock'. The reader thread is not started
        here; call start() before using the connection.
        """
        Thread.__init__(self)
        self.sock = sock
        self.logger = logging.getLogger("connection")
        self.rx = []               # parsed messages not yet taken by recv()
        self.rx_cv = Condition()   # guards self.rx; notified on each append
        self.tx_lock = Lock()      # serializes writes to the socket
        self.next_xid = 1
        # Pipe used by stop() to wake the reader thread out of select()
        self.wakeup_rd, self.wakeup_wr = os.pipe()
        self.finished = False
        self.read_buffer = None    # leftover bytes of an incomplete message
        self._cxn_closed = False   # set once stop() has been called

    def run(self):
        """
        Thread body: process socket data until self.finished is set, either
        by stop() or because the other end closed the connection.
        """
        while not self.finished:
            try:
                readable, _, _ = select.select([self.sock, self.wakeup_rd], [], [])
            except select.error as e:
                if e.args and e.args[0] == errno.EINTR:
                    # Interrupted by a signal before anything became
                    # readable; just wait again.
                    continue
                # Any other failure would repeat on every pass, so leave
                # the loop instead of spinning on a stale result.
                self.logger.error("select failed: %s", e)
                self.finished = True
                break

            if self.sock in readable:
                try:
                    self.process_read()
                except ConnectionClosed:
                    self.logger.info("Connection to controller was dropped")
                    self.finished = True

            if self.wakeup_rd in readable:
                # Drain the byte written by stop(); the loop condition
                # notices self.finished on the next pass.
                os.read(self.wakeup_rd, 1)

        self.logger.debug("Exited event loop")

    def process_read(self):
        """
        Read up to 4096 bytes from the socket, parse every complete message
        now available and append each to the RX queue. Bytes belonging to
        an incomplete message are kept in self.read_buffer for the next call.

        Raises ConnectionClosed if the socket returns no data.
        """
        recvd = self.sock.recv(4096)
        if len(recvd) == 0:
            # this indicates that the other end hung up
            raise ConnectionClosed()

        self.logger.debug("Received %d bytes", len(recvd))

        # Prepend whatever was left over from the previous read
        if self.read_buffer:
            buf = self.read_buffer + recvd
        else:
            buf = recvd

        offset = 0
        # An OpenFlow header is 8 bytes; stop once less than that remains
        while offset + 8 <= len(buf):
            # Parse the header to get the version and total message length
            hdr_version, hdr_type, hdr_msglen, hdr_xid = \
                loxi.of14.message.parse_header(buf[offset:])

            # Use loxi to resolve ofp of matching version
            ofp = loxi.protocol(hdr_version)

            if offset + hdr_msglen > len(buf):
                # Not enough data for the body
                break

            # Extract the raw message bytes
            rawmsg = buf[offset:offset + hdr_msglen]
            offset += hdr_msglen

            msg = ofp.message.parse_message(rawmsg)
            if not msg:
                self.logger.warning("Could not parse message")
                continue

            self.logger.debug("Received message %s.%s xid %d length %d",
                              type(msg).__module__, type(msg).__name__,
                              hdr_xid, hdr_msglen)

            with self.rx_cv:
                self.rx.append(msg)
                self.rx_cv.notify_all()

        if offset == len(buf):
            self.read_buffer = None
        else:
            self.read_buffer = buf[offset:]
            self.logger.debug("%d bytes remaining", len(self.read_buffer))

    def recv(self, predicate, timeout=DEFAULT_TIMEOUT):
        """
        Remove and return the first message in the RX queue for
        which 'predicate' returns true

        Waits up to 'timeout' seconds for such a message to arrive and
        returns None if none does.
        """
        assert self.is_alive()

        deadline = time.time() + timeout
        while True:
            with self.rx_cv:
                for i, msg in enumerate(self.rx):
                    if predicate(msg):
                        return self.rx.pop(i)

                now = time.time()
                if now > deadline:
                    return None
                # Sleep until the reader thread appends something or the
                # deadline passes, then scan the queue again.
                self.rx_cv.wait(deadline - now)

    def recv_any(self, timeout=DEFAULT_TIMEOUT):
        """
        Return the first message in the RX queue
        """
        return self.recv(lambda msg: True, timeout)

    def recv_xid(self, xid, timeout=DEFAULT_TIMEOUT):
        """
        Return the first message in the RX queue with XID 'xid'
        """
        return self.recv(lambda msg: msg.xid == xid, timeout)

    def recv_class(self, klass, timeout=DEFAULT_TIMEOUT):
        """
        Return the first message in the RX queue which is an instance of 'klass'
        """
        return self.recv(lambda msg: isinstance(msg, klass), timeout)

    def _sendall(self, buf):
        """
        Write 'buf' to the socket while holding the TX lock
        """
        with self.tx_lock:
            if self.sock.sendall(buf) is not None:
                raise RuntimeError("failed to send message to switch")

    def send_raw(self, buf):
        """
        Send raw bytes on the socket
        """
        assert self.is_alive()
        self.logger.debug("Sending raw message length %d", len(buf))
        self._sendall(buf)

    def send(self, msg):
        """
        Send a message

        A message whose xid is None is assigned the next generated XID
        before it is packed.
        """
        assert self.is_alive()

        if msg.xid is None:
            msg.xid = self._gen_xid()
        buf = msg.pack()
        self.logger.debug("Sending message %s.%s xid %d length %d",
                          type(msg).__module__, type(msg).__name__,
                          msg.xid, len(buf))
        self._sendall(buf)

    def transact(self, msg, timeout=DEFAULT_TIMEOUT):
        """
        Send a message and return the reply

        Raises TransactionError if no message with the same XID arrives
        within 'timeout' seconds, or if the reply is an error message.
        """
        self.send(msg)
        reply = self.recv_xid(msg.xid, timeout)
        if reply is None:
            raise TransactionError("no reply for %s" % type(msg).__name__, None)
        if isinstance(reply, loxi.protocol(reply.version).message.error_msg):
            raise TransactionError(
                "received %s in response to %s" % (type(reply).__name__, type(msg).__name__),
                reply)
        return reply

    def transact_multipart_generator(self, msg, timeout=DEFAULT_TIMEOUT):
        """
        Send a multipart request and yield each entry from the replies

        Raises TransactionError if a reply does not arrive within 'timeout'
        seconds or is not a stats reply.
        """
        self.send(msg)
        finished = False
        while not finished:
            reply = self.recv_xid(msg.xid, timeout)
            if reply is None:
                raise TransactionError("no reply for %s" % type(msg).__name__, None)

            ofp = loxi.protocol(reply.version)
            if not isinstance(reply, ofp.message.stats_reply):
                raise TransactionError(
                    "received %s in response to %s" % (type(reply).__name__, type(msg).__name__),
                    reply)

            for entry in reply.entries:
                yield entry

            # Keep reading until a reply arrives without the MORE flag set
            finished = (reply.flags & ofp.OFPSF_REPLY_MORE) == 0

    def transact_multipart(self, msg, timeout=DEFAULT_TIMEOUT):
        """
        Send a multipart request and return all entries from the replies
        """
        return list(self.transact_multipart_generator(msg, timeout))

    def stop(self):
        """
        Signal the thread to exit and wait for it, then close the socket
        and the wakeup pipe

        May be called after the other end has already dropped the
        connection, so the descriptors are still released in that case.
        Must not be called more than once.
        """
        # Guard on a dedicated flag rather than self.finished, which the
        # reader thread also sets when the connection is dropped.
        assert not self._cxn_closed
        self._cxn_closed = True
        self.logger.debug("Stopping connection")
        self.finished = True
        # Wake the reader thread out of select(); a bytes literal is valid
        # on both Python 2 and Python 3.
        os.write(self.wakeup_wr, b"x")
        self.join()
        self.sock.close()
        os.close(self.wakeup_rd)
        os.close(self.wakeup_wr)
        self.logger.debug("Stopped connection")

    def _gen_xid(self):
        """
        Return the next unused transaction ID
        """
        xid = self.next_xid
        self.next_xid += 1
        return xid
233
def _abort_connection(cxn):
    """
    Best-effort teardown of a connection whose handshake failed
    """
    try:
        cxn.stop()
    except AssertionError:
        # stop() asserts when it considers the connection already shut
        # down; release the socket directly so it is not leaked.
        cxn.sock.close()


def connect(ip, port=6653, daemon=True, ofp=loxi.of14):
    """
    Actively connect to a switch

    Opens a TCP connection to ip:port, starts a Connection reader thread
    (with its 'daemon' attribute set from 'daemon'), sends a HELLO built
    from the 'ofp' protocol module and waits for a HELLO in return.

    Returns the started Connection. Raises Exception("Did not receive
    HELLO") if no HELLO is received; on any handshake failure the
    connection is torn down before the error propagates.
    """
    soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        soc.connect((ip, port))
        soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
    except Exception:
        # Do not leak the descriptor when the switch is unreachable
        soc.close()
        raise

    cxn = Connection(soc)
    cxn.daemon = daemon
    cxn.logger.debug("Connected to %s:%d", ip, port)
    cxn.start()

    handshake_ok = False
    try:
        cxn.send(ofp.message.hello())
        if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO):
            raise Exception("Did not receive HELLO")
        handshake_ok = True
    finally:
        if not handshake_ok:
            # Stop the reader thread and close the socket so a failed
            # handshake leaves nothing running behind it.
            _abort_connection(cxn)

    return cxn
250 return cxn