blob: 25e50809545e2d6281a5dd5e469738d9a321531e [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
4Controller class
5
6Provide the interface to the control channel to the switch under test.
7
8Class inherits from thread so as to run in background allowing
9asynchronous callbacks (if needed, not required). Also supports
10polling.
11
12The controller thread maintains a queue. Incoming messages that
13are not handled by a callback function are placed in this queue for
14poll calls.
15
16Callbacks and polling support specifying the message type
17
18@todo Support transaction semantics via xid
Dan Talayco21c75c72010-02-12 22:59:24 -080019@todo Set up reasonable logging facility
Dan Talayco34089522010-02-07 23:07:41 -080020"""
21
22import sys
Dan Talaycoe37999f2010-02-09 15:27:12 -080023sys.path.append("../protocol")
Dan Talayco21c75c72010-02-12 22:59:24 -080024sys.path.append("../")
25from oft_config import *
Dan Talayco34089522010-02-07 23:07:41 -080026import os
27import socket
28import time
Dan Talayco34089522010-02-07 23:07:41 -080029from threading import Thread
30from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080031from threading import Condition
Dan Talayco34089522010-02-07 23:07:41 -080032from message import *
Dan Talaycoe37999f2010-02-09 15:27:12 -080033from parse import *
34from netutils import *
Dan Talayco34089522010-02-07 23:07:41 -080035
36class Controller(Thread):
37 """
38 Class abstracting the control interface to the switch.
39
40 For receiving messages, two mechanism will be implemented. First,
41 query the interface with poll. Second, register to have a
42 function called by message type. The callback is passed the
43 message type as well as the raw packet (or message object)
44
45 One of the main purposes of this object is to translate between network
46 and host byte order. 'Above' this object, things should be in host
47 byte order.
Dan Talayco21c75c72010-02-12 22:59:24 -080048
49 @todo Consider using SocketServer for listening socket
50 @todo Test transaction code
51
52 @var rcv_size The receive size to use for receive calls
53 @var max_pkts The max size of the receive queue
54 @var keep_alive If true, listen for echo requests and respond w/
55 echo replies
56 @var host The host to use for connect
57 @var port The port to connect on
58 @var packets_total Total number of packets received
59 @var packets_expired Number of packets popped from queue as queue full
60 @var packets_handled Number of packets handled by something
61 @var state Debug indication of state
Dan Talayco34089522010-02-07 23:07:41 -080062 """
63
Dan Talayco21c75c72010-02-12 22:59:24 -080064 def __init__(self, max_pkts=1024):
65 Thread.__init__(self)
66 self.rcv_size = RCV_SIZE_DEFAULT
67 self.socket_errors = 0
68 self.parse_errors = 0
69 self.connected = False
70 self.running = False
71 self.max_pkts = max_pkts
72 self.packets_total = 0
73 self.packets = []
74 self.sync = Lock()
75 self.handlers = {}
76 self.keep_alive = False
77 self.host = controller_host
78 self.port = controller_port
79 self.passive = True
80 self.packets_expired = 0
81 self.packets_handled = 0
82 self.state = "init" # Debug
83 self.listen_socket = None
84 self.switch_socket = None
85 self.switch_addr = None
86 self.debug_level = DEBUG_VERBOSE
87 # Transaction variables
88 # xid_cv: Condition variable (semaphore) for transaction
89 # xid: Transaction ID being waited on
90 # xid_response: Transaction response message
91 self.xid_cv = Condition()
92 self.xid = None
93 self.xid_response = None
94
95 def dbg(self, level, string):
96 debug_log("CTRL", self.debug_level, level, string)
97
98 def connect(self):
99 """
100 Open the socket connection
101
102 @param host The host address to use for the socket
103 @param port The port number to use for the socket
104 @param passive If True, use passive cxn: Not yet supported.
105
106 @return Boolean where True indicates success
107
108 If already connected, will close the current connection
109 """
110 oldstate = self.state
111 self.state = "connecting"
112 if self.connected:
113 self.dbg(DEBUG_WARN, "Disconnect when already connected")
114 self.disconnect()
115
116 if not self.passive:
Dan Talayco34089522010-02-07 23:07:41 -0800117 print "Error in controller init: Active cxn not supported"
Dan Talayco21c75c72010-02-12 22:59:24 -0800118 # raise unsupported
119 self.state = oldstate
120 return False
121
122 # FIXME: add error handling; try SocketServer?
123 self.dbg(DEBUG_INFO, "open ctl host: >" + str(self.host) + "< port " +
124 str(self.port))
125 if not self.listen_socket:
126 self.listen_socket = socket.socket(socket.AF_INET,
127 socket.SOCK_STREAM)
128 self.listen_socket.setsockopt(socket.SOL_SOCKET,
129 socket.SO_REUSEADDR, 1)
130 self.listen_socket.bind((self.host, self.port))
131 self.state = "listening"
132 self.listen_socket.listen(LISTEN_QUEUE_SIZE)
133 (self.switch_socket, self.switch_addr) = self.listen_socket.accept()
134 if not self.switch_socket:
135 self.socket_errors += 1
136 self.dbg(DEBUG_WARN, "Failed on accept")
137 self.state = "error"
138 return False
139
140 self.connected = True
141 self.state = "connected"
142 self.dbg(DEBUG_INFO, "Got connection to " + str(self.switch_addr))
143 return True
144
145 def disconnect(self):
146 """
147 Disconnect the switch socket
148 """
149 self.state = "disconnected"
150 if not self.connected:
151 self.dbg(DEBUG_INFO, "disconnect when not connected")
152 return
153 self.switch_socket.close()
154 self.connected = False
155
156 def _pkt_handler_check(self, pkt):
157 """
158 Check for packet handling before being enqueued
159
160 This includes checking for an ongoing transaction (see transact())
161 an echo request in case keep_alive is true, followed by
162 registered message handlers.
163 @param pkt The raw packet (string)
164 @return (handled, msg) where handled is a boolean indicating
165 the message was handled; msg if None is the parsed message
166 """
167 # Parse the header to get type
168 hdr = of_header_parse(pkt)
169 if not hdr:
170 self.dbg(DEBUG_INFO, "Could not parse header, pkt len", len(pkt))
171 self.parse_errors += 1
172 return (True, None)
173 if self.debug_level <= DEBUG_VERBOSE:
174 hdr.show()
175
176 self.dbg(DEBUG_VERBOSE, "message: len %d. type %s. hdr.len %d" %
177 (len(pkt), ofp_type_map[hdr.type], hdr.length))
178 msg = of_message_parse(pkt)
179 if not msg:
180 self.parse_errors += 1
181 self.dbg(DEBUG_WARN, "Could not parse message")
182 return (True, None)
183
184 # Check if transaction is waiting
185 self.xid_cv.acquire()
186 if self.xid:
187 if hdr.xid == self.xid:
188 self.xid_response = msg
189 self.xid_cv.release()
190 return (True, None)
191 self.xid_cv.release()
192
193 # Check if keep alive is set; if so, respond to echo requests
194 if self.keep_alive:
195 if hdr.type == OFPT_ECHO_REQUEST:
196 rep = echo_reply()
197 rep.header.xid = hdr.xid
198 # Ignoring additional data
199 self.message_send(rep.pack())
200 return (True, None)
201
202 # Now check for message handlers; preference is given to
203 # handlers for a specific packet
204 handled = False
205 if hdr.type in self.handlers.keys():
206 handled = self.handlers[hdr.type](self, msg, pkt)
207 if not handled and ("all" in self.handlers.keys()):
208 handled = self.handlers["all"](self, msg, pkt)
209
210 return (handled, msg)
211
212 def run(self):
213 """
214 Activity function for class
215
216 Loops until stop is called (or self.running is set to False).
217 If the connection drops, it connects again. It then receives
218 a message on the socket and checks for handlers, queuing the
219 packet if no one handles the packet.
220 """
221 self.running = True
222 while self.running:
223 if not self.connected:
224 if not self.connect():
225 self.dbg(DEBUG_ERROR,
226 "Controller thread error connecting; exit")
227 break
228 try:
229 pkt = self.switch_socket.recv(self.rcv_size)
230 if len(pkt) == 0:
231 self.dbg(DEBUG_WARN, "length 0 pkt in")
232 self.disconnect()
233 continue
234
235 (handled, msg) = self._pkt_handler_check(pkt)
236 if handled:
237 self.packets_handled += 1
238 continue
239
240 # Not handled, enqueue
241 self.sync.acquire()
242 if len(self.packets) >= self.max_pkts:
243 self.packets.pop(0)
244 self.packets_expired += 1
245 self.packets.append((msg, data))
246 self.packets_total += 1
247 self.sync.release()
248
249 except socket.error:
250 print "Controller socket read error"
251 self.socket_errors += 1
252 self.disconnect()
253
254 def stop(self):
255 """
256 Stop the running loop and disconnect the socket
257 """
258 self.running = False
259 # Is there something to do to switch_socket to stop an inprogress
260 # connect?
261 self.disconnect()
Dan Talayco34089522010-02-07 23:07:41 -0800262
263 def register(self, msg_type, handler):
264 """
265 Register a callback to receive a specific message type.
266
267 Only one handler may be registered for a given message type.
268 @param msg_type The type of message to receive. May be DEFAULT
Dan Talayco21c75c72010-02-12 22:59:24 -0800269 for all non-handled packets. The special type, the string "all"
270 will send all packets to the handler.
Dan Talayco34089522010-02-07 23:07:41 -0800271 @param handler The function to call when a message of the given
272 type is received.
273 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800274 # Should check type is valid
275 if not handler and msg_type in self.handlers.keys():
276 del self.handlers[msg_type]
277 return
278 self.handlers[msg_type] = handler
Dan Talayco34089522010-02-07 23:07:41 -0800279
Dan Talayco21c75c72010-02-12 22:59:24 -0800280 def poll(self, exp_msg=None, timeout=None):
Dan Talayco34089522010-02-07 23:07:41 -0800281 """
282 Wait for the next OF message received from the switch.
283
284 @param exp_msg If set, return only when this type of message
285 is received.
286
Dan Talayco21c75c72010-02-12 22:59:24 -0800287 @retval A pair (msg, data) where msg is a message object and data is
Dan Talayco34089522010-02-07 23:07:41 -0800288 a string of any additional information following the
Dan Talayco21c75c72010-02-12 22:59:24 -0800289 parsed message.
Dan Talayco34089522010-02-07 23:07:41 -0800290 The data members in the message are in host endian order.
Dan Talayco21c75c72010-02-12 22:59:24 -0800291 If an error occurs, None is returned
Dan Talayco34089522010-02-07 23:07:41 -0800292 """
Dan Talayco34089522010-02-07 23:07:41 -0800293
Dan Talayco21c75c72010-02-12 22:59:24 -0800294 # For now do not support time out;
295 if timeout:
296 print "DEBUG WARNING: poll time out not supported"
Dan Talayco34089522010-02-07 23:07:41 -0800297
Dan Talayco21c75c72010-02-12 22:59:24 -0800298 while len(self.packets) > 0:
299 self.sync.acquire()
300 (msg, data) = self.packets.pop(0)
301 self.sync.release()
302 if not exp_msg or (exp_msg and (msg.header.type == exp_msg)):
303 return msg, data
304
305 def transact(self, msg, timeout=None):
Dan Talayco34089522010-02-07 23:07:41 -0800306 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800307 Run a message transaction with the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800308
309 Send the message in msg and wait for a reply with a matching
Dan Talayco21c75c72010-02-12 22:59:24 -0800310 transaction id. Transactions have the highest priority in
311 received message handling.
Dan Talaycoe37999f2010-02-09 15:27:12 -0800312
Dan Talayco21c75c72010-02-12 22:59:24 -0800313 @param msg The message object to send; must not be a string
314 @param timeout The timeout in seconds (?)
315 @return The matching message object or None if unsuccessful
Dan Talaycoe37999f2010-02-09 15:27:12 -0800316
Dan Talayco21c75c72010-02-12 22:59:24 -0800317 @todo Implement transact function for controller
Dan Talayco34089522010-02-07 23:07:41 -0800318 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800319
320 self.xid_cv.acquire()
321 if self.xid:
322 self.xid_cv.release()
323 self.dbg(DEBUG_ERROR,
324 "Can only run one transaction at a time")
325 return None
326
327 self.xid = msg.header.xid
328 self.xid_response = None
329 self.message_send(msg.pack())
330 self.xid_cv.wait(timeout)
331 (msg, data) = self.xid_response
332 self.xid_response = None
333 self.xid_cv.release()
334 return (msg, data)
Dan Talayco34089522010-02-07 23:07:41 -0800335
336 def message_send(self, msg):
337 """
338 Send the message to the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800339
340 @param msg An OpenFlow message object to be forwarded to the switch.
341
Dan Talayco21c75c72010-02-12 22:59:24 -0800342 @return None on success
Dan Talayco34089522010-02-07 23:07:41 -0800343
Dan Talayco21c75c72010-02-12 22:59:24 -0800344 """
345
346 if type(msg) != type(""):
347 # Sending a string indicates the message is ready to go
348 self.dbg(DEBUG_INFO, "message_send requires packet as string")
349 return 0
350
351 self.dbg(DEBUG_INFO, "Sending pkt of len " + str(len(msg)))
352 return self.switch_socket.sendall(msg)
353
354 def __str__(self):
355 string = "Controller:\n"
356 string += " connected " + str(self.connected) + "\n"
357 string += " state " + self.state + "\n"
358 string += " running " + str(self.running) + "\n"
359 string += " switch_addr " + str(self.switch_addr) + "\n"
360 string += " pending pkts " + str(len(self.packets)) + "\n"
361 string += " total pkts " + str(self.packets_total) + "\n"
362 string += " expired pkts " + str(self.packets_expired) + "\n"
363 string += " handled pkts " + str(self.packets_handled) + "\n"
364 string += " parse errors " + str(self.parse_errors) + "\n"
365 string += " sock errrors " + str(self.socket_errors) + "\n"
366 string += " max pkts " + str(self.max_pkts) + "\n"
367 string += " host " + str(self.host) + "\n"
368 string += " port " + str(self.port) + "\n"
369 string += " keep_alive " + str(self.keep_alive) + "\n"
370 return string
371
372 def show(self):
373 print str(self)
374
375def sample_handler(controller, msg, pkt):
376 """
377 Sample message handler
378
379 This is the prototype for functions registered with the controller
380 class for packet reception
381
382 @param controller The controller calling the handler
383 @param msg The parsed message object
384 @param pkt The raw packet that was received on the socket. This is
385 in case the packet contains extra unparsed data.
386 @returns Boolean value indicating if the packet was handled. If
387 not handled, the packet is placed in the queue for pollers to received
388 """
389 pass