blob: fc61161c4c81461ae335870e884bd6c1b2d3024c [file] [log] [blame]
Stephane Barbarie6e1bd502018-11-05 22:44:45 -05001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# Copyright 2015, Big Switch Networks, Inc.
15
16"""
17OpenFlow connection class
18
19This class creates a thread which continually parses OpenFlow messages off the
20supplied socket and places them in a queue. The class has methods for reading messages
21from the RX queue, sending messages, and higher level operations like request-response
22and multipart transactions.
23"""
24
25import loxi
26import loxi.of14
27import logging
28import time
29import socket
30import errno
31import os
32import select
33from threading import Condition, Lock, Thread
34
# Default time, in seconds, that the receive/transact helpers below wait for a
# matching message before giving up.
DEFAULT_TIMEOUT = 1
36
class TransactionError(Exception):
    """
    Error raised when a request/reply transaction fails.

    Constructed with two positional arguments: a human-readable description
    and the offending reply message (or None when no reply arrived).
    """

    @property
    def msg(self):
        """The second constructor argument (the reply message, or None)."""
        _description, reply = self.args[0], self.args[1]
        return reply

    def __str__(self):
        description = self.args[0]
        return description
44
class ConnectionClosed(Exception):
    """Raised when a read on the socket returns no data (peer hung up)."""
47
class Connection(Thread):
    """
    Thread that reads OpenFlow messages from a socket into an RX queue.

    The thread body (run) waits on the socket and on an internal wakeup pipe,
    parses every complete message it receives and appends it to 'rx'. The
    other methods are called from client threads to pull messages out of the
    queue, to send messages, and to run request/reply transactions.
    """

    def __init__(self, sock):
        """
        Wrap the already-connected socket 'sock'. The thread is not started
        here; call start() to begin reading.
        """
        Thread.__init__(self)
        self.sock = sock
        self.logger = logging.getLogger("connection")
        self.rx = []                # parsed messages not yet consumed
        self.rx_cv = Condition()    # guards 'rx' and signals new arrivals
        self.tx_lock = Lock()       # serializes writes to the socket
        self.next_xid = 1
        # Pipe used by stop() to kick the event loop out of select()
        self.wakeup_rd, self.wakeup_wr = os.pipe()
        self.finished = False
        self.read_buffer = None     # bytes of a not-yet-complete message

    def run(self):
        """
        Event loop executed in the connection's own thread (via start()).

        Loops until 'finished' is set, either by stop() or because the peer
        closed the connection.
        """
        while not self.finished:
            try:
                rd, _, _ = select.select([self.sock, self.wakeup_rd], [], [])
            except select.error as e:
                if e.args and e.args[0] == errno.EINTR:
                    # Interrupted by a signal: nothing is readable yet, retry
                    continue
                # Any other failure leaves no usable ready-list; give up
                # instead of carrying on with an undefined 'rd'.
                self.logger.error("select failed: %s", e)
                self.finished = True
                break
            if self.sock in rd:
                try:
                    self.process_read()
                except ConnectionClosed:
                    self.logger.info("Connection to controller was dropped")
                    self.finished = True
            if self.wakeup_rd in rd:
                # Drain the wakeup byte; the loop condition does the rest
                os.read(self.wakeup_rd, 1)
        self.logger.debug("Exited event loop")

    def process_read(self):
        """
        Read once from the socket and queue every complete message.

        Bytes belonging to an incomplete trailing message are kept in
        'read_buffer' and prepended to the data of the next read.

        Raises ConnectionClosed if the read returns no data.
        """
        recvd = self.sock.recv(4096)
        if not recvd:
            # this indicates that the other end hung up
            raise ConnectionClosed()

        self.logger.debug("Received %d bytes", len(recvd))

        if self.read_buffer:
            buf = self.read_buffer + recvd
        else:
            buf = recvd

        offset = 0
        # 8 bytes is the size of the OpenFlow header; stop once less remains
        while offset + 8 <= len(buf):
            # Parse the header to get type
            hdr_version, hdr_type, hdr_msglen, hdr_xid = \
                loxi.of14.message.parse_header(buf[offset:])

            # Use loxi to resolve ofp of matching version
            ofp = loxi.protocol(hdr_version)

            # Extract the raw message bytes
            if offset + hdr_msglen > len(buf):
                # Not enough data for the body
                break
            rawmsg = buf[offset:offset + hdr_msglen]
            offset += hdr_msglen

            msg = ofp.message.parse_message(rawmsg)
            if not msg:
                self.logger.warning("Could not parse message")
                continue

            self.logger.debug("Received message %s.%s xid %d length %d",
                              type(msg).__module__, type(msg).__name__,
                              hdr_xid, hdr_msglen)

            with self.rx_cv:
                self.rx.append(msg)
                self.rx_cv.notify_all()

        if offset == len(buf):
            self.read_buffer = None
        else:
            self.read_buffer = buf[offset:]
            self.logger.debug("%d bytes remaining", len(self.read_buffer))

    def recv(self, predicate, timeout=DEFAULT_TIMEOUT):
        """
        Remove and return the first message in the RX queue for
        which 'predicate' returns true

        Waits up to 'timeout' seconds for such a message to arrive and
        returns None if none does.
        """
        assert self.is_alive()

        deadline = time.time() + timeout
        while True:
            with self.rx_cv:
                for i, msg in enumerate(self.rx):
                    if predicate(msg):
                        return self.rx.pop(i)

                now = time.time()
                if now > deadline:
                    return None
                # Woken by process_read() on every new message, or on timeout
                self.rx_cv.wait(deadline - now)

    def recv_any(self, timeout=DEFAULT_TIMEOUT):
        """
        Return the first message in the RX queue
        """
        return self.recv(lambda msg: True, timeout)

    def recv_xid(self, xid, timeout=DEFAULT_TIMEOUT):
        """
        Return the first message in the RX queue with XID 'xid'
        """
        return self.recv(lambda msg: msg.xid == xid, timeout)

    def recv_class(self, klass, timeout=DEFAULT_TIMEOUT):
        """
        Return the first message in the RX queue which is an instance of 'klass'
        """
        return self.recv(lambda msg: isinstance(msg, klass), timeout)

    def _write(self, buf):
        """
        Write 'buf' to the socket while holding the TX lock
        """
        with self.tx_lock:
            if self.sock.sendall(buf) is not None:
                raise RuntimeError("failed to send message to switch")

    def send_raw(self, buf):
        """
        Send raw bytes on the socket
        """
        assert self.is_alive()
        self.logger.debug("Sending raw message length %d", len(buf))
        self._write(buf)

    def send(self, msg):
        """
        Send a message

        If the message has no XID yet, a fresh one is assigned to it first.
        """
        assert self.is_alive()

        if msg.xid is None:
            msg.xid = self._gen_xid()
        buf = msg.pack()
        self.logger.debug("Sending message %s.%s xid %d length %d",
                          type(msg).__module__, type(msg).__name__,
                          msg.xid, len(buf))
        self._write(buf)

    def transact(self, msg, timeout=DEFAULT_TIMEOUT):
        """
        Send a message and return the reply

        Raises TransactionError if no reply with the same XID arrives within
        'timeout' seconds, or if the reply is an error message.
        """
        self.send(msg)
        reply = self.recv_xid(msg.xid, timeout)
        if reply is None:
            raise TransactionError("no reply for %s" % type(msg).__name__, None)
        elif isinstance(reply, loxi.protocol(reply.version).message.error_msg):
            raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
        return reply

    def transact_multipart_generator(self, msg, timeout=DEFAULT_TIMEOUT):
        """
        Send a multipart request and yield each entry from the replies

        Raises TransactionError if a reply does not arrive within 'timeout'
        seconds, or if a reply is not a stats reply.
        """
        self.send(msg)
        finished = False
        while not finished:
            reply = self.recv_xid(msg.xid, timeout)
            if reply is None:
                raise TransactionError("no reply for %s" % type(msg).__name__, None)
            elif not isinstance(reply, loxi.protocol(reply.version).message.stats_reply):
                raise TransactionError("received %s in response to %s" % (type(reply).__name__, type(msg).__name__), reply)
            for entry in reply.entries:
                yield entry
            # Keep reading for as long as the MORE flag is set on the reply
            more = reply.flags & loxi.protocol(reply.version).OFPSF_REPLY_MORE
            finished = (more == 0)

    def transact_multipart(self, msg, timeout=DEFAULT_TIMEOUT):
        """
        Send a multipart request and return all entries from the replies
        """
        return list(self.transact_multipart_generator(msg, timeout))

    def stop(self):
        """
        Signal the thread to exit and wait for it

        Also closes the socket and the wakeup pipe. Asserts that the event
        loop has not already been marked as finished.
        """
        assert not self.finished
        self.logger.debug("Stopping connection")
        self.finished = True
        # Any byte will do: it only has to make select() return
        os.write(self.wakeup_wr, b"x")
        self.join()
        self.sock.close()
        os.close(self.wakeup_rd)
        os.close(self.wakeup_wr)
        self.logger.debug("Stopped connection")

    def _gen_xid(self):
        """
        Return the next unused transaction id
        """
        xid = self.next_xid
        self.next_xid += 1
        return xid
246
def connect(ip, port=6653, daemon=True, ofp=loxi.of14):
    """
    Actively connect to a switch

    Opens a TCP connection to ip:port, starts a Connection on it, sends a
    HELLO built from 'ofp' and waits for the peer's HELLO.

    Returns the started Connection. Raises Exception("Did not receive HELLO")
    if the peer's HELLO does not arrive in time; socket errors from the
    connect attempt propagate unchanged. In either case the socket is
    released before the exception leaves this function.
    """
    soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        soc.connect((ip, port))
        soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
    except Exception:
        # Nothing else owns the socket yet, so close it here
        soc.close()
        raise

    cxn = Connection(soc)
    cxn.daemon = daemon
    cxn.logger.debug("Connected to %s:%d", ip, port)
    cxn.start()

    handshake_ok = False
    try:
        cxn.send(ofp.message.hello())
        if not cxn.recv(lambda msg: msg.type == ofp.OFPT_HELLO):
            raise Exception("Did not receive HELLO")
        handshake_ok = True
    finally:
        if not handshake_ok:
            # Do not leave the reader thread and socket behind on failure
            if cxn.finished:
                # Event loop already ended on its own; only the socket is left
                soc.close()
            else:
                cxn.stop()

    return cxn