# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# Copyright 2015, Big Switch Networks, Inc.

"""
OpenFlow connection class

This class creates a thread which continually parses OpenFlow messages off the
supplied socket and places them in a queue. The class has methods for reading messages
from the RX queue, sending messages, and higher-level operations like request-response
and multipart transactions.
"""
27
28import loxi
29import loxi.of14
30import logging
31import time
32import socket
33import errno
34import os
35import select
36from threading import Condition, Lock, Thread
37
# Default time, in seconds, that the receive and transaction helpers wait for
# a matching message before giving up.
DEFAULT_TIMEOUT = 1
39
class TransactionError(Exception):
    """
    Raised when a request/reply transaction does not complete successfully.

    Instances are built as TransactionError(description, reply): 'description'
    is the human-readable text and 'reply' is the unexpected message that was
    received, or None when no reply arrived at all.
    """

    def __str__(self):
        # Only the description is shown; tolerate construction without
        # arguments instead of failing with IndexError while formatting.
        return str(self.args[0]) if self.args else ""

    @property
    def msg(self):
        """
        The reply message attached to this error, or None if there is none
        """
        return self.args[1] if len(self.args) > 1 else None
47
class Connection(Thread):
    """
    OpenFlow connection with its own receive thread.

    Once started, the thread selects on the socket, parses every complete
    OpenFlow message it receives and appends it to the RX queue (self.rx).
    The other methods let callers take messages off that queue, send
    messages, and run request/reply and multipart transactions.
    """

    def __init__(self, sock):
        """
        Set up the connection state around 'sock', the socket messages are
        read from and written to. The receive thread is not started here.
        """
        Thread.__init__(self)
        self.sock = sock
        self.logger = logging.getLogger("connection")
        # Parsed messages not yet taken by recv(); accessed under rx_cv.
        self.rx = []
        self.rx_cv = Condition()
        # Serializes writes to the socket.
        self.tx_lock = Lock()
        self.next_xid = 1
        # Pipe written by stop() to wake up the select() call in run().
        self.wakeup_rd, self.wakeup_wr = os.pipe()
        self.finished = False
        # Bytes of a partially received message, or None when there are none.
        self.read_buffer = None
        # Set by process_read() once the socket reports end-of-stream.
        self._peer_closed = False

    def run(self):
        """
        Thread body: read from the socket until stop() is called
        """
        while not self.finished:
            # After end-of-stream the socket is readable forever, so it is no
            # longer watched; the thread then just waits for stop().
            watched = [self.wakeup_rd]
            if not self._peer_closed:
                watched.append(self.sock)
            rd, wr, err = select.select(watched, [], [])
            if self.sock in rd:
                self.process_read()
            if self.wakeup_rd in rd:
                os.read(self.wakeup_rd, 1)
        self.logger.debug("Exited event loop")

    def process_read(self):
        """
        Read once from the socket and queue every complete message received

        Bytes belonging to an incomplete trailing message are kept in
        self.read_buffer and prepended to the data of the next read.
        """
        recvd = self.sock.recv(4096)

        self.logger.debug("Received %d bytes", len(recvd))

        if not recvd:
            # An empty read means the peer closed the connection. Remember it
            # so run() stops polling the socket instead of spinning on it.
            self.logger.debug("Connection closed by peer")
            self._peer_closed = True
            return

        buf = self.read_buffer
        if buf:
            buf += recvd
        else:
            buf = recvd

        offset = 0
        while offset < len(buf):
            if offset + 8 > len(buf):
                # Not enough data for the OpenFlow header
                break

            # Parse the header to get type
            # NOTE(review): a header announcing a length below 8 is not
            # rejected here; confirm how parse_message handles it.
            hdr_version, hdr_type, hdr_msglen, hdr_xid = loxi.of14.message.parse_header(buf[offset:])

            # Use loxi to resolve ofp of matching version
            ofp = loxi.protocol(hdr_version)

            # Extract the raw message bytes
            if (offset + hdr_msglen) > len(buf):
                # Not enough data for the body
                break
            rawmsg = buf[offset : offset + hdr_msglen]
            offset += hdr_msglen

            msg = ofp.message.parse_message(rawmsg)
            if not msg:
                self.logger.warning("Could not parse message")
                continue

            self.logger.debug("Received message %s.%s xid %d length %d",
                              type(msg).__module__, type(msg).__name__, hdr_xid, hdr_msglen)

            with self.rx_cv:
                self.rx.append(msg)
                self.rx_cv.notify_all()

        if offset == len(buf):
            self.read_buffer = None
        else:
            self.read_buffer = buf[offset:]
            self.logger.debug("%d bytes remaining", len(self.read_buffer))

    def recv(self, predicate, timeout=DEFAULT_TIMEOUT):
        """
        Remove and return the first message in the RX queue for
        which 'predicate' returns true

        Waits up to 'timeout' seconds for such a message and returns None if
        none shows up in time. The receive thread must be running.
        """
        assert self.is_alive()

        deadline = time.time() + timeout
        while True:
            with self.rx_cv:
                for i, msg in enumerate(self.rx):
                    if predicate(msg):
                        return self.rx.pop(i)

                now = time.time()
                if now > deadline:
                    return None
                else:
                    # Woken by process_read() whenever a message is queued.
                    self.rx_cv.wait(deadline - now)

    def recv_any(self, timeout=DEFAULT_TIMEOUT):
        """
        Return the first message in the RX queue
        """
        return self.recv(lambda msg: True, timeout)

    def recv_xid(self, xid, timeout=DEFAULT_TIMEOUT):
        """
        Return the first message in the RX queue with XID 'xid'
        """
        return self.recv(lambda msg: msg.xid == xid, timeout)

    def recv_class(self, klass, timeout=DEFAULT_TIMEOUT):
        """
        Return the first message in the RX queue which is an instance of 'klass'
        """
        return self.recv(lambda msg: isinstance(msg, klass), timeout)

    def _send_bytes(self, buf):
        """
        Write 'buf' to the socket while holding the TX lock
        """
        with self.tx_lock:
            if self.sock.sendall(buf) is not None:
                raise RuntimeError("failed to send message to switch")

    def send_raw(self, buf):
        """
        Send raw bytes on the socket
        """
        assert self.is_alive()
        self.logger.debug("Sending raw message length %d", len(buf))
        self._send_bytes(buf)

    def send(self, msg):
        """
        Send a message

        A message whose xid is None is assigned the next free XID first.
        """
        assert self.is_alive()

        if msg.xid is None:
            msg.xid = self._gen_xid()
        buf = msg.pack()
        self.logger.debug("Sending message %s.%s xid %d length %d",
                          type(msg).__module__, type(msg).__name__, msg.xid, len(buf))
        self._send_bytes(buf)

    def transact(self, msg, timeout=DEFAULT_TIMEOUT):
        """
        Send a message and return the reply

        Raises TransactionError if no message with the same XID arrives within
        'timeout' seconds, or if the reply is an error message (the reply is
        then available as the exception's 'msg').
        """
        self.send(msg)
        reply = self.recv_xid(msg.xid, timeout)
        if reply is None:
            raise TransactionError("no reply for %s" % type(msg).__name__, None)
        elif isinstance(reply, loxi.protocol(reply.version).message.error_msg):
            raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
        return reply

    def transact_multipart_generator(self, msg, timeout=DEFAULT_TIMEOUT):
        """
        Send a multipart request and yield each entry from the replies

        Raises TransactionError if a reply does not arrive within 'timeout'
        seconds or is not a stats reply.
        """
        self.send(msg)
        finished = False
        while not finished:
            reply = self.recv_xid(msg.xid, timeout)
            if reply is None:
                raise TransactionError("no reply for %s" % type(msg).__name__, None)
            elif not isinstance(reply, loxi.protocol(reply.version).message.stats_reply):
                raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
            for entry in reply.entries:
                yield entry
            # Keep reading for as long as the reply has the "more" flag set.
            finished = (reply.flags & loxi.protocol(reply.version).OFPSF_REPLY_MORE) == 0

    def transact_multipart(self, msg, timeout=DEFAULT_TIMEOUT):
        """
        Send a multipart request and return all entries from the replies
        """
        return list(self.transact_multipart_generator(msg, timeout))

    def stop(self):
        """
        Signal the thread to exit and wait for it

        Also closes the socket and the wakeup pipe. Must be called only once.
        """
        assert not self.finished
        self.logger.debug("Stopping connection")
        self.finished = True
        # os.write() requires bytes on Python 3; b"x" also works on Python 2.
        os.write(self.wakeup_wr, b"x")
        self.join()
        self.sock.close()
        os.close(self.wakeup_rd)
        os.close(self.wakeup_wr)
        self.logger.debug("Stopped connection")

    def _gen_xid(self):
        """
        Return the next unused transaction ID
        """
        xid = self.next_xid
        self.next_xid += 1
        return xid
236
def connect(ip, port=6653, daemon=True, ofp=loxi.of14):
    """
    Actively connect to a switch

    Opens a TCP connection to ip:port, starts a Connection thread on it
    (marked as a daemon thread according to 'daemon'), sends a HELLO built
    from the 'ofp' protocol module and waits for the switch's HELLO.

    Returns the started Connection. Raises Exception("Did not receive HELLO")
    if the switch does not answer in time; any failure releases the socket
    and stops the thread before the exception propagates.
    """
    soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        soc.connect((ip, port))
        soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
    except Exception:
        # Do not leak the descriptor when the switch is unreachable.
        soc.close()
        raise

    cxn = Connection(soc)
    cxn.daemon = daemon
    cxn.logger.debug("Connected to %s:%d", ip, port)
    cxn.start()

    try:
        cxn.send(ofp.message.hello())
        if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO):
            raise Exception("Did not receive HELLO")
    except Exception:
        # The caller never gets the connection, so shut down the receive
        # thread and close the socket and wakeup pipe here.
        cxn.stop()
        raise

    return cxn
253 return cxn