blob: 9e3354d7aca6c03e56a56047be9058ca718135aa [file] [log] [blame]
"""
OpenFlow Test Framework

Controller class

Provide the interface to the control channel to the switch under test.

The class inherits from Thread so that it runs in the background, allowing
asynchronous callbacks (if needed, not required). Polling is also
supported.

The controller thread maintains a queue. Incoming messages that
are not handled by a callback function are placed in this queue for
poll calls.

Callbacks and polling support specifying the message type.

@todo Support transaction semantics via xid
@todo Support select and listen on an administrative socket (or
use a timeout to support clean shutdown).

Currently only one connection is accepted during the life of
the controller. There seems to be no clean way to interrupt an accept
call. Using select that also listens on an administrative socket and
can shut down the socket might work.

"""
28
Dan Talayco34089522010-02-07 23:07:41 -080029import os
30import socket
31import time
Dan Talayco34089522010-02-07 23:07:41 -080032from threading import Thread
33from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080034from threading import Condition
Dan Talayco34089522010-02-07 23:07:41 -080035from message import *
Dan Talaycoe37999f2010-02-09 15:27:12 -080036from parse import *
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080037from ofutils import *
Dan Talayco710438c2010-02-18 15:16:07 -080038# For some reason, it seems select to be last (or later).
39# Otherwise get an attribute error when calling select.select
40import select
Dan Talayco48370102010-03-03 15:17:33 -080041import logging
42
Dan Talaycof8de5182012-04-12 22:38:41 -070043
# Lookup table for the printable column of hex_dump_buffer: entry x is chr(x)
# when repr(chr(x)) is just the quoted character (length 3), otherwise '.'.
FILTER = ''.join([chr(x) if len(repr(chr(x))) == 3 else '.'
                  for x in range(256)])


def hex_dump_buffer(src, length=16):
    """
    Convert src to a hex dump string and return the string

    Each output line holds the offset of its first byte (4 hex digits), the
    hex value of every byte in the line, and a printable rendering in which
    any character above 127 or without a plain printable form is shown
    as '.'. The returned string always starts with a newline.

    @param src The source buffer. May be a character string or a byte
    buffer; elements that are already integers are used as-is, anything
    else is converted with ord()
    @param length The number of bytes shown in each line (must be > 0)
    @returns A string showing the hex dump
    """
    def code_of(item):
        # Iterating bytes yields ints on Python 3 but 1-char strings on
        # Python 2; normalize both to an integer code.
        return item if isinstance(item, int) else ord(item)

    lines = ["\n"]
    for start in range(0, len(src), length):
        codes = [code_of(item) for item in src[start:start + length]]
        hex_part = ' '.join(["%02x" % code for code in codes])
        printable = ''.join([FILTER[code] if code <= 127 else '.'
                             for code in codes])
        # Pad the hex column to a fixed width so the text column lines up
        lines.append("%04x %-*s %s\n" %
                     (start, length * 3, hex_part, printable))
    return ''.join(lines)
62
##@todo Find a better home for these identifiers (controller)
# Default value of Controller.rcv_size, which is the buffer size passed to
# recv() when reading from the switch socket.
RCV_SIZE_DEFAULT = 32768
# Backlog argument passed to listen() on the controller's listen socket.
LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080066
class Controller(Thread):
    """
    Class abstracting the control interface to the switch.

    For receiving messages, two mechanisms are provided. First,
    query the interface with poll. Second, register to have a
    function called by message type. The callback is passed the
    controller, the parsed message object and the raw packet.

    One of the main purposes of this object is to translate between network
    and host byte order. 'Above' this object, things should be in host
    byte order.

    @todo Consider using SocketServer for listening socket
    @todo Test transaction code

    @var rcv_size The receive size to use for receive calls
    @var max_pkts The max size of the receive queue
    @var keep_alive If true, listen for echo requests and respond w/
    echo replies
    @var initial_hello If true, will send a hello message immediately
    upon connecting to the switch
    @var host The host to use for connect
    @var port The port to connect on
    @var packets_total Total number of packets placed in the queue
    @var packets_expired Number of packets popped from queue as queue full
    @var packets_handled Number of packets handled by a registered callback
    @var dbg_state Debug indication of state
    """

    # Size in bytes of the fixed OpenFlow message header (ofp_header in the
    # OpenFlow specification). Fewer bytes than this cannot be parsed yet.
    _OFP_HEADER_LEN = 8

    def __init__(self, host='127.0.0.1', port=6633, max_pkts=1024):
        """
        Set up controller state; no socket is opened until run()

        @param host The address the listen socket is bound to
        @param port The TCP port the listen socket is bound to
        @param max_pkts The maximum number of messages kept in the queue
        """
        Thread.__init__(self)
        # Socket related
        self.rcv_size = RCV_SIZE_DEFAULT
        self.listen_socket = None
        self.switch_socket = None
        self.switch_addr = None
        self.socs = []
        self.connect_cv = Condition()
        self.message_cv = Condition()

        # Counters
        self.socket_errors = 0
        self.parse_errors = 0
        self.packets_total = 0
        self.packets_expired = 0
        self.packets_handled = 0
        self.poll_discards = 0

        # State
        self.packets = []
        self.sync = Lock()
        self.handlers = {}
        self.keep_alive = False
        self.active = True
        self.initial_hello = True

        # Settings
        self.max_pkts = max_pkts
        self.passive = True
        self.host = host
        self.port = port
        self.dbg_state = "init"
        self.logger = logging.getLogger("controller")
        self.filter_packet_in = False  # Drop "excessive" packet ins
        self.pkt_in_run = 0  # Count on run of packet ins
        self.pkt_in_filter_limit = 50  # Max run length before dropping
        self.pkt_in_dropped = 0  # Total dropped packet ins
        self.transact_to = 15  # Transact timeout default value; add to config

        # Transaction and message type waiting variables
        #   xid_cv: Condition variable (semaphore) for packet waiters
        #   xid: Transaction ID being waited on
        #   xid_response: Transaction response message
        #   expect_msg: Is a message being waited on
        #   expect_msg_cv: Semaphore for waiters
        #   expect_msg_type: Type of message expected
        #   expect_msg_response: Result passed through here

        self.xid_cv = Condition()
        self.xid = None
        self.xid_response = None

        self.expect_msg = False
        self.expect_msg_cv = Condition()
        self.expect_msg_type = None
        self.expect_msg_response = None
        # Bytes received but not yet forming a complete message
        self.buffered_input = b""

    def filter_packet(self, rawmsg, hdr):
        """
        Check if packet should be filtered

        Currently filters packet in messages: once more than
        pkt_in_filter_limit packet ins have arrived in a row (and the
        last poll was not for packet ins), further ones are dropped.

        @param rawmsg The raw bytes of the message (currently unused)
        @param hdr The parsed header of the message
        @return Boolean, True if packet should be dropped
        """
        if not self.filter_packet_in:
            return False

        awaited = (self.expect_msg_type is not None and
                   self.expect_msg_type == hdr.type)
        if hdr.type == OFPT_PACKET_IN and not awaited:
            self.pkt_in_run += 1
            # Check if limit exceeded
            if self.pkt_in_run > self.pkt_in_filter_limit:
                self.pkt_in_dropped += 1
                return True
        else:
            # NOTE(review): the reviewed copy of this file had lost its
            # indentation; this branch is taken to pair with the packet-in
            # test above (any other message ends the run) -- confirm.
            # If we were dropping packets, report number dropped
            if self.pkt_in_run > self.pkt_in_filter_limit:
                self.logger.debug("Dropped %d packet ins (%d total)"
                                  % ((self.pkt_in_run -
                                      self.pkt_in_filter_limit),
                                     self.pkt_in_dropped))
            self.pkt_in_run = 0

        return False

    def _pkt_handle(self, pkt):
        """
        Check for all packet handling conditions

        For every complete OpenFlow message found in the data:
            Parse and verify message
            Check if XID matches something waiting
            Check if message is being expected for a poll operation
            Check if keep alive is on and message is an echo request
            Check if any registered handler wants the packet
            Enqueue if none of those conditions is met

        Bytes that do not yet form a complete message are kept in
        buffered_input and prepended to the next read.

        @param pkt The raw packet (string) which may contain multiple OF msgs
        """

        # snag any left over data from last read()
        pkt = self.buffered_input + pkt
        self.buffered_input = b""

        # Process each of the OF msgs inside the pkt
        offset = 0
        while offset < len(pkt):
            # A fragment shorter than a header cannot be parsed; keep it
            # until more data arrives instead of treating it as corrupt
            if len(pkt) - offset < self._OFP_HEADER_LEN:
                break

            # Parse the header to get type
            hdr = of_header_parse(pkt[offset:])
            if not hdr or hdr.length == 0:
                self.logger.error("Could not parse header")
                self.logger.error("pkt len %d." % len(pkt))
                if hdr:
                    self.logger.error("hdr len %d." % hdr.length)
                self.logger.error("%s" % hex_dump_buffer(pkt[:200]))
                self.kill()
                return

            # Extract the raw message bytes; wait if body is incomplete
            if (offset + hdr.length) > len(pkt):
                break
            rawmsg = pkt[offset:offset + hdr.length]
            offset += hdr.length

            if self.filter_packet(rawmsg, hdr):
                continue

            self.logger.debug("Msg in: buf len %d. hdr.type %s. hdr.len %d" %
                              (len(pkt), ofp_type_map[hdr.type], hdr.length))
            if hdr.version != OFP_VERSION:
                self.logger.error("Version %d does not match OFTest version %d"
                                  % (hdr.version, OFP_VERSION))
                print("Version %d does not match OFTest version %d" %
                      (hdr.version, OFP_VERSION))
                self.active = False
                self.switch_socket = None
                return

            msg = of_message_parse(rawmsg)
            if not msg:
                self.parse_errors += 1
                self.logger.warning("Could not parse message")
                continue

            self._dispatch(hdr, msg, rawmsg)

        # note that if offset = len(pkt), this
        # appends a harmless empty string
        self.buffered_input += pkt[offset:]

    def _dispatch(self, hdr, msg, rawmsg):
        """
        Route one parsed message to whoever should receive it

        The queue lock (self.sync) is held while the message is offered to
        a pending transaction, a pending poll, and the registered handlers,
        and is always released, even if a handler raises. An echo reply
        is sent only after the lock has been released.

        @param hdr The parsed header of the message
        @param msg The parsed message object
        @param rawmsg The raw bytes of the message
        """
        echo_requested = False
        self.sync.acquire()
        try:
            if self._claim_for_transaction(hdr, msg, rawmsg):
                return
            if self._claim_for_poller(hdr, msg, rawmsg):
                return
            # Check if keep alive is set; if so, respond to echo requests
            if self.keep_alive and hdr.type == OFPT_ECHO_REQUEST:
                echo_requested = True
            else:
                self._run_handlers(hdr, msg, rawmsg)
        finally:
            self.sync.release()

        if echo_requested:
            self.logger.debug("Responding to echo request")
            rep = echo_reply()
            rep.header.xid = hdr.xid
            # Ignoring additional data
            if self.message_send(rep.pack(), zero_xid=True) < 0:
                self.logger.error("Error sending echo reply")

    def _claim_for_transaction(self, hdr, msg, rawmsg):
        """
        Hand the message to a waiting transact() call if the XID matches

        @return Boolean, True if the message was consumed
        """
        with self.xid_cv:
            # A pending XID of 0 is never matched (same as a missing one)
            if self.xid and hdr.xid == self.xid:
                self.logger.debug("Matched expected XID " + str(hdr.xid))
                self.xid_response = (msg, rawmsg)
                self.xid = None
                self.xid_cv.notify()
                return True
        return False

    def _claim_for_poller(self, hdr, msg, rawmsg):
        """
        Hand the message to a blocked poll() call if the type matches

        @return Boolean, True if the message was consumed
        """
        with self.expect_msg_cv:
            if self.expect_msg and (self.expect_msg_type is None or
                                    self.expect_msg_type == hdr.type):
                self.logger.debug("Matched msg; type %s. expected %s " %
                                  (ofp_type_map[hdr.type],
                                   str(self.expect_msg_type)))
                self.expect_msg_response = (msg, rawmsg)
                self.expect_msg = False
                self.expect_msg_cv.notify()
                return True
        return False

    def _run_handlers(self, hdr, msg, rawmsg):
        """
        Offer the message to registered handlers; enqueue it if unhandled

        Preference is given to the handler for the specific message type,
        then to the "all" handler. Must be called with self.sync held.
        """
        handled = False
        if hdr.type in self.handlers:
            handled = self.handlers[hdr.type](self, msg, rawmsg)
        if not handled and "all" in self.handlers:
            handled = self.handlers["all"](self, msg, rawmsg)

        if handled:
            self.packets_handled += 1
            self.logger.debug("Message handled by callback")
            return

        # Not handled, enqueue (dropping the oldest entry if full)
        self.logger.debug("Enqueuing pkt type " + ofp_type_map[hdr.type])
        if len(self.packets) >= self.max_pkts:
            self.packets.pop(0)
            self.packets_expired += 1
        self.packets.append((msg, rawmsg))
        self.packets_total += 1

    def _socket_ready_handle(self, s):
        """
        Handle an input-ready socket

        A ready listen socket is accepted (unless a switch is already
        connected); a ready switch socket is read and the data passed to
        _pkt_handle.

        @param s The socket object that is ready
        @returns 0 on success, -1 on error
        """

        if s and s == self.listen_socket:
            if self.switch_socket:
                return 0  # Ignore listen socket while connected to switch

            (self.switch_socket, self.switch_addr) = \
                self.listen_socket.accept()
            self.logger.info("Got cxn to " + str(self.switch_addr))
            self.socs.append(self.switch_socket)
            # Notify anyone waiting
            with self.connect_cv:
                self.connect_cv.notify()
            if self.initial_hello:
                if self.message_send(hello()) < 0:
                    self.logger.error("Error sending initial hello")
        elif s and s == self.switch_socket:
            pkt = b""
            for idx in range(3):  # debug: try a couple of times
                try:
                    pkt = self.switch_socket.recv(self.rcv_size)
                except Exception:
                    self.socket_errors += 1
                    self.logger.warning("Error on switch read")
                    return -1

                if not self.active:
                    return 0

                if len(pkt) == 0:
                    self.logger.warning("Zero-length switch read, %d" % idx)
                else:
                    break

            if len(pkt) == 0:  # Still no packet
                self.logger.warning("Zero-length switch read; closing cxn")
                self.logger.info(str(self))
                return -1

            self._pkt_handle(pkt)
        else:
            self.logger.error("Unknown socket ready: " + str(s))
            return -1

        return 0

    def run(self):
        """
        Activity function for class

        Creates the listen socket, then loops on select (1 second timeout)
        over the listen socket and, once accepted, the switch socket until
        the controller is no longer active or an error occurs. The
        controller is shut down when the loop ends.

        When there is a message on the socket, check for handlers; queue the
        packet if no one handles the packet.

        See note for controller describing the limitation of a single
        connection for now.
        """

        self.dbg_state = "starting"

        # Create listen socket
        self.logger.info("Create/listen at " + self.host + ":" +
                         str(self.port))
        self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.listen_socket.setsockopt(socket.SOL_SOCKET,
                                      socket.SO_REUSEADDR, 1)
        self.listen_socket.bind((self.host, self.port))
        self.dbg_state = "listening"
        self.listen_socket.listen(LISTEN_QUEUE_SIZE)

        self.logger.info("Waiting for switch connection")
        self.socs = [self.listen_socket]
        self.dbg_state = "running"
        while self.active:
            try:
                sel_in, _, sel_err = \
                    select.select(self.socs, [], self.socs, 1)
            except Exception:
                # Logs the message together with the exception details
                self.logger.exception("Select error, exiting")
                self.active = False
                break

            if sel_err:
                self.logger.error("Got socket error on: " + str(sel_err[0]))
                self.active = False
                break

            for s in sel_in:
                if self._socket_ready_handle(s) == -1:
                    self.active = False
                    break

        # End of main loop
        self.dbg_state = "closing"
        self.logger.info("Exiting controller thread")
        self.shutdown()

    def connect(self, timeout=None):
        """
        Connect to the switch

        @param timeout If None, block until connected. If 0, return
        immediately. Otherwise, block for up to timeout seconds
        @return Boolean, True if connected
        """

        if timeout == 0:
            return self.switch_socket is not None

        # Test under the condition lock: the accepting thread sets
        # switch_socket before it notifies, so a connection made while
        # we get here can no longer be missed.
        with self.connect_cv:
            if self.switch_socket is None:
                self.connect_cv.wait(timeout)

        return self.switch_socket is not None

    def kill(self):
        """
        Force the controller thread to quit

        Just sets the active state variable to false and expects
        the select timeout to kick in
        """
        self.active = False

    def _close_socket(self, sock, label):
        """
        Shut down and close a socket, logging (not raising) socket errors

        @param sock The socket to close, or None for no-op
        @param label Short name of the socket used in log messages
        """
        if sock is None:
            return
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except socket.error:
            self.logger.info("Ignoring %s soc shutdown error" % label)
        try:
            # shutdown() alone does not release the file descriptor
            sock.close()
        except socket.error:
            self.logger.info("Ignoring %s soc close error" % label)

    def shutdown(self):
        """
        Shutdown the controller closing all sockets

        Also wakes every thread blocked in connect(), transact() or poll().

        @todo Might want to synchronize shutdown with self.sync...
        """

        self.active = False
        self._close_socket(self.switch_socket, "switch")
        self.switch_socket = None

        self._close_socket(self.listen_socket, "listen")
        self.listen_socket = None

        # Release condition variables on which callers may be waiting
        for cond in (self.xid_cv, self.connect_cv, self.expect_msg_cv):
            with cond:
                cond.notify_all()

        self.dbg_state = "down"

    def register(self, msg_type, handler):
        """
        Register a callback to receive a specific message type.

        Only one handler may be registered for a given message type.
        Passing a false handler (e.g. None) removes any handler registered
        for msg_type.

        WARNING: A lock is held during the handler call back, so
        the handler should not make any blocking calls

        @param msg_type The type of message to receive. The special
        type, the string "all", receives every message not handled by a
        type specific handler.
        @param handler The function to call when a message of the given
        type is received.
        """
        # Should check type is valid
        if not handler:
            # Never store a non-callable: it would be invoked on receive
            self.handlers.pop(msg_type, None)
            return
        self.handlers[msg_type] = handler

    def poll(self, exp_msg=None, timeout=None):
        """
        Wait for the next OF message received from the switch.

        @param exp_msg If set, return only when this type of message
        is received (unless timeout occurs).
        @param timeout If None or not positive, do not block. Otherwise,
        wait up to timeout seconds for a matching message.

        @retval A pair (msg, pkt) where msg is a message object and pkt
        the string representing the packet as received from the socket.
        This allows additional parsing by the receiver if necessary.

        The data members in the message are in host endian order.
        If no message is available, (None, None) is returned

        The current queue is searched for a message of the desired type
        before sleeping on message in events.
        """

        if exp_msg is None:
            self.logger.debug("Poll for any OF message")
        else:
            self.logger.debug("Poll for " + ofp_type_map[exp_msg])

        self.sync.acquire()
        try:
            # First check the current queue
            for idx, entry in enumerate(self.packets):
                if exp_msg is None or entry[0].header.type == exp_msg:
                    return self.packets.pop(idx)

            # Okay, not currently in the queue
            if timeout is None or timeout <= 0:
                return (None, None)

            self.logger.debug("Entering timeout")
            # Take the waiter lock before the queue lock is dropped so
            # that no message can slip by between the two steps
            self.expect_msg_cv.acquire()
            self.expect_msg_response = None
            self.expect_msg = True
            self.expect_msg_type = exp_msg
        finally:
            self.sync.release()

        try:
            self.expect_msg_cv.wait(timeout)
            response = self.expect_msg_response
            # Withdraw the request; otherwise a message arriving after a
            # timeout would be taken out of the stream and never delivered
            self.expect_msg = False
        finally:
            self.expect_msg_cv.release()

        if response is None:
            self.logger.debug("Poll time out")
            return (None, None)

        (msg, pkt) = response
        self.logger.debug("Got msg " + str(msg))
        return (msg, pkt)

    def transact(self, msg, timeout=-1, zero_xid=False):
        """
        Run a message transaction with the switch

        Send the message in msg and wait for a reply with a matching
        transaction id. Transactions have the highest priority in
        received message handling.

        @param msg The message object to send; must not be a string
        @param timeout The timeout in seconds; if -1 use default. if None
        blocks without time out
        @param zero_xid Normally, if the XID is 0 an XID will be generated
        for the message. Set zero_xid to override this behavior
        @return A pair (msg, pkt) holding the matching message object and
        its raw packet, or (None, None) if unsuccessful

        """

        if not zero_xid and msg.header.xid == 0:
            msg.header.xid = gen_xid()

        if timeout == -1:
            timeout = self.transact_to
        xid = msg.header.xid
        self.logger.debug("Running transaction %d" % xid)

        with self.xid_cv:
            if self.xid:
                self.logger.error("Can only run one transaction at a time")
                return (None, None)

            self.xid = xid
            self.xid_response = None
            if self.message_send(msg.pack()) < 0:
                self.logger.error("Error sending pkt for transaction %d" %
                                  xid)
                # Do not leave a dead transaction registered
                self.xid = None
                return (None, None)

            self.logger.debug("Waiting for transaction %d" % xid)
            self.xid_cv.wait(timeout)
            response = self.xid_response
            self.xid_response = None
            # Cleared on timeout too so later transactions can run
            self.xid = None

        if not response:
            self.logger.warning("No response for xid " + str(xid))
            return (None, None)
        return response

    def message_send(self, msg, zero_xid=False):
        """
        Send the message to the switch

        @param msg A string or OpenFlow message object to be forwarded to
        the switch.
        @param zero_xid If msg is an OpenFlow object (not a string) and if
        the XID in the header is 0, then an XID will be generated
        for the message. Set zero_xid to override this behavior (and keep an
        existing 0 xid)

        @return -1 if error, 0 on success

        """

        # Read once: shutdown() may clear the attribute concurrently
        sock = self.switch_socket
        if sock is None:
            self.logger.info("message_send: no socket")
            return -1

        if isinstance(msg, (str, bytes)):
            # Sending a string indicates the message is ready to go
            outpkt = msg
        else:
            try:
                if msg.header.xid == 0 and not zero_xid:
                    msg.header.xid = gen_xid()
                outpkt = msg.pack()
            except Exception:
                self.logger.error(
                    "message_send: not an OF message or string?")
                return -1

        self.logger.debug("Sending pkt of len " + str(len(outpkt)))
        try:
            # sendall returns None on success and raises on failure
            sock.sendall(outpkt)
        except socket.error:
            self.socket_errors += 1
            self.logger.error("Unknown error on sendall")
            return -1
        return 0

    def __str__(self):
        """
        Return a multi-line summary of the controller state and counters
        """
        fields = [
            (" state ", self.dbg_state),
            (" switch_addr ", self.switch_addr),
            (" pending pkts ", len(self.packets)),
            (" total pkts ", self.packets_total),
            (" expired pkts ", self.packets_expired),
            (" handled pkts ", self.packets_handled),
            (" poll discards ", self.poll_discards),
            (" parse errors ", self.parse_errors),
            (" sock errrors ", self.socket_errors),
            (" max pkts ", self.max_pkts),
            (" host ", self.host),
            (" port ", self.port),
            (" keep_alive ", self.keep_alive),
            (" pkt_in_run ", self.pkt_in_run),
            (" pkt_in_dropped ", self.pkt_in_dropped),
        ]
        return "Controller:\n" + "".join(
            [label + str(value) + "\n" for (label, value) in fields])

    def show(self):
        """
        Print the controller summary (see __str__) to standard output
        """
        print(str(self))
673
def sample_handler(controller, msg, pkt):
    """
    Prototype for message handlers registered with the controller

    Functions passed to Controller.register take these three arguments.
    This sample does nothing and returns None, which the controller
    treats as "not handled".

    @param controller The controller calling the handler
    @param msg The parsed message object
    @param pkt The raw packet that was received on the socket. This is
    in case the packet contains extra unparsed data.
    @returns Boolean value indicating if the packet was handled. If
    not handled, the packet is placed in the queue for pollers to receive
    """
    return None
688 pass