blob: 0b89ee6a9cbe3ef9c4e348f7c748e019e8dde71c [file] [log] [blame]
"""
OpenFlow Test Framework

Controller class

Provide the interface to the control channel to the switch under test.

Class inherits from thread so as to run in background allowing
asynchronous callbacks (if needed, not required). Also supports
polling.

The controller thread maintains a queue. Incoming messages that
are not handled by a callback function are placed in this queue for
poll calls.

Callbacks and polling support specifying the message type

@todo Support transaction semantics via xid
@todo Set up reasonable logging facility
@todo Support select and listen on an administrative socket (or
use a timeout to support clean shutdown).

Currently only one connection is accepted during the life of
the controller. There seems to be no clean way to interrupt an
accept call. Using select that also listens on an administrative
socket and can shut down the socket might work.

"""
29
Dan Talayco21c75c72010-02-12 22:59:24 -080030from oft_config import *
Dan Talayco34089522010-02-07 23:07:41 -080031import os
32import socket
33import time
Dan Talayco34089522010-02-07 23:07:41 -080034from threading import Thread
35from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080036from threading import Condition
Dan Talayco34089522010-02-07 23:07:41 -080037from message import *
Dan Talaycoe37999f2010-02-09 15:27:12 -080038from parse import *
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080039from ofutils import *
# For some reason, select needs to be imported last (or at least later);
# otherwise we get an attribute error when calling select.select
42import select
Dan Talayco34089522010-02-07 23:07:41 -080043
class Controller(Thread):
    """
    Class abstracting the control interface to the switch.

    For receiving messages, two mechanisms are implemented. First,
    query the interface with poll. Second, register to have a
    function called by message type. The callback is passed the
    controller, the parsed message object and the raw packet.

    One of the main purposes of this object is to translate between network
    and host byte order. 'Above' this object, things should be in host
    byte order.

    @todo Consider using SocketServer for listening socket
    @todo Test transaction code

    @var rcv_size The receive size to use for receive calls
    @var max_pkts The max size of the receive queue
    @var keep_alive If true, listen for echo requests and respond w/
    echo replies
    @var initial_hello If true, will send a hello message immediately
    upon connecting to the switch
    @var host The host to use for connect
    @var port The port to connect on
    @var packets_total Number of packets placed in the receive queue
    @var packets_expired Number of packets popped from queue as queue full
    @var packets_handled Number of packets handled by something
    @var socket_errors Number of socket read/send/select errors seen
    @var dbg_state Debug indication of state
    """

    def __init__(self, max_pkts=1024):
        """
        Set up the controller state; no sockets are opened until run()

        @param max_pkts Maximum number of unhandled packets kept in the
        receive queue; the oldest packet is dropped when the queue is full
        """
        Thread.__init__(self)
        # Socket related
        self.rcv_size = RCV_SIZE_DEFAULT
        self.listen_socket = None
        self.switch_socket = None
        self.switch_addr = None
        self.socs = []
        # Signalled when a switch connection has been accepted
        self.connect_cv = Condition()

        # Counters
        self.socket_errors = 0
        self.parse_errors = 0
        self.packets_total = 0
        self.packets_expired = 0
        self.packets_handled = 0

        # State
        self.packets = []
        self.sync = Lock()      # Protects self.packets
        self.handlers = {}
        self.keep_alive = False
        self.active = True
        self.initial_hello = True

        # Settings
        self.max_pkts = max_pkts
        self.passive = True
        self.host = controller_host
        self.port = controller_port
        self.dbg_state = "init"
        # Assign DEBUG_VERBOSE here for more output while debugging
        self.debug_level = debug_level_default

        # Transaction variables
        #   xid_cv: Condition variable (semaphore) for transaction
        #   xid: Transaction ID being waited on (None if no transaction)
        #   xid_response: Transaction response message
        self.xid_cv = Condition()
        self.xid = None
        self.xid_response = None

    def dbg(self, level, string):
        """
        Log a debug message for the controller

        @param level The debug level of the message
        @param string The message to log
        """
        debug_log("CTRL", self.debug_level, level, string)

    def _socket_ready_handle(self, s):
        """
        Handle an input-ready socket

        A ready listen socket means a switch is connecting; a ready
        switch socket means a message (or EOF) is available.

        @param s The socket object that is ready
        @retval True, reset the switch connection
        """

        if s == self.listen_socket:
            if self.switch_socket is not None:
                # Fatal for the controller: stop the main loop so that
                # run() falls through to shutdown() and cleans up.
                self.dbg(DEBUG_ERROR, "Multiple switch cxns not supported")
                self.active = False
                return False

            (sock, addr) = self.listen_socket.accept()
            self.dbg(DEBUG_INFO, "Got cxn to " + str(addr))
            # Publish the connection under the condition lock so that a
            # concurrent connect() cannot miss the notification.
            with self.connect_cv:
                self.switch_socket = sock
                self.switch_addr = addr
                self.socs.append(sock)
                self.connect_cv.notify_all()
            if self.initial_hello:
                self.message_send(hello())
            return False

        if s == self.switch_socket:
            try:
                # Read via the local reference; self.switch_socket may be
                # cleared by shutdown() from another thread.
                pkt = s.recv(self.rcv_size)
            except socket.error:
                self.socket_errors += 1
                self.dbg(DEBUG_INFO, "error on switch read")
                return True

            if not self.active:
                return False

            if len(pkt) == 0:
                # Zero length read: peer closed the connection
                self.dbg(DEBUG_INFO, "zero-len pkt in")
                return True

            (handled, msg) = self._pkt_handler_check(pkt)
            if handled:
                self.packets_handled += 1
                return False

            # Not handled, enqueue; expire the oldest packet if full
            with self.sync:
                if len(self.packets) >= self.max_pkts:
                    self.packets.pop(0)
                    self.packets_expired += 1
                self.packets.append((msg, pkt))
                self.packets_total += 1
            return False

        self.dbg(DEBUG_ERROR, "Unknown socket ready: " + str(s))
        return True

    def run(self):
        """
        Activity function for class

        Creates the listen socket, then loops on select() (with a one
        second timeout so that kill() is noticed) accepting the switch
        connection and reading messages from it.

        When there is a message on the socket, check for handlers; queue the
        packet if no one handles the packet.

        The loop ends when the controller is no longer active or on a
        select/listen socket error; shutdown() is always called on exit.

        See note for controller describing the limitation of a single
        connection for now.
        """

        self.dbg_state = "starting"

        # Create listen socket
        self.dbg(DEBUG_INFO, "Create/listen at " + self.host + ":" +
                 str(self.port))
        self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.listen_socket.setsockopt(socket.SOL_SOCKET,
                                      socket.SO_REUSEADDR, 1)
        self.listen_socket.bind((self.host, self.port))
        self.dbg_state = "listening"
        self.listen_socket.listen(LISTEN_QUEUE_SIZE)

        self.dbg(DEBUG_INFO, "Waiting for switch connection")
        self.socs = [self.listen_socket]
        self.dbg_state = "running"
        while self.active:
            try:
                sel_in, _sel_out, sel_err = \
                    select.select(self.socs, [], self.socs, 1)
            except Exception as err:
                # A failure while shutting down (sockets closed under us)
                # is expected; anything else is reported.
                if self.active:
                    self.socket_errors += 1
                    self.dbg(DEBUG_ERROR, "Select error, exiting")
                    self.dbg(DEBUG_ERROR, str(err))
                break

            if not self.active:
                break

            # Accumulate the reset request; a later ready socket must not
            # clear a reset asked for by an earlier one.
            reset_switch_cxn = False
            for s in sel_in:
                if self._socket_ready_handle(s):
                    reset_switch_cxn = True

            for s in sel_err:
                self.socket_errors += 1
                self.dbg(DEBUG_ERROR, "Got socket error on: " + str(s))
                if s == self.switch_socket:
                    reset_switch_cxn = True
                else:
                    self.dbg(DEBUG_ERROR, "Socket error; exiting")
                    self.active = False
                    break

            if self.active and reset_switch_cxn:
                self.dbg(DEBUG_INFO, "Closing switch cxn")
                old_socket = self.switch_socket
                self.switch_socket = None
                # Keep only the listen socket in the select set
                self.socs = self.socs[0:1]
                if old_socket is not None:
                    try:
                        old_socket.close()
                    except socket.error:
                        pass

        # End of main loop
        self.dbg_state = "closing"
        self.dbg(DEBUG_INFO, "Exiting controller thread")
        self.shutdown()

    def connect(self, timeout=None):
        """
        Wait for the switch to connect

        @param timeout If None, block until connected. If 0, return
        immediately. Otherwise, block for up to timeout seconds
        @return Boolean, True if connected
        """

        if timeout == 0:
            return self.switch_socket is not None
        # Check under the lock so a connection accepted between the check
        # and the wait cannot be missed.
        with self.connect_cv:
            if self.switch_socket is None:
                self.connect_cv.wait(timeout)

        return self.switch_socket is not None

    def kill(self):
        """
        Force the controller thread to quit

        Just sets the active state variable to false and expects
        the select timeout to kick in
        """
        self.active = False

    def _close_socket(self, sock, shutdown_err_msg):
        """
        Shut down and close a socket, ignoring (but logging) errors

        @param sock The socket to close; None is ignored
        @param shutdown_err_msg Message logged if the shutdown call fails
        """
        if sock is None:
            return
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except socket.error:
            self.dbg(DEBUG_INFO, shutdown_err_msg)
        try:
            sock.close()
        except socket.error:
            self.dbg(DEBUG_INFO, "Ignoring socket close error")

    def shutdown(self):
        """
        Shutdown the controller closing all sockets

        Safe to call more than once; run() calls it on exit.

        @todo Might want to synchronize shutdown with self.sync...
        """
        self.active = False
        # Detach the sockets first so other methods see "no socket"
        switch_soc, self.switch_socket = self.switch_socket, None
        listen_soc, self.listen_socket = self.listen_socket, None
        self.socs = []

        self._close_socket(switch_soc, "Ignoring switch soc shutdown error")
        self._close_socket(listen_soc, "Ignoring listen soc shutdown error")
        self.dbg_state = "down"

    def _pkt_handler_check(self, pkt):
        """
        Check for packet handling before being enqueued

        This includes checking for an ongoing transaction (see transact())
        an echo request in case keep_alive is true, followed by
        registered message handlers.
        @param pkt The raw packet (string)
        @return (handled, msg) where handled is a boolean indicating
        the message was handled; msg is the parsed message when the
        registered handlers were consulted and None when the packet was
        consumed earlier (parse failure, transaction reply, echo request)
        """
        # Parse the header to get type
        hdr = of_header_parse(pkt)
        if not hdr:
            self.dbg(DEBUG_INFO,
                     "Could not parse header, pkt len " + str(len(pkt)))
            self.parse_errors += 1
            return (True, None)

        # An unknown message type must not take down the receive thread
        try:
            type_name = ofp_type_map[hdr.type]
        except (KeyError, IndexError):
            type_name = "unknown (" + str(hdr.type) + ")"
        self.dbg(DEBUG_VERBOSE, "message: len %d. type %s. hdr.len %d" %
                 (len(pkt), type_name, hdr.length))
        msg = of_message_parse(pkt)
        if not msg:
            self.parse_errors += 1
            self.dbg(DEBUG_WARN, "Could not parse message")
            return (True, None)

        # Check if transaction is waiting; compare against None so that a
        # transaction deliberately using xid 0 can be matched too.
        with self.xid_cv:
            if self.xid is not None and hdr.xid == self.xid:
                self.xid_response = msg
                self.xid_cv.notify()
                return (True, None)

        # Check if keep alive is set; if so, respond to echo requests
        if self.keep_alive and hdr.type == OFPT_ECHO_REQUEST:
            rep = echo_reply()
            rep.header.xid = hdr.xid
            # Ignoring additional data
            self.message_send(rep.pack(), zero_xid=True)
            return (True, None)

        # Now check for message handlers; preference is given to
        # handlers for a specific packet
        handled = False
        type_handler = self.handlers.get(hdr.type)
        if type_handler is not None:
            handled = type_handler(self, msg, pkt)
        if not handled:
            all_handler = self.handlers.get("all")
            if all_handler is not None:
                handled = all_handler(self, msg, pkt)

        return (handled, msg)

    def register(self, msg_type, handler):
        """
        Register a callback to receive a specific message type.

        Only one handler may be registered for a given message type.
        @param msg_type The type of message to receive. The special
        type, the string "all", receives every packet that was not
        handled by a type specific handler.
        @param handler The function to call when a message of the given
        type is received. Pass None to remove the handler for msg_type.
        """
        # Should check type is valid
        if not handler:
            # Unregister; never store a non-callable in the table
            self.handlers.pop(msg_type, None)
            return
        self.handlers[msg_type] = handler

    def poll(self, exp_msg=None, timeout=None):
        """
        Return the next queued OF message received from the switch.

        Does not block: only messages already in the queue are examined.

        @param exp_msg If not None, return only a message of this type;
        queued messages of other types that precede it are discarded.
        @param timeout Not yet supported

        @retval A pair (msg, pkt) where msg is a message object and pkt
        the string representing the packet as received from the socket.
        This allows additional parsing by the receiver if necessary.

        The data members in the message are in host endian order.
        If no (matching) message is queued, (None, None) is returned
        """

        # For now do not support time out;
        if timeout:
            self.dbg(DEBUG_WARN, "Poll time out not supported")

        while True:
            # Test and pop under the lock so the queue cannot change
            # between the emptiness check and the pop.
            with self.sync:
                if not self.packets:
                    return None, None
                (msg, pkt) = self.packets.pop(0)
            # Compare with None: message type 0 is a valid type to expect
            if exp_msg is None or msg.header.type == exp_msg:
                return msg, pkt

    def transact(self, msg, timeout=None, zero_xid=False):
        """
        Run a message transaction with the switch

        Send the message in msg and wait for a reply with a matching
        transaction id. Transactions have the highest priority in
        received message handling.

        @param msg The message object to send; must not be a string
        @param timeout The timeout in seconds (passed to Condition.wait);
        None waits until a reply arrives
        @param zero_xid Normally, if the XID is 0 an XID will be generated
        for the message. Set zero_xid to override this behavior
        @return The matching message object or None if unsuccessful
        (another transaction in progress, send failure or timeout)

        """

        if not zero_xid and msg.header.xid == 0:
            msg.header.xid = gen_xid()

        with self.xid_cv:
            if self.xid is not None:
                self.dbg(DEBUG_ERROR,
                         "Can only run one transaction at a time")
                return None

            self.xid = msg.header.xid
            self.xid_response = None
            try:
                # Only wait for a reply if the request actually went out
                if self.message_send(msg.pack()) == 0:
                    self.xid_cv.wait(timeout)
                response = self.xid_response
            finally:
                # Always clear the transaction, even if pack/send raised,
                # so later transactions are not blocked forever.
                self.xid_response = None
                self.xid = None
        return response

    def message_send(self, msg, zero_xid=False):
        """
        Send the message to the switch

        @param msg A string or OpenFlow message object to be forwarded to
        the switch.
        @param zero_xid If msg is an OpenFlow object (not a string) and if
        the XID in the header is 0, then an XID will be generated
        for the message. Set zero_xid to override this behavior (and keep an
        existing 0 xid)

        @return -1 if error, 0 on success

        """

        # Use one reference throughout; shutdown() may clear the attribute
        sock = self.switch_socket
        if sock is None:
            self.dbg(DEBUG_INFO, "message_send: no socket")
            return -1

        if isinstance(msg, (str, bytes)):
            # Sending a string indicates the message is ready to go
            outpkt = msg
        else:
            try:
                if msg.header.xid == 0 and not zero_xid:
                    msg.header.xid = gen_xid()
                outpkt = msg.pack()
            except Exception:
                self.dbg(DEBUG_INFO,
                         "message_send: not an OF message or string?")
                return -1

        self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(outpkt)))
        try:
            result = sock.sendall(outpkt)
        except socket.error:
            self.socket_errors += 1
            self.dbg(DEBUG_ERROR, "Socket error on sendall")
            return -1
        if result is None:
            return 0

        self.dbg(DEBUG_ERROR, "Unknown error on sendall")
        return -1

    def __str__(self):
        """
        Return a multi-line summary of the controller state and counters
        """
        lines = [
            "Controller:",
            "  state           " + self.dbg_state,
            "  switch_addr     " + str(self.switch_addr),
            "  pending pkts    " + str(len(self.packets)),
            "  total pkts      " + str(self.packets_total),
            "  expired pkts    " + str(self.packets_expired),
            "  handled pkts    " + str(self.packets_handled),
            "  parse errors    " + str(self.parse_errors),
            "  sock errrors    " + str(self.socket_errors),
            "  max pkts        " + str(self.max_pkts),
            "  host            " + str(self.host),
            "  port            " + str(self.port),
            "  keep_alive      " + str(self.keep_alive),
        ]
        return "\n".join(lines) + "\n"

    def show(self):
        """
        Print the controller summary (see __str__) to stdout
        """
        print(str(self))
487
def sample_handler(controller, msg, pkt):
    """
    Sample message handler

    This is the prototype for functions registered with the controller
    class for packet reception

    @param controller The controller calling the handler
    @param msg The parsed message object
    @param pkt The raw packet that was received on the socket.  This is
    in case the packet contains extra unparsed data.
    @returns Boolean value indicating if the packet was handled.  If
    not handled, the packet is placed in the queue for pollers to receive
    """
    # The sample does nothing with the packet, so report "not handled"
    # explicitly rather than falling off the end and returning None.
    return False