blob: 2ae2bd7aea9bdf35c03f3bc160085dfb10f6186b [file] [log] [blame]
"""
OpenFlow Test Framework

Controller class

Provide the interface to the control channel to the switch under test.

Class inherits from thread so as to run in background allowing
asynchronous callbacks (if needed, not required). Also supports
polling.

The controller thread maintains a queue. Incoming messages that
are not handled by a callback function are placed in this queue for
poll calls.

Callbacks and polling support specifying the message type.

@todo Support transaction semantics via xid
@todo Support select and listen on an administrative socket (or
use a timeout to support clean shutdown).

Currently only one connection is accepted during the life of
the controller. There seems to be no clean way to interrupt an
accept call. Using select that also listens on an administrative
socket and can shut down the socket might work.

"""
28
Dan Talayco34089522010-02-07 23:07:41 -080029import os
30import socket
31import time
Dan Talayco34089522010-02-07 23:07:41 -080032from threading import Thread
33from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080034from threading import Condition
Dan Talayco34089522010-02-07 23:07:41 -080035from message import *
Dan Talaycoe37999f2010-02-09 15:27:12 -080036from parse import *
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080037from ofutils import *
# For some reason, select seems to need to be imported last (or later).
# Otherwise we get an attribute error when calling select.select
40import select
Dan Talayco48370102010-03-03 15:17:33 -080041import logging
42
Dan Talaycof8de5182012-04-12 22:38:41 -070043
# Lookup table indexed by character code (0..255): a character whose repr()
# is exactly three characters long (quote, char, quote) is kept as-is, any
# other character is replaced by '.'.
FILTER = ''.join([chr(x) if len(repr(chr(x))) == 3 else '.'
                  for x in range(256)])


def hex_dump_buffer(src, length=16):
    """
    Convert src to a hex dump string and return the string

    Each output line holds the offset of its first byte (hex), the hex
    value of every byte in the line and a printable rendering in which
    non-printable and non-ASCII bytes are shown as '.'.

    @param src The source buffer; a string, or a bytes/bytearray object
    whose items are integers
    @param length The number of bytes shown in each line
    @returns A string showing the hex dump; always starts with a newline
    """
    result = ["\n"]
    for i in range(0, len(src), length):
        # Items are one-character strings for text buffers and integers
        # for bytes/bytearray buffers; normalize both to integer codes.
        codes = [c if isinstance(c, int) else ord(c)
                 for c in src[i:i + length]]
        hex_part = ' '.join(["%02x" % c for c in codes])
        printable = ''.join([FILTER[c] if c <= 127 else '.' for c in codes])
        result.append("%04x %-*s %s\n" % (i, length * 3, hex_part, printable))
    return ''.join(result)
62
##@todo Find a better home for these identifiers (controller)
# Default number of bytes requested per recv() call on the switch socket
RCV_SIZE_DEFAULT = 32 * 1024
# Backlog value handed to listen() on the controller's listening socket
LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080066
class Controller(Thread):
    """
    Class abstracting the control interface to the switch.

    Received messages can be consumed in two ways.  First, by querying
    the controller with poll().  Second, by registering a function to be
    called by message type with register().  The callback is passed the
    controller, the parsed message object and the raw packet.

    One of the main purposes of this object is to translate between network
    and host byte order. 'Above' this object, things should be in host
    byte order.

    @todo Consider using SocketServer for listening socket
    @todo Test transaction code

    @var rcv_size The receive size to use for receive calls
    @var max_pkts The max size of the receive queue
    @var keep_alive If true, listen for echo requests and respond w/
    echo replies
    @var initial_hello If true, will send a hello message immediately
    upon connecting to the switch
    @var switch If not None, do an active connection to the switch
    @var host The host to use for connect
    @var port The port to connect on
    @var packets_total Total number of packets received
    @var packets_expired Number of packets popped from queue as queue full
    @var packets_handled Number of packets handled by something
    @var dbg_state Debug indication of state
    """

    # Size in bytes of the fixed OpenFlow header (version, type, length,
    # xid).  No header, and therefore no message, can be parsed from less.
    _OFP_HEADER_LEN = 8

    def __init__(self, switch=None, host='127.0.0.1', port=6633, max_pkts=1024):
        """
        @param switch Address of the switch for an active connection; if
        None the controller listens for the switch instead (passive mode)
        @param host The local address to listen on in passive mode
        @param port The port to listen on, or to connect to on the switch
        @param max_pkts The max size of the receive queue
        """
        Thread.__init__(self)
        # Socket related
        self.rcv_size = RCV_SIZE_DEFAULT
        self.listen_socket = None
        self.switch_socket = None
        self.switch_addr = None
        self.socs = []
        self.connect_cv = Condition()
        self.message_cv = Condition()

        # Counters
        self.socket_errors = 0
        self.parse_errors = 0
        self.packets_total = 0
        self.packets_expired = 0
        self.packets_handled = 0
        self.poll_discards = 0

        # State
        self.sync = Lock()
        self.handlers = {}
        self.keep_alive = False
        self.active = True
        self.initial_hello = True

        # OpenFlow message/packet queue
        # Protected by the packets_cv lock / condition variable
        self.packets = []
        self.packets_cv = Condition()

        # Settings
        self.max_pkts = max_pkts
        self.switch = switch
        self.passive = not self.switch
        self.host = host
        self.port = port
        self.dbg_state = "init"
        self.logger = logging.getLogger("controller")
        self.filter_packet_in = False  # Drop "excessive" packet ins
        self.pkt_in_run = 0  # Count on run of packet ins
        self.pkt_in_filter_limit = 50  # Count on run of packet ins
        self.pkt_in_dropped = 0  # Total dropped packet ins
        self.transact_to = 15  # Transact timeout default value; add to config

        # Transaction and message type waiting variables
        #   xid_cv: Condition variable (semaphore) for packet waiters
        #   xid: Transaction ID being waited on
        #   xid_response: Transaction response message
        self.xid_cv = Condition()
        self.xid = None
        self.xid_response = None

        # Bytes of an incomplete message carried over to the next read
        self.buffered_input = ""

    def filter_packet(self, rawmsg, hdr):
        """
        Check if packet should be filtered

        Currently filters packet in messages.  As implemented no packet
        is ever dropped: the method only reports and resets the packet-in
        run counter when filtering is enabled.

        @param rawmsg The raw message bytes (currently unused)
        @param hdr The parsed message header (currently unused)
        @return Boolean, True if packet should be dropped
        """
        # Add check for packet in and rate limit
        if self.filter_packet_in:
            # If we were dropping packets, report number dropped
            # TODO dont drop expected packet ins
            if self.pkt_in_run > self.pkt_in_filter_limit:
                self.logger.debug("Dropped %d packet ins (%d total)"
                                  % ((self.pkt_in_run -
                                      self.pkt_in_filter_limit),
                                     self.pkt_in_dropped))
            self.pkt_in_run = 0

        return False

    def _pkt_handle(self, pkt):
        """
        Check for all packet handling conditions

        Parse and verify message
        Check if XID matches something waiting
        Check if keep alive is on and message is an echo request
        Check if any registered handler wants the packet
        Enqueue if none of those conditions is met

        Bytes that do not yet form a complete message are kept in
        buffered_input and prepended to the data of the next call.

        @param pkt The raw packet (string) which may contain multiple OF msgs
        """

        # snag any left over data from last read()
        pkt = self.buffered_input + pkt
        self.buffered_input = ""

        # Process each of the OF msgs inside the pkt
        offset = 0
        while offset < len(pkt):
            # A read may end in the middle of a header; wait for the rest
            # instead of treating the fragment as a malformed message.
            if len(pkt) - offset < self._OFP_HEADER_LEN:
                break

            # Parse the header to get type
            hdr = of_header_parse(pkt[offset:])
            if not hdr or hdr.length == 0:
                self.logger.error("Could not parse header")
                self.logger.error("pkt len %d." % len(pkt))
                if hdr:
                    self.logger.error("hdr len %d." % hdr.length)
                self.logger.error("%s" % hex_dump_buffer(pkt[:200]))
                self.kill()
                return

            # Extract the raw message bytes; an incomplete message is left
            # in the buffer until more data arrives
            if (offset + hdr.length) > len(pkt):
                break
            rawmsg = pkt[offset:offset + hdr.length]
            offset += hdr.length

            if self.filter_packet(rawmsg, hdr):
                continue

            self.logger.debug("Msg in: buf len %d. hdr.type %s. hdr.len %d" %
                              (len(pkt), ofp_type_map[hdr.type], hdr.length))
            if hdr.version != OFP_VERSION:
                self.logger.error("Version %d does not match OFTest version %d"
                                  % (hdr.version, OFP_VERSION))
                print("Version %d does not match OFTest version %d" %
                      (hdr.version, OFP_VERSION))
                self.disconnect()
                return

            msg = of_message_parse(rawmsg)
            if not msg:
                self.parse_errors += 1
                self.logger.warning("Could not parse message")
                continue

            with self.sync:
                # Check if transaction is waiting
                with self.xid_cv:
                    if self.xid and hdr.xid == self.xid:
                        self.logger.debug("Matched expected XID " + str(hdr.xid))
                        self.xid_response = (msg, rawmsg)
                        self.xid = None
                        self.xid_cv.notify()
                        continue

                # Check if keep alive is set; if so, respond to echo requests
                if self.keep_alive:
                    if hdr.type == OFPT_ECHO_REQUEST:
                        self.logger.debug("Responding to echo request")
                        rep = echo_reply()
                        rep.header.xid = hdr.xid
                        # Ignoring additional data
                        if self.message_send(rep.pack(), zero_xid=True) < 0:
                            self.logger.error("Error sending echo reply")
                        continue

                # Now check for message handlers; preference is given to
                # handlers for a specific packet
                handled = False
                if hdr.type in self.handlers:
                    handled = self.handlers[hdr.type](self, msg, rawmsg)
                if not handled and ("all" in self.handlers):
                    handled = self.handlers["all"](self, msg, rawmsg)

                if not handled:  # Not handled, enqueue
                    self.logger.debug("Enqueuing pkt type " + ofp_type_map[hdr.type])
                    with self.packets_cv:
                        if len(self.packets) >= self.max_pkts:
                            # Queue full: expire the oldest entry
                            self.packets.pop(0)
                            self.packets_expired += 1
                        self.packets.append((msg, rawmsg))
                        self.packets_cv.notify_all()
                    self.packets_total += 1
                else:
                    self.packets_handled += 1
                    self.logger.debug("Message handled by callback")

        # end of 'while offset < len(pkt)'
        #   note that if offset = len(pkt), this is
        #   appends a harmless empty string
        self.buffered_input += pkt[offset:]

    def _accept_switch(self):
        """
        Accept a pending connection on the listen socket

        If a switch is already connected the new connection is closed
        immediately; otherwise it becomes the switch connection.

        @returns 0 on success, -1 on error
        """
        if self.switch_socket:
            self.logger.warning("Ignoring incoming connection; already connected to switch")
            try:
                (sock, addr) = self.listen_socket.accept()
                sock.close()
            except Exception:
                # The established switch connection is unaffected, so this
                # is counted but not reported as an error to the caller.
                self.socket_errors += 1
                self.logger.warning("Error on listen socket accept")
            return 0

        try:
            (sock, addr) = self.listen_socket.accept()
        except Exception:
            self.socket_errors += 1
            self.logger.warning("Error on listen socket accept")
            return -1
        self.socs.append(sock)
        self.logger.info(self.host+":"+str(self.port)+": Incoming connection from "+str(addr))

        with self.connect_cv:
            (self.switch_socket, self.switch_addr) = (sock, addr)
            self.switch_socket.setsockopt(socket.IPPROTO_TCP,
                                          socket.TCP_NODELAY, True)
            if self.initial_hello:
                self.message_send(hello())
            self.connect_cv.notify()  # Notify anyone waiting
        return 0

    def _read_switch(self):
        """
        Read from the switch socket and process the received data

        @returns 0 on success, -1 on read error or closed connection
        """
        pkt = ""
        for idx in range(3):  # debug: try a couple of times
            try:
                pkt = self.switch_socket.recv(self.rcv_size)
            except Exception:
                self.socket_errors += 1
                self.logger.warning("Error on switch read")
                return -1

            if not self.active:
                return 0

            if len(pkt) == 0:
                self.logger.warning("Zero-length switch read, %d" % idx)
            else:
                break

        if len(pkt) == 0:  # Still no packet
            self.logger.warning("Zero-length switch read; closing cxn")
            self.logger.info(str(self))
            return -1

        self._pkt_handle(pkt)
        return 0

    def _socket_ready_handle(self, s):
        """
        Handle an input-ready socket

        @param s The socket object that is ready
        @returns 0 on success, -1 on error
        """
        if self.passive and s and s == self.listen_socket:
            return self._accept_switch()
        if s and s == self.switch_socket:
            return self._read_switch()

        self.logger.error("Unknown socket ready: " + str(s))
        return -1

    def active_connect(self):
        """
        Actively connect to a switch IP addr

        @returns The connected socket, or None if the connection failed
        """
        soc = None
        try:
            self.logger.info("Trying active connection to %s" % self.switch)
            soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            soc.connect((self.switch, self.port))
            self.logger.info("Connected to " + self.switch + " on " +
                             str(self.port))
            soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
            self.switch_addr = (self.switch, self.port)
            return soc
        except Exception as e:
            self.logger.error("Could not connect to %s at %d:: %s" %
                              (self.switch, self.port, str(e)))
            # Do not leak the descriptor of a socket that never connected
            if soc is not None:
                try:
                    soc.close()
                except Exception:
                    self.logger.debug("Ignoring error closing unconnected socket")
            return None

    def run(self):
        """
        Activity function for class

        In passive mode, first creates the listen socket.  Then waits on
        the controller's sockets until the controller is deactivated,
        accepting the switch connection and reading messages from it.

        When there is a message on the socket, check for handlers; queue the
        packet if no one handles the packet.

        See note for controller describing the limitation of a single
        connection for now.
        """

        self.dbg_state = "starting"

        # Create listen socket
        if self.passive:
            self.logger.info("Create/listen at " + self.host + ":" +
                             str(self.port))
            self.listen_socket = socket.socket(socket.AF_INET,
                                               socket.SOCK_STREAM)
            self.listen_socket.setsockopt(socket.SOL_SOCKET,
                                          socket.SO_REUSEADDR, 1)
            self.listen_socket.bind((self.host, self.port))
            self.dbg_state = "listening"
            self.listen_socket.listen(LISTEN_QUEUE_SIZE)

            self.logger.info("Listening for switch connection")
            self.socs = [self.listen_socket]
            self.dbg_state = "running"

        while self.active:
            try:
                sel_in, sel_out, sel_err = \
                    select.select(self.socs, [], self.socs, 1)
            except Exception:
                self.socket_errors += 1
                self.logger.exception("Select error, disconnecting")
                self.disconnect()
                # The ready lists are not valid for this iteration
                continue

            for s in sel_err:
                self.logger.error("Got socket error on: " + str(s) + ", disconnecting")
                self.disconnect()

            for s in sel_in:
                if self._socket_ready_handle(s) == -1:
                    self.disconnect()

        # End of main loop
        self.dbg_state = "closing"
        self.logger.info("Exiting controller thread")
        self.shutdown()

    def connect(self, timeout=-1):
        """
        Connect to the switch

        In active mode the connection is attempted immediately; on
        failure the controller is deactivated.  In passive mode, waits
        for the switch to connect to the listen socket.

        @param timeout Block for up to timeout seconds. Pass -1 for the default.
        @return Boolean, True if connected
        """

        if not self.passive:  # Do active connection now
            self.logger.info("Attempting to connect to %s on port %s" %
                             (self.switch, str(self.port)))
            soc = self.active_connect()
            if soc:
                self.logger.info("Connected to %s", self.switch)
                self.socs = [soc]
                self.dbg_state = "running"
                self.switch_socket = soc
                with self.connect_cv:
                    if self.initial_hello:
                        self.message_send(hello())
                    self.connect_cv.notify()  # Notify anyone waiting
            else:
                self.logger.error("Could not actively connect to switch %s",
                                  self.switch)
                self.active = False
        else:
            with self.connect_cv:
                timed_wait(self.connect_cv, lambda: self.switch_socket,
                           timeout=timeout)

        return self.switch_socket is not None

    def disconnect(self, timeout=-1):
        """
        If connected to a switch, disconnect.

        Closes the switch socket, discards all queued messages and wakes
        anyone waiting on the connection state.

        @param timeout Unused; kept for interface compatibility
        """
        sock = self.switch_socket
        if sock:
            # Tolerate the socket having already been dropped from the
            # select list rather than failing with ValueError
            if sock in self.socs:
                self.socs.remove(sock)
            sock.close()
            self.switch_socket = None
            self.switch_addr = None
            with self.packets_cv:
                self.packets = []
            with self.connect_cv:
                self.connect_cv.notify_all()

    def wait_disconnected(self, timeout=-1):
        """
        Wait until no switch is connected

        @param timeout Block for up to timeout seconds. Pass -1 for the default.
        @return Boolean, True if disconnected
        """

        with self.connect_cv:
            timed_wait(self.connect_cv,
                       lambda: True if not self.switch_socket else None,
                       timeout=timeout)
        return self.switch_socket is None

    def kill(self):
        """
        Force the controller thread to quit

        Just sets the active state variable to false and expects
        the select timeout to kick in
        """
        self.active = False

    def _close_socket(self, sock, label):
        """
        Shut down and close one socket, ignoring (but logging) errors

        @param sock The socket to close; may be None
        @param label Name used in the log message ("switch" or "listen")
        """
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            self.logger.info("Ignoring " + label + " soc shutdown error")
        if sock is not None:
            # shutdown() alone does not release the descriptor
            try:
                sock.close()
            except Exception:
                self.logger.info("Ignoring " + label + " soc close error")

    def shutdown(self):
        """
        Shutdown the controller closing all sockets

        @todo Might want to synchronize shutdown with self.sync...
        """

        self.active = False
        self._close_socket(self.switch_socket, "switch")
        self.switch_socket = None

        self._close_socket(self.listen_socket, "listen")
        self.listen_socket = None

        # Wakeup condition variables on which controller may be wait
        with self.xid_cv:
            self.xid_cv.notify_all()

        with self.connect_cv:
            self.connect_cv.notify_all()

        self.dbg_state = "down"

    def register(self, msg_type, handler):
        """
        Register a callback to receive a specific message type.

        Only one handler may be registered for a given message type.
        Passing a false handler (e.g. None) removes any handler
        registered for msg_type.

        WARNING:  A lock is held during the handler call back, so
        the handler should not make any blocking calls

        @param msg_type The type of message to receive.  May be DEFAULT
        for all non-handled packets.  The special type, the string "all"
        will send all packets to the handler.
        @param handler The function to call when a message of the given
        type is received.
        """
        # Should check type is valid
        if not handler:
            # Never store a non-callable: the receive path calls whatever
            # is registered for a type.
            self.handlers.pop(msg_type, None)
            return
        self.handlers[msg_type] = handler

    def poll(self, exp_msg=None, timeout=-1):
        """
        Wait for the next OF message received from the switch.

        @param exp_msg If set, return only when this type of message
        is received (unless timeout occurs).

        @param timeout Maximum number of seconds to wait for the message.
        Pass -1 for the default timeout.

        @retval A pair (msg, pkt) where msg is a message object and pkt
        the string representing the packet as received from the socket.
        This allows additional parsing by the receiver if necessary.

        The data members in the message are in host endian order.
        If an error occurs, (None, None) is returned
        """

        if exp_msg is not None:
            self.logger.debug("Poll for %s" % ofp_type_map[exp_msg])
        else:
            self.logger.debug("Poll for any OF message")

        # Take the packet from the queue
        def grab():
            if len(self.packets) > 0:
                if exp_msg is None:
                    self.logger.debug("Looking for any packet")
                    return self.packets.pop(0)

                self.logger.debug("Looking for %s" % ofp_type_map[exp_msg])
                for i, entry in enumerate(self.packets):
                    msg = entry[0]
                    self.logger.debug("Checking packets[%d] (%s)" % (i, ofp_type_map[msg.header.type]))
                    if msg.header.type == exp_msg:
                        return self.packets.pop(i)
            # Not found
            self.logger.debug("Packet not in queue")
            return None

        with self.packets_cv:
            ret = timed_wait(self.packets_cv, grab, timeout=timeout)

        if ret is None:
            return (None, None)

        (msg, pkt) = ret
        self.logger.debug("Got message %s" % str(msg))
        return (msg, pkt)

    def transact(self, msg, timeout=-1, zero_xid=False):
        """
        Run a message transaction with the switch

        Send the message in msg and wait for a reply with a matching
        transaction id.  Transactions have the highest priority in
        received message handling.  Only one transaction may be in
        progress at a time.

        @param msg The message object to send; must not be a string
        @param timeout The timeout in seconds; if -1 use default.
        @param zero_xid Normally, if the XID is 0 an XID will be generated
        for the message.  Set zero_xid to override this behavior
        @return A pair (msg, pkt) with the matching message object and its
        raw packet, or (None, None) if unsuccessful
        """

        if not zero_xid and msg.header.xid == 0:
            msg.header.xid = gen_xid()

        xid = msg.header.xid
        self.logger.debug("Running transaction %d" % xid)

        response = None
        with self.xid_cv:
            if self.xid:
                self.logger.error("Can only run one transaction at a time")
                return (None, None)

            self.xid = xid
            self.xid_response = None
            try:
                if self.message_send(msg.pack()) < 0:
                    self.logger.error("Error sending pkt for transaction %d" %
                                      xid)
                    return (None, None)

                self.logger.debug("Waiting for transaction %d" % xid)
                timed_wait(self.xid_cv, lambda: self.xid_response,
                           timeout=timeout)

                response = self.xid_response
                self.xid_response = None
            finally:
                # Always release the transaction slot; otherwise a send
                # failure or timeout would block every later transaction.
                self.xid = None

        if response:
            (resp, pkt) = response
        else:
            (resp, pkt) = (None, None)

        if resp is None:
            self.logger.warning("No response for xid " + str(xid))
        return (resp, pkt)

    def message_send(self, msg, zero_xid=False):
        """
        Send the message to the switch

        @param msg A string or OpenFlow message object to be forwarded to
        the switch.  A string is sent as is; a message object is packed
        first.
        @param zero_xid If msg is an OpenFlow object (not a string) and if
        the XID in the header is 0, then an XID will be generated
        for the message.  Set zero_xid to override this behavior (and keep an
        existing 0 xid)

        @return 0 on success; raises Exception if there is no switch
        socket or the send fails
        """

        if not self.switch_socket:
            raise Exception("no socket")

        # Sending a string indicates the message is ready to go
        if isinstance(msg, (bytes, str)):
            outpkt = msg
        else:
            if msg.header.xid == 0 and not zero_xid:
                msg.header.xid = gen_xid()
            outpkt = msg.pack()

        self.logger.debug("Sending pkt of len " + str(len(outpkt)))
        if self.switch_socket.sendall(outpkt) is not None:
            raise Exception("unknown error on sendall")

        return 0

    def __str__(self):
        """
        @return A multi-line summary of the controller state and counters
        """
        fields = [
            (" state ", self.dbg_state),
            (" switch_addr ", self.switch_addr),
            (" pending pkts ", len(self.packets)),
            (" total pkts ", self.packets_total),
            (" expired pkts ", self.packets_expired),
            (" handled pkts ", self.packets_handled),
            (" poll discards ", self.poll_discards),
            (" parse errors ", self.parse_errors),
            (" sock errrors ", self.socket_errors),
            (" max pkts ", self.max_pkts),
            (" target switch ", self.switch),
            (" host ", self.host),
            (" port ", self.port),
            (" keep_alive ", self.keep_alive),
            (" pkt_in_run ", self.pkt_in_run),
            (" pkt_in_dropped ", self.pkt_in_dropped),
        ]
        return "Controller:\n" + "".join(
            [label + str(value) + "\n" for (label, value) in fields])

    def show(self):
        """
        Print the controller summary to standard output
        """
        print(str(self))
678
def sample_handler(controller, msg, pkt):
    """
    Sample message handler

    Prototype showing the signature expected of functions registered
    with the controller class for packet reception.  This sample does
    nothing and returns None, so the packet counts as not handled.

    @param controller The controller calling the handler
    @param msg The parsed message object
    @param pkt The raw packet that was received on the socket.  This is
    in case the packet contains extra unparsed data.
    @returns Boolean value indicating if the packet was handled.  If
    not handled, the packet is placed in the queue for pollers to receive
    """
    return None