blob: 779e42c093f5b13d4cc2533dcefa88ff8ed835af [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
4Controller class
5
6Provide the interface to the control channel to the switch under test.
7
8Class inherits from thread so as to run in background allowing
9asynchronous callbacks (if needed, not required). Also supports
10polling.
11
12The controller thread maintains a queue. Incoming messages that
13are not handled by a callback function are placed in this queue for
14poll calls.
15
16Callbacks and polling support specifying the message type
17
18@todo Support transaction semantics via xid
Dan Talayco21c75c72010-02-12 22:59:24 -080019@todo Set up reasonable logging facility
Dan Talayco1b3f6902010-02-15 14:14:19 -080020@todo Support select and listen on an administrative socket (or
21use a timeout to support clean shutdown).
22
23Currently only one connection is accepted during the life of
24the controller. There seems
25to be no clean way to interrupt an accept call. Using select that also listens
26on an administrative socket and can shut down the socket might work.
27
Dan Talayco34089522010-02-07 23:07:41 -080028"""
29
Dan Talayco21c75c72010-02-12 22:59:24 -080030from oft_config import *
Dan Talayco34089522010-02-07 23:07:41 -080031import os
32import socket
33import time
Dan Talayco34089522010-02-07 23:07:41 -080034from threading import Thread
35from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080036from threading import Condition
Dan Talayco34089522010-02-07 23:07:41 -080037from message import *
Dan Talaycoe37999f2010-02-09 15:27:12 -080038from parse import *
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080039from ofutils import *
Dan Talayco34089522010-02-07 23:07:41 -080040
41class Controller(Thread):
42 """
43 Class abstracting the control interface to the switch.
44
45 For receiving messages, two mechanism will be implemented. First,
46 query the interface with poll. Second, register to have a
47 function called by message type. The callback is passed the
48 message type as well as the raw packet (or message object)
49
50 One of the main purposes of this object is to translate between network
51 and host byte order. 'Above' this object, things should be in host
52 byte order.
Dan Talayco21c75c72010-02-12 22:59:24 -080053
54 @todo Consider using SocketServer for listening socket
55 @todo Test transaction code
56
57 @var rcv_size The receive size to use for receive calls
58 @var max_pkts The max size of the receive queue
59 @var keep_alive If true, listen for echo requests and respond w/
60 echo replies
61 @var host The host to use for connect
62 @var port The port to connect on
63 @var packets_total Total number of packets received
64 @var packets_expired Number of packets popped from queue as queue full
65 @var packets_handled Number of packets handled by something
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080066 @var dbg_state Debug indication of state
Dan Talayco34089522010-02-07 23:07:41 -080067 """
68
Dan Talayco21c75c72010-02-12 22:59:24 -080069 def __init__(self, max_pkts=1024):
70 Thread.__init__(self)
Dan Talayco1b3f6902010-02-15 14:14:19 -080071 # Socket related
Dan Talayco21c75c72010-02-12 22:59:24 -080072 self.rcv_size = RCV_SIZE_DEFAULT
Dan Talayco1b3f6902010-02-15 14:14:19 -080073 self.listen_socket = None
74 self.switch_socket = None
75 self.switch_addr = None
76
77 # Counters
Dan Talayco21c75c72010-02-12 22:59:24 -080078 self.socket_errors = 0
79 self.parse_errors = 0
Dan Talayco21c75c72010-02-12 22:59:24 -080080 self.packets_total = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -080081 self.packets_expired = 0
82 self.packets_handled = 0
83
84 # State
85 self.connected = False
Dan Talayco21c75c72010-02-12 22:59:24 -080086 self.packets = []
87 self.sync = Lock()
88 self.handlers = {}
89 self.keep_alive = False
Dan Talayco1b3f6902010-02-15 14:14:19 -080090
91 # Settings
92 self.max_pkts = max_pkts
93 self.passive = True
Dan Talayco21c75c72010-02-12 22:59:24 -080094 self.host = controller_host
95 self.port = controller_port
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080096 self.dbg_state = "init"
Dan Talayco1b3f6902010-02-15 14:14:19 -080097 # self.debug_level = DEBUG_VERBOSE
98 self.debug_level = debug_level_default
99
Dan Talayco21c75c72010-02-12 22:59:24 -0800100 # Transaction variables
101 # xid_cv: Condition variable (semaphore) for transaction
102 # xid: Transaction ID being waited on
103 # xid_response: Transaction response message
104 self.xid_cv = Condition()
105 self.xid = None
106 self.xid_response = None
107
108 def dbg(self, level, string):
109 debug_log("CTRL", self.debug_level, level, string)
110
Dan Talayco1b3f6902010-02-15 14:14:19 -0800111 def run(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800112 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800113 Activity function for class
Dan Talayco21c75c72010-02-12 22:59:24 -0800114
Dan Talayco1b3f6902010-02-15 14:14:19 -0800115 Assumes connection to switch already exists. Listens on
116 switch_socket for messages until an error (or zero len pkt)
117 occurs.
Dan Talayco21c75c72010-02-12 22:59:24 -0800118
Dan Talayco1b3f6902010-02-15 14:14:19 -0800119 When there is a message on the socket, check for handlers; queue the
120 packet if no one handles the packet.
121
122 See note for controller describing the limitation of a single
123 connection for now.
124 """
125
126 if not self.switch_socket:
127 self.dbg(DEBUG_ERROR,
128 "Error in controller thread, no switch socket")
129 self.shutdown()
130 return
131
132 # Read and process packets from socket connected to switch
133 while 1:
134 try:
135 pkt = self.switch_socket.recv(self.rcv_size)
136 except socket.error:
137 self.dbg(DEBUG_ERROR, "Controller socket read error")
138 self.socket_errors += 1
139 break
140
141 if len(pkt) == 0:
142 # Considered an error; usually means switch has disconnected
143 self.dbg(DEBUG_INFO, "length 0 pkt in")
144 self.socket_errors += 1
145 break
146
147 if not self.connected:
148 break
149
150 (handled, msg) = self._pkt_handler_check(pkt)
151 if handled:
152 self.packets_handled += 1
153 continue
154
155 # Not handled, enqueue
156 self.sync.acquire()
157 if len(self.packets) >= self.max_pkts:
158 self.packets.pop(0)
159 self.packets_expired += 1
160 self.packets.append((msg, pkt))
161 self.packets_total += 1
162 self.sync.release()
163
164 self.shutdown()
165
166 def connect(self, send_hello=True):
167 """
168 Connect to a switch and start this thread
169
170 Create the listening socket, accept and block until a
171 connection is made to the switch. Then start the local
172 thread and return. Parameters to the call are take from
173 member variables.
174
175 @param send_hello If True, send the initial hello packet on connection
Dan Talayco21c75c72010-02-12 22:59:24 -0800176 @return Boolean where True indicates success
177
Dan Talayco1b3f6902010-02-15 14:14:19 -0800178 If already connected, returns True
179 If the listen socket does not exist, will create it
Dan Talayco21c75c72010-02-12 22:59:24 -0800180 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800181
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800182 self.dbg_state = "connecting"
Dan Talayco21c75c72010-02-12 22:59:24 -0800183 self.dbg(DEBUG_INFO, "open ctl host: >" + str(self.host) + "< port " +
184 str(self.port))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800185 # Create the listening socket
Dan Talayco21c75c72010-02-12 22:59:24 -0800186 if not self.listen_socket:
187 self.listen_socket = socket.socket(socket.AF_INET,
188 socket.SOCK_STREAM)
189 self.listen_socket.setsockopt(socket.SOL_SOCKET,
190 socket.SO_REUSEADDR, 1)
191 self.listen_socket.bind((self.host, self.port))
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800192 self.dbg_state = "listening"
Dan Talayco21c75c72010-02-12 22:59:24 -0800193 self.listen_socket.listen(LISTEN_QUEUE_SIZE)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800194 self.dbg_state = "accepting"
Dan Talayco21c75c72010-02-12 22:59:24 -0800195 (self.switch_socket, self.switch_addr) = self.listen_socket.accept()
196 if not self.switch_socket:
197 self.socket_errors += 1
198 self.dbg(DEBUG_WARN, "Failed on accept")
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800199 self.dbg_state = "error"
Dan Talayco1b3f6902010-02-15 14:14:19 -0800200 self.listen_socket.close()
Dan Talayco21c75c72010-02-12 22:59:24 -0800201 return False
202
203 self.connected = True
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800204 self.dbg_state = "connected"
Dan Talayco21c75c72010-02-12 22:59:24 -0800205 self.dbg(DEBUG_INFO, "Got connection to " + str(self.switch_addr))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800206
207 if send_hello:
208 h = hello()
209 self.message_send(h.pack())
210
211 # Start the background thread
212 self.start()
213
Dan Talayco21c75c72010-02-12 22:59:24 -0800214 return True
215
Dan Talayco1b3f6902010-02-15 14:14:19 -0800216 def shutdown(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800217 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800218 Shutdown the controller closing all sockets
Dan Talayco21c75c72010-02-12 22:59:24 -0800219
Dan Talayco1b3f6902010-02-15 14:14:19 -0800220 @todo Might want to synchronize shutdown with self.sync...
221 """
222 self.connected = False
223 self.dbg_state = "closed"
224 if self.switch_socket:
225 try:
226 self.switch_socket.shutdown(socket.SHUT_RDWR)
227 except:
228 self.dbg(DEBUG_INFO, "Ignoring switch soc shutdown error")
229 self.switch_socket = None
230
231 if self.listen_socket:
232 try:
233 self.listen_socket.shutdown(socket.SHUT_RDWR)
234 except:
235 self.dbg(DEBUG_INFO, "Ignoring listen soc shutdown error")
236 self.listen_socket = None
237
Dan Talayco21c75c72010-02-12 22:59:24 -0800238 def _pkt_handler_check(self, pkt):
239 """
240 Check for packet handling before being enqueued
241
242 This includes checking for an ongoing transaction (see transact())
243 an echo request in case keep_alive is true, followed by
244 registered message handlers.
245 @param pkt The raw packet (string)
246 @return (handled, msg) where handled is a boolean indicating
247 the message was handled; msg if None is the parsed message
248 """
249 # Parse the header to get type
250 hdr = of_header_parse(pkt)
251 if not hdr:
252 self.dbg(DEBUG_INFO, "Could not parse header, pkt len", len(pkt))
253 self.parse_errors += 1
254 return (True, None)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800255 # if self.debug_level <= DEBUG_VERBOSE:
256 # hdr.show()
Dan Talayco21c75c72010-02-12 22:59:24 -0800257
258 self.dbg(DEBUG_VERBOSE, "message: len %d. type %s. hdr.len %d" %
259 (len(pkt), ofp_type_map[hdr.type], hdr.length))
260 msg = of_message_parse(pkt)
261 if not msg:
262 self.parse_errors += 1
263 self.dbg(DEBUG_WARN, "Could not parse message")
264 return (True, None)
265
266 # Check if transaction is waiting
267 self.xid_cv.acquire()
268 if self.xid:
269 if hdr.xid == self.xid:
270 self.xid_response = msg
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800271 self.xid_cv.notify()
Dan Talayco21c75c72010-02-12 22:59:24 -0800272 self.xid_cv.release()
273 return (True, None)
274 self.xid_cv.release()
275
276 # Check if keep alive is set; if so, respond to echo requests
277 if self.keep_alive:
278 if hdr.type == OFPT_ECHO_REQUEST:
279 rep = echo_reply()
280 rep.header.xid = hdr.xid
281 # Ignoring additional data
282 self.message_send(rep.pack())
283 return (True, None)
284
285 # Now check for message handlers; preference is given to
286 # handlers for a specific packet
287 handled = False
288 if hdr.type in self.handlers.keys():
289 handled = self.handlers[hdr.type](self, msg, pkt)
290 if not handled and ("all" in self.handlers.keys()):
291 handled = self.handlers["all"](self, msg, pkt)
292
293 return (handled, msg)
294
Dan Talayco34089522010-02-07 23:07:41 -0800295 def register(self, msg_type, handler):
296 """
297 Register a callback to receive a specific message type.
298
299 Only one handler may be registered for a given message type.
300 @param msg_type The type of message to receive. May be DEFAULT
Dan Talayco21c75c72010-02-12 22:59:24 -0800301 for all non-handled packets. The special type, the string "all"
302 will send all packets to the handler.
Dan Talayco34089522010-02-07 23:07:41 -0800303 @param handler The function to call when a message of the given
304 type is received.
305 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800306 # Should check type is valid
307 if not handler and msg_type in self.handlers.keys():
308 del self.handlers[msg_type]
309 return
310 self.handlers[msg_type] = handler
Dan Talayco34089522010-02-07 23:07:41 -0800311
Dan Talayco21c75c72010-02-12 22:59:24 -0800312 def poll(self, exp_msg=None, timeout=None):
Dan Talayco34089522010-02-07 23:07:41 -0800313 """
314 Wait for the next OF message received from the switch.
315
316 @param exp_msg If set, return only when this type of message
317 is received.
Dan Talayco1b3f6902010-02-15 14:14:19 -0800318 @param timeout Not yet supported
Dan Talayco34089522010-02-07 23:07:41 -0800319
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800320 @retval A pair (msg, pkt) where msg is a message object and pkt
321 the string representing the packet as received from the socket.
322 This allows additional parsing by the receiver if necessary.
323
Dan Talayco34089522010-02-07 23:07:41 -0800324 The data members in the message are in host endian order.
Dan Talayco21c75c72010-02-12 22:59:24 -0800325 If an error occurs, None is returned
Dan Talayco34089522010-02-07 23:07:41 -0800326 """
Dan Talayco34089522010-02-07 23:07:41 -0800327
Dan Talayco21c75c72010-02-12 22:59:24 -0800328 # For now do not support time out;
329 if timeout:
Dan Talayco1b3f6902010-02-15 14:14:19 -0800330 self.dbg(DEBUG_WARN, "Poll time out not supported")
Dan Talayco34089522010-02-07 23:07:41 -0800331
Dan Talayco21c75c72010-02-12 22:59:24 -0800332 while len(self.packets) > 0:
333 self.sync.acquire()
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800334 (msg, pkt) = self.packets.pop(0)
Dan Talayco21c75c72010-02-12 22:59:24 -0800335 self.sync.release()
336 if not exp_msg or (exp_msg and (msg.header.type == exp_msg)):
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800337 return msg, pkt
Dan Talayco21c75c72010-02-12 22:59:24 -0800338
Dan Talayco1b3f6902010-02-15 14:14:19 -0800339 return None, None
340
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800341 def transact(self, msg, timeout=None, zero_xid=False):
Dan Talayco34089522010-02-07 23:07:41 -0800342 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800343 Run a message transaction with the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800344
345 Send the message in msg and wait for a reply with a matching
Dan Talayco21c75c72010-02-12 22:59:24 -0800346 transaction id. Transactions have the highest priority in
347 received message handling.
Dan Talaycoe37999f2010-02-09 15:27:12 -0800348
Dan Talayco21c75c72010-02-12 22:59:24 -0800349 @param msg The message object to send; must not be a string
350 @param timeout The timeout in seconds (?)
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800351 @param zero_xid Normally, if the XID is 0 an XID will be generated
352 for the message. Set xero_xid to override this behavior
Dan Talayco21c75c72010-02-12 22:59:24 -0800353 @return The matching message object or None if unsuccessful
Dan Talaycoe37999f2010-02-09 15:27:12 -0800354
Dan Talayco34089522010-02-07 23:07:41 -0800355 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800356
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800357 if not zero_xid and msg.header.xid == 0:
358 msg.header.xid = gen_xid()
359
Dan Talayco21c75c72010-02-12 22:59:24 -0800360 self.xid_cv.acquire()
361 if self.xid:
362 self.xid_cv.release()
363 self.dbg(DEBUG_ERROR,
364 "Can only run one transaction at a time")
365 return None
366
367 self.xid = msg.header.xid
368 self.xid_response = None
369 self.message_send(msg.pack())
370 self.xid_cv.wait(timeout)
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800371 msg = self.xid_response
Dan Talayco21c75c72010-02-12 22:59:24 -0800372 self.xid_response = None
373 self.xid_cv.release()
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800374 return msg
Dan Talayco34089522010-02-07 23:07:41 -0800375
376 def message_send(self, msg):
377 """
378 Send the message to the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800379
380 @param msg An OpenFlow message object to be forwarded to the switch.
381
Dan Talayco21c75c72010-02-12 22:59:24 -0800382 @return None on success
Dan Talayco34089522010-02-07 23:07:41 -0800383
Dan Talayco21c75c72010-02-12 22:59:24 -0800384 """
385
Dan Talayco1b3f6902010-02-15 14:14:19 -0800386 if not self.switch_socket:
387 # Sending a string indicates the message is ready to go
388 self.dbg(DEBUG_INFO, "message_send: no socket")
389 return -1
390
Dan Talayco21c75c72010-02-12 22:59:24 -0800391 if type(msg) != type(""):
392 # Sending a string indicates the message is ready to go
393 self.dbg(DEBUG_INFO, "message_send requires packet as string")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800394 return -1
Dan Talayco21c75c72010-02-12 22:59:24 -0800395
396 self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(msg)))
397 return self.switch_socket.sendall(msg)
398
399 def __str__(self):
400 string = "Controller:\n"
401 string += " connected " + str(self.connected) + "\n"
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800402 string += " state " + self.dbg_state + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800403 string += " switch_addr " + str(self.switch_addr) + "\n"
404 string += " pending pkts " + str(len(self.packets)) + "\n"
405 string += " total pkts " + str(self.packets_total) + "\n"
406 string += " expired pkts " + str(self.packets_expired) + "\n"
407 string += " handled pkts " + str(self.packets_handled) + "\n"
408 string += " parse errors " + str(self.parse_errors) + "\n"
409 string += " sock errrors " + str(self.socket_errors) + "\n"
410 string += " max pkts " + str(self.max_pkts) + "\n"
411 string += " host " + str(self.host) + "\n"
412 string += " port " + str(self.port) + "\n"
413 string += " keep_alive " + str(self.keep_alive) + "\n"
414 return string
415
416 def show(self):
417 print str(self)
418
419def sample_handler(controller, msg, pkt):
420 """
421 Sample message handler
422
423 This is the prototype for functions registered with the controller
424 class for packet reception
425
426 @param controller The controller calling the handler
427 @param msg The parsed message object
428 @param pkt The raw packet that was received on the socket. This is
429 in case the packet contains extra unparsed data.
430 @returns Boolean value indicating if the packet was handled. If
431 not handled, the packet is placed in the queue for pollers to received
432 """
433 pass