blob: 4262ca990dae6e0dd629da345b989bdd3900ea91 [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
4Controller class
5
6Provide the interface to the control channel to the switch under test.
7
8Class inherits from thread so as to run in background allowing
9asynchronous callbacks (if needed, not required). Also supports
10polling.
11
12The controller thread maintains a queue. Incoming messages that
13are not handled by a callback function are placed in this queue for
14poll calls.
15
16Callbacks and polling support specifying the message type
17
18@todo Support transaction semantics via xid
Dan Talayco21c75c72010-02-12 22:59:24 -080019@todo Set up reasonable logging facility
Dan Talayco34089522010-02-07 23:07:41 -080020"""
21
Dan Talayco21c75c72010-02-12 22:59:24 -080022from oft_config import *
Dan Talayco34089522010-02-07 23:07:41 -080023import os
24import socket
25import time
Dan Talayco34089522010-02-07 23:07:41 -080026from threading import Thread
27from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080028from threading import Condition
Dan Talayco34089522010-02-07 23:07:41 -080029from message import *
Dan Talaycoe37999f2010-02-09 15:27:12 -080030from parse import *
31from netutils import *
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080032from ofutils import *
Dan Talayco34089522010-02-07 23:07:41 -080033
class Controller(Thread):
    """
    Class abstracting the control interface to the switch.

    Two mechanisms are provided for receiving messages.  First, the
    message queue can be queried with poll().  Second, a function can
    be registered (see register()) to be called by message type.  The
    callback is passed the controller, the parsed message object and
    the raw packet.

    One of the main purposes of this object is to translate between network
    and host byte order.  'Above' this object, things should be in host
    byte order.

    @todo Consider using SocketServer for listening socket
    @todo Test transaction code

    @var rcv_size The receive size to use for receive calls
    @var max_pkts The max size of the receive queue
    @var keep_alive If true, listen for echo requests and respond w/
    echo replies
    @var host The host to use for connect
    @var port The port to connect on
    @var packets_total Total number of packets placed in the queue
    @var packets_expired Number of packets popped from queue as queue full
    @var packets_handled Number of packets handled by something
    @var dbg_state Debug indication of state
    """

    def __init__(self, max_pkts=1024):
        """
        Set up controller state; no socket is opened here.

        @param max_pkts Maximum number of unhandled packets kept in the
        receive queue before the oldest one is dropped
        """
        Thread.__init__(self)
        self.rcv_size = RCV_SIZE_DEFAULT
        self.socket_errors = 0
        self.parse_errors = 0
        self.connected = False
        self.running = False
        self.max_pkts = max_pkts
        self.packets_total = 0
        # Queue of (msg, pkt) pairs not consumed by a handler; guarded
        # by self.sync
        self.packets = []
        self.sync = Lock()
        # Maps message type (or the string "all") to a callback
        self.handlers = {}
        self.keep_alive = False
        self.host = controller_host
        self.port = controller_port
        self.passive = True
        self.packets_expired = 0
        self.packets_handled = 0
        self.dbg_state = "init"
        self.listen_socket = None
        self.switch_socket = None
        self.switch_addr = None
        self.debug_level = DEBUG_VERBOSE
        # Transaction variables
        #   xid_cv: Condition variable (semaphore) for transaction
        #   xid: Transaction ID being waited on; None when idle
        #   xid_response: Transaction response message
        self.xid_cv = Condition()
        self.xid = None
        self.xid_response = None

        # Connection condition variable to notify anyone waiting for a cxn
        self.connect_cv = Condition()

    def dbg(self, level, string):
        """
        Log a message through debug_log, tagged as coming from "CTRL"

        @param level The debug level of this message
        @param string The text to log
        """
        debug_log("CTRL", self.debug_level, level, string)

    def connect(self):
        """
        Open the socket connection

        Uses self.host and self.port for the listening socket, creating
        and binding it on first use.  Only passive connections
        (self.passive is True) are supported: the call blocks in
        accept() until a switch connects.

        @return Boolean where True indicates success

        If already connected, will close the current connection
        """
        oldstate = self.dbg_state
        self.dbg_state = "connecting"
        if self.connected:
            self.dbg(DEBUG_WARN, "Disconnect when already connected")
            self.disconnect()
            # The previous state is stale now that the socket is closed
            oldstate = self.dbg_state

        if not self.passive:
            self.dbg(DEBUG_ERROR,
                     "Error in controller init: Active cxn not supported")
            self.dbg_state = oldstate
            return False

        self.dbg(DEBUG_INFO, "open ctl host: >" + str(self.host) +
                 "< port " + str(self.port))
        try:
            if not self.listen_socket:
                listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    listener.setsockopt(socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR, 1)
                    listener.bind((self.host, self.port))
                except socket.error:
                    # Do not keep (or leak) a socket that never got bound
                    listener.close()
                    raise
                self.listen_socket = listener
            self.dbg_state = "listening"
            self.listen_socket.listen(LISTEN_QUEUE_SIZE)
            (self.switch_socket, self.switch_addr) = \
                self.listen_socket.accept()
        except socket.error:
            # Report failure through the return value so that run() can
            # exit cleanly instead of the thread dying on the exception
            self.switch_socket = None

        if not self.switch_socket:
            self.socket_errors += 1
            self.dbg(DEBUG_WARN, "Failed on accept")
            self.dbg_state = "error"
            return False

        self.connected = True
        self.dbg_state = "connected"
        self.dbg(DEBUG_INFO, "Got connection to " + str(self.switch_addr))
        return True

    def disconnect(self):
        """
        Disconnect the switch socket

        The listening socket is left open so that connect() can accept
        a new connection later.
        """
        self.dbg_state = "disconnecting"
        if not self.connected:
            self.dbg(DEBUG_INFO, "disconnect when not connected")
            self.dbg_state = "disconnected"
            return
        self.switch_socket.close()
        self.connected = False
        self.dbg_state = "disconnected"

    def _pkt_handler_check(self, pkt):
        """
        Check for packet handling before being enqueued

        This includes checking for an ongoing transaction (see transact()),
        an echo request in case keep_alive is true, followed by
        registered message handlers.
        @param pkt The raw packet (string)
        @return (handled, msg) where handled is a boolean indicating
        the message was handled; msg is the parsed message, or None
        when the packet was consumed here or could not be parsed
        """
        # Parse the header to get type
        hdr = of_header_parse(pkt)
        if not hdr:
            self.dbg(DEBUG_INFO,
                     "Could not parse header, pkt len " + str(len(pkt)))
            self.parse_errors += 1
            # Reported as handled so the bad packet is not enqueued
            return (True, None)
        if self.debug_level <= DEBUG_VERBOSE:
            hdr.show()

        self.dbg(DEBUG_VERBOSE, "message: len %d. type %s. hdr.len %d" %
                 (len(pkt), ofp_type_map[hdr.type], hdr.length))
        msg = of_message_parse(pkt)
        if not msg:
            self.parse_errors += 1
            self.dbg(DEBUG_WARN, "Could not parse message")
            return (True, None)

        # Check if transaction is waiting.  Compare against None so that
        # a transaction using xid 0 (transact(zero_xid=True)) is matched.
        self.xid_cv.acquire()
        try:
            if self.xid is not None and hdr.xid == self.xid:
                self.xid_response = msg
                self.xid_cv.notify()
                return (True, None)
        finally:
            self.xid_cv.release()

        # Check if keep alive is set; if so, respond to echo requests
        if self.keep_alive and hdr.type == OFPT_ECHO_REQUEST:
            rep = echo_reply()
            rep.header.xid = hdr.xid
            # Ignoring additional data
            self.message_send(rep.pack())
            return (True, None)

        # Now check for message handlers; preference is given to
        # handlers for a specific packet
        handled = False
        if hdr.type in self.handlers:
            handled = self.handlers[hdr.type](self, msg, pkt)
        if not handled and "all" in self.handlers:
            handled = self.handlers["all"](self, msg, pkt)

        return (handled, msg)

    def run(self):
        """
        Activity function for class

        Loops until stop is called (or self.running is set to False).
        If the connection drops, it connects again.  It then receives
        a message on the socket and checks for handlers, queuing the
        packet if no one handles the packet.
        """
        self.running = True
        while self.running:
            if not self.connected:
                if not self.connect():
                    self.dbg(DEBUG_ERROR,
                             "Controller thread error connecting; exit")
                    break
                # Notify anyone waiting that a connection is made
                self.connect_cv.acquire()
                try:
                    self.connect_cv.notify()
                finally:
                    self.connect_cv.release()
            try:
                # NOTE(review): each recv() result is treated as exactly
                # one message; confirm the peer never coalesces or
                # splits messages on the stream.
                pkt = self.switch_socket.recv(self.rcv_size)
                if len(pkt) == 0:
                    # Zero length read: the peer closed the connection
                    self.dbg(DEBUG_WARN, "length 0 pkt in")
                    self.disconnect()
                    continue

                (handled, msg) = self._pkt_handler_check(pkt)
                if handled:
                    self.packets_handled += 1
                    continue

                # Not handled, enqueue; drop the oldest entry when full
                self.sync.acquire()
                try:
                    if len(self.packets) >= self.max_pkts:
                        self.packets.pop(0)
                        self.packets_expired += 1
                    self.packets.append((msg, pkt))
                    self.packets_total += 1
                finally:
                    self.sync.release()

            except socket.error:
                self.dbg(DEBUG_ERROR, "Controller socket read error")
                self.socket_errors += 1
                self.disconnect()

    def stop(self):
        """
        Stop the running loop and disconnect the socket
        """
        self.running = False
        # Is there something to do to switch_socket to stop an inprogress
        # connect?
        self.disconnect()

    def register(self, msg_type, handler):
        """
        Register a callback to receive a specific message type.

        Only one handler may be registered for a given message type;
        registering again replaces the previous handler.
        @param msg_type The type of message to receive.  The special
        type, the string "all", sends every packet not consumed by a
        type specific handler to the handler.
        @param handler The function to call when a message of the given
        type is received; see sample_handler for the prototype.  Pass
        None to remove the handler registered for msg_type, if any.
        """
        # Should check type is valid
        if not handler:
            # Never store a non-callable: the dispatch code calls
            # whatever it finds in the table.
            self.handlers.pop(msg_type, None)
            return
        self.handlers[msg_type] = handler

    def poll(self, exp_msg=None, timeout=None):
        """
        Return the next queued OF message received from the switch.

        This call does not block: only messages already in the queue
        are examined.

        @param exp_msg If not None, return only a message of this type.
        Queued messages of other types that are ahead of it are removed
        from the queue and dropped.
        @param timeout Not supported yet; a warning is logged if set

        @retval A pair (msg, pkt) where msg is a message object and pkt
        the string representing the packet as received from the socket.
        This allows additional parsing by the receiver if necessary.

        The data members in the message are in host endian order.
        If no (matching) message is queued, None is returned
        """
        # For now do not support time out;
        if timeout:
            self.dbg(DEBUG_WARN, "poll time out not supported")

        # Test and pop under the lock so a concurrent consumer cannot
        # empty the queue between the check and the pop.
        self.sync.acquire()
        try:
            while self.packets:
                (msg, pkt) = self.packets.pop(0)
                # Compare against None: message type 0 is a valid filter
                if exp_msg is None or msg.header.type == exp_msg:
                    return msg, pkt
        finally:
            self.sync.release()
        return None

    def transact(self, msg, timeout=None, zero_xid=False):
        """
        Run a message transaction with the switch

        Send the message in msg and wait for a reply with a matching
        transaction id.  Transactions have the highest priority in
        received message handling.  Only one transaction may be in
        progress at a time.

        @param msg The message object to send; must not be a string
        @param timeout The timeout in seconds; None waits indefinitely
        @param zero_xid Normally, if the XID is 0 an XID will be generated
        for the message.  Set zero_xid to override this behavior
        @return The matching message object or None if unsuccessful
        """
        if not zero_xid and msg.header.xid == 0:
            msg.header.xid = gen_xid()

        self.xid_cv.acquire()
        try:
            if self.xid is not None:
                self.dbg(DEBUG_ERROR,
                         "Can only run one transaction at a time")
                return None

            self.xid = msg.header.xid
            self.xid_response = None
            try:
                self.message_send(msg.pack())
                # wait() releases xid_cv so the receive thread can
                # deliver the response and notify us.
                self.xid_cv.wait(timeout)
                response = self.xid_response
            finally:
                # Always clear the pending xid; otherwise every later
                # transaction is rejected as "already in progress".
                self.xid = None
                self.xid_response = None
            return response
        finally:
            self.xid_cv.release()

    def message_send(self, msg):
        """
        Send the message to the switch

        @param msg The packed OpenFlow message, as a string, to be
        forwarded to the switch.  Message objects must be packed by the
        caller first.

        @return None on success; 0 if msg is not a string, in which
        case nothing is sent
        """
        if not isinstance(msg, str):
            # Sending a string indicates the message is ready to go
            self.dbg(DEBUG_INFO, "message_send requires packet as string")
            return 0

        self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(msg)))
        return self.switch_socket.sendall(msg)

    def __str__(self):
        """
        Return a multi-line summary of the controller state and counters
        """
        fields = [
            ("connected", self.connected),
            ("state", self.dbg_state),
            ("running", self.running),
            ("switch_addr", self.switch_addr),
            ("pending pkts", len(self.packets)),
            ("total pkts", self.packets_total),
            ("expired pkts", self.packets_expired),
            ("handled pkts", self.packets_handled),
            ("parse errors", self.parse_errors),
            ("sock errrors", self.socket_errors),
            ("max pkts", self.max_pkts),
            ("host", self.host),
            ("port", self.port),
            ("keep_alive", self.keep_alive),
        ]
        lines = ["Controller:\n"]
        lines.extend(" " + label + " " + str(value) + "\n"
                     for (label, value) in fields)
        return "".join(lines)

    def show(self):
        """
        Print the summary produced by __str__
        """
        print(str(self))

    def connect_to_switch(self, timeout=None, send_hello=True):
        """
        Connect to switch

        Block until a connection to a switch
        is established or a timeout occurs.  Sends a hello message
        once a connection that was waited for is established.  If the
        controller is already connected on entry, True is returned
        immediately and no hello is sent.
        @param timeout Timeout in seconds; None waits indefinitely
        @param send_hello If True, will send hello when connection established
        @return True if successful, False otherwise
        """
        self.connect_cv.acquire()
        try:
            if self.connected:
                return True
            self.connect_cv.wait(timeout)
        finally:
            self.connect_cv.release()

        if not self.connected:
            # Timed out: there is no switch socket to send a hello on
            return False
        if send_hello:
            self.message_send(hello().pack())
        return True
415
def sample_handler(controller, msg, pkt):
    """
    Sample message handler

    This is the prototype for functions registered with the controller
    class for packet reception

    @param controller The controller calling the handler
    @param msg The parsed message object
    @param pkt The raw packet that was received on the socket.  This is
    in case the packet contains extra unparsed data.
    @returns Boolean value indicating if the packet was handled.  If
    not handled, the packet is placed in the queue for pollers to receive.
    This sample never handles anything, so it always returns False.
    """
    # Return an explicit Boolean, as documented, rather than falling
    # through and returning None.
    return False