blob: 6fd88f51e4232ecf9aa0d73146b28058a5550157 [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
4Controller class
5
6Provide the interface to the control channel to the switch under test.
7
8Class inherits from thread so as to run in background allowing
9asynchronous callbacks (if needed, not required). Also supports
10polling.
11
12The controller thread maintains a queue. Incoming messages that
13are not handled by a callback function are placed in this queue for
14poll calls.
15
16Callbacks and polling support specifying the message type
17
18@todo Support transaction semantics via xid
Dan Talayco1b3f6902010-02-15 14:14:19 -080019@todo Support select and listen on an administrative socket (or
20use a timeout to support clean shutdown).
21
22Currently only one connection is accepted during the life of
23the controller. There seems
24to be no clean way to interrupt an accept call. Using select that also listens
25on an administrative socket and can shut down the socket might work.
26
Dan Talayco34089522010-02-07 23:07:41 -080027"""
28
Dan Talayco34089522010-02-07 23:07:41 -080029import os
30import socket
31import time
Rich Lanecd97d3d2013-01-07 18:50:06 -080032import struct
33import select
34import logging
Dan Talayco34089522010-02-07 23:07:41 -080035from threading import Thread
36from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080037from threading import Condition
Rich Lane9fd05682013-01-10 15:30:38 -080038import ofp
Rich Lanecd97d3d2013-01-07 18:50:06 -080039import ofutils
Dan Talayco48370102010-03-03 15:17:33 -080040
Dan Talaycof8de5182012-04-12 22:38:41 -070041
42FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.'
43 for x in range(256)])
44
45def hex_dump_buffer(src, length=16):
46 """
47 Convert src to a hex dump string and return the string
48 @param src The source buffer
49 @param length The number of bytes shown in each line
50 @returns A string showing the hex dump
51 """
52 result = ["\n"]
53 for i in xrange(0, len(src), length):
54 chars = src[i:i+length]
55 hex = ' '.join(["%02x" % ord(x) for x in chars])
56 printable = ''.join(["%s" % ((ord(x) <= 127 and
57 FILTER[ord(x)]) or '.') for x in chars])
58 result.append("%04x %-*s %s\n" % (i, length*3, hex, printable))
59 return ''.join(result)
60
Dan Talayco48370102010-03-03 15:17:33 -080061##@todo Find a better home for these identifiers (controller)
Glen Gibb741b1182010-07-08 16:43:58 -070062RCV_SIZE_DEFAULT = 32768
Dan Talayco48370102010-03-03 15:17:33 -080063LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080064
65class Controller(Thread):
66 """
67 Class abstracting the control interface to the switch.
68
69 For receiving messages, two mechanism will be implemented. First,
70 query the interface with poll. Second, register to have a
71 function called by message type. The callback is passed the
72 message type as well as the raw packet (or message object)
73
74 One of the main purposes of this object is to translate between network
75 and host byte order. 'Above' this object, things should be in host
76 byte order.
Dan Talayco21c75c72010-02-12 22:59:24 -080077
78 @todo Consider using SocketServer for listening socket
79 @todo Test transaction code
80
81 @var rcv_size The receive size to use for receive calls
82 @var max_pkts The max size of the receive queue
83 @var keep_alive If true, listen for echo requests and respond w/
84 echo replies
Dan Talayco710438c2010-02-18 15:16:07 -080085 @var initial_hello If true, will send a hello message immediately
86 upon connecting to the switch
Dan Talayco69ca4d62012-11-15 11:50:22 -080087 @var switch If not None, do an active connection to the switch
Dan Talayco21c75c72010-02-12 22:59:24 -080088 @var host The host to use for connect
89 @var port The port to connect on
90 @var packets_total Total number of packets received
91 @var packets_expired Number of packets popped from queue as queue full
92 @var packets_handled Number of packets handled by something
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080093 @var dbg_state Debug indication of state
Dan Talayco34089522010-02-07 23:07:41 -080094 """
95
Dan Talayco69ca4d62012-11-15 11:50:22 -080096 def __init__(self, switch=None, host='127.0.0.1', port=6633, max_pkts=1024):
Dan Talayco21c75c72010-02-12 22:59:24 -080097 Thread.__init__(self)
Dan Talayco1b3f6902010-02-15 14:14:19 -080098 # Socket related
Dan Talayco21c75c72010-02-12 22:59:24 -080099 self.rcv_size = RCV_SIZE_DEFAULT
Dan Talayco1b3f6902010-02-15 14:14:19 -0800100 self.listen_socket = None
101 self.switch_socket = None
102 self.switch_addr = None
Dan Talayco710438c2010-02-18 15:16:07 -0800103 self.connect_cv = Condition()
Dan Talaycoe226eb12010-02-18 23:06:30 -0800104 self.message_cv = Condition()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800105
Rich Lane4dfd5e12012-12-22 19:48:01 -0800106 # Used to wake up the event loop from another thread
Rich Lanecd97d3d2013-01-07 18:50:06 -0800107 self.waker = ofutils.EventDescriptor()
Rich Lane32797542012-12-22 17:46:05 -0800108
Dan Talayco1b3f6902010-02-15 14:14:19 -0800109 # Counters
Dan Talayco21c75c72010-02-12 22:59:24 -0800110 self.socket_errors = 0
111 self.parse_errors = 0
Dan Talayco21c75c72010-02-12 22:59:24 -0800112 self.packets_total = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -0800113 self.packets_expired = 0
114 self.packets_handled = 0
Dan Talaycoe226eb12010-02-18 23:06:30 -0800115 self.poll_discards = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -0800116
117 # State
Dan Talayco21c75c72010-02-12 22:59:24 -0800118 self.sync = Lock()
119 self.handlers = {}
120 self.keep_alive = False
Dan Talayco710438c2010-02-18 15:16:07 -0800121 self.active = True
122 self.initial_hello = True
Dan Talayco1b3f6902010-02-15 14:14:19 -0800123
Rich Lanec4f071b2012-07-11 17:25:57 -0700124 # OpenFlow message/packet queue
125 # Protected by the packets_cv lock / condition variable
126 self.packets = []
127 self.packets_cv = Condition()
128
Dan Talayco1b3f6902010-02-15 14:14:19 -0800129 # Settings
130 self.max_pkts = max_pkts
Dan Talayco69ca4d62012-11-15 11:50:22 -0800131 self.switch = switch
132 self.passive = not self.switch
Dan Talayco48370102010-03-03 15:17:33 -0800133 self.host = host
134 self.port = port
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800135 self.dbg_state = "init"
Dan Talayco48370102010-03-03 15:17:33 -0800136 self.logger = logging.getLogger("controller")
Dan Talaycof8de5182012-04-12 22:38:41 -0700137 self.filter_packet_in = False # Drop "excessive" packet ins
138 self.pkt_in_run = 0 # Count on run of packet ins
139 self.pkt_in_filter_limit = 50 # Count on run of packet ins
140 self.pkt_in_dropped = 0 # Total dropped packet ins
141 self.transact_to = 15 # Transact timeout default value; add to config
Dan Talayco1b3f6902010-02-15 14:14:19 -0800142
Dan Talaycoe226eb12010-02-18 23:06:30 -0800143 # Transaction and message type waiting variables
144 # xid_cv: Condition variable (semaphore) for packet waiters
Dan Talayco21c75c72010-02-12 22:59:24 -0800145 # xid: Transaction ID being waited on
146 # xid_response: Transaction response message
147 self.xid_cv = Condition()
148 self.xid = None
149 self.xid_response = None
150
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800151 self.buffered_input = ""
Dan Talaycoe226eb12010-02-18 23:06:30 -0800152
Rich Lane207502e2012-12-31 14:29:12 -0800153 # Create listen socket
154 if self.passive:
155 self.logger.info("Create/listen at " + self.host + ":" +
156 str(self.port))
157 self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
158 self.listen_socket.setsockopt(socket.SOL_SOCKET,
159 socket.SO_REUSEADDR, 1)
160 self.listen_socket.bind((self.host, self.port))
161 self.listen_socket.listen(LISTEN_QUEUE_SIZE)
162
Dan Talaycof8de5182012-04-12 22:38:41 -0700163 def filter_packet(self, rawmsg, hdr):
164 """
165 Check if packet should be filtered
166
167 Currently filters packet in messages
168 @return Boolean, True if packet should be dropped
169 """
170 # Add check for packet in and rate limit
171 if self.filter_packet_in:
Rich Lanec4f071b2012-07-11 17:25:57 -0700172 # If we were dropping packets, report number dropped
173 # TODO dont drop expected packet ins
174 if self.pkt_in_run > self.pkt_in_filter_limit:
175 self.logger.debug("Dropped %d packet ins (%d total)"
176 % ((self.pkt_in_run -
177 self.pkt_in_filter_limit),
178 self.pkt_in_dropped))
179 self.pkt_in_run = 0
Dan Talaycof8de5182012-04-12 22:38:41 -0700180
181 return False
182
Dan Talaycod12b6612010-03-07 22:00:46 -0800183 def _pkt_handle(self, pkt):
184 """
185 Check for all packet handling conditions
186
187 Parse and verify message
188 Check if XID matches something waiting
189 Check if message is being expected for a poll operation
190 Check if keep alive is on and message is an echo request
191 Check if any registered handler wants the packet
192 Enqueue if none of those conditions is met
193
194 an echo request in case keep_alive is true, followed by
195 registered message handlers.
Glen Gibb6d467062010-07-08 16:15:08 -0700196 @param pkt The raw packet (string) which may contain multiple OF msgs
Dan Talaycod12b6612010-03-07 22:00:46 -0800197 """
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800198
199 # snag any left over data from last read()
200 pkt = self.buffered_input + pkt
201 self.buffered_input = ""
202
Glen Gibb6d467062010-07-08 16:15:08 -0700203 # Process each of the OF msgs inside the pkt
204 offset = 0
205 while offset < len(pkt):
206 # Parse the header to get type
Rich Lane78ef8b92013-01-10 12:19:23 -0800207 hdr = ofp.parse.of_header_parse(pkt[offset:])
Dan Talaycof8de5182012-04-12 22:38:41 -0700208 if not hdr or hdr.length == 0:
209 self.logger.error("Could not parse header")
210 self.logger.error("pkt len %d." % len(pkt))
211 if hdr:
212 self.logger.error("hdr len %d." % hdr.length)
213 self.logger.error("%s" % hex_dump_buffer(pkt[:200]))
Rich Lane376bb402012-12-31 15:20:16 -0800214 self.shutdown()
Dan Talaycod12b6612010-03-07 22:00:46 -0800215 return
216
Glen Gibb6d467062010-07-08 16:15:08 -0700217 # Extract the raw message bytes
Ed Swierk836e5bd2012-03-20 11:08:53 -0700218 if (offset + hdr.length) > len(pkt):
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800219 break
Glen Gibb6d467062010-07-08 16:15:08 -0700220 rawmsg = pkt[offset : offset + hdr.length]
Dan Talayco4306d3e2011-09-07 09:42:26 -0700221 offset += hdr.length
Dan Talaycof8de5182012-04-12 22:38:41 -0700222
223 if self.filter_packet(rawmsg, hdr):
224 continue
225
Rich Lane5d63b9c2013-01-11 14:12:37 -0800226 self.logger.debug("Msg in: buf len %d. hdr.type %s. hdr.len %d hdr.version %d hdr.xid %d" %
Rich Laned7b0ffa2013-03-08 15:53:42 -0800227 (len(pkt), ofp.ofp_type_map[hdr.type], hdr.length, hdr.version, hdr.xid))
228 if hdr.version < ofp.OFP_VERSION:
Rich Lanec44b6242013-01-10 12:23:54 -0800229 self.logger.error("Switch only supports up to OpenFlow version %d (OFTest version is %d)",
Rich Laned7b0ffa2013-03-08 15:53:42 -0800230 hdr.version, ofp.OFP_VERSION)
Rich Lanec44b6242013-01-10 12:23:54 -0800231 print "Switch only supports up to OpenFlow version %d (OFTest version is %d)" % \
Rich Laned7b0ffa2013-03-08 15:53:42 -0800232 (hdr.version, ofp.OFP_VERSION)
Ken Chiangadc950f2012-10-05 13:50:03 -0700233 self.disconnect()
Dan Talaycof8de5182012-04-12 22:38:41 -0700234 return
Dan Talaycod12b6612010-03-07 22:00:46 -0800235
Rich Lane78ef8b92013-01-10 12:19:23 -0800236 msg = ofp.parse.of_message_parse(rawmsg)
Glen Gibb6d467062010-07-08 16:15:08 -0700237 if not msg:
238 self.parse_errors += 1
239 self.logger.warn("Could not parse message")
240 continue
241
Rich Lanec4f071b2012-07-11 17:25:57 -0700242 with self.sync:
243 # Check if transaction is waiting
244 with self.xid_cv:
245 if self.xid and hdr.xid == self.xid:
246 self.logger.debug("Matched expected XID " + str(hdr.xid))
247 self.xid_response = (msg, rawmsg)
248 self.xid = None
249 self.xid_cv.notify()
250 continue
Glen Gibb6d467062010-07-08 16:15:08 -0700251
Rich Lanec4f071b2012-07-11 17:25:57 -0700252 # Check if keep alive is set; if so, respond to echo requests
253 if self.keep_alive:
Rich Laned7b0ffa2013-03-08 15:53:42 -0800254 if hdr.type == ofp.OFPT_ECHO_REQUEST:
Rich Lanec4f071b2012-07-11 17:25:57 -0700255 self.logger.debug("Responding to echo request")
Rich Lane78ef8b92013-01-10 12:19:23 -0800256 rep = ofp.message.echo_reply()
Rich Lanec4f071b2012-07-11 17:25:57 -0700257 rep.header.xid = hdr.xid
258 # Ignoring additional data
Rich Lane5c3151c2013-01-03 17:15:41 -0800259 self.message_send(rep.pack(), zero_xid=True)
Rich Lanec4f071b2012-07-11 17:25:57 -0700260 continue
Glen Gibb6d467062010-07-08 16:15:08 -0700261
Rich Lane5d63b9c2013-01-11 14:12:37 -0800262 # Log error messages
263 if hdr.type == ofp.OFPT_ERROR:
264 if msg.type in ofp.ofp_error_type_map:
265 type_str = ofp.ofp_error_type_map[msg.type]
266 if msg.type == ofp.OFPET_HELLO_FAILED:
267 code_map = ofp.ofp_hello_failed_code_map
268 elif msg.type == ofp.OFPET_BAD_REQUEST:
269 code_map = ofp.ofp_bad_request_code_map
270 elif msg.type == ofp.OFPET_BAD_ACTION:
271 code_map = ofp.ofp_bad_action_code_map
272 elif msg.type == ofp.OFPET_FLOW_MOD_FAILED:
273 code_map = ofp.ofp_flow_mod_failed_code_map
274 elif msg.type == ofp.OFPET_PORT_MOD_FAILED:
275 code_map = ofp.ofp_port_mod_failed_code_map
276 elif msg.type == ofp.OFPET_QUEUE_OP_FAILED:
277 code_map = ofp.ofp_queue_op_failed_code_map
278 else:
279 code_map = None
280
281 if code_map and msg.code in code_map:
282 code_str = code_map[msg.code]
283 else:
284 code_str = "unknown"
285 else:
286 type_str = "unknown"
287 self.logger.warn("Received error message: xid=%d type=%s (%d) code=%s (%d)",
288 hdr.xid, type_str, msg.type, code_str, msg.code)
289
Rich Lanec4f071b2012-07-11 17:25:57 -0700290 # Now check for message handlers; preference is given to
291 # handlers for a specific packet
292 handled = False
293 if hdr.type in self.handlers.keys():
294 handled = self.handlers[hdr.type](self, msg, rawmsg)
295 if not handled and ("all" in self.handlers.keys()):
296 handled = self.handlers["all"](self, msg, rawmsg)
Glen Gibb6d467062010-07-08 16:15:08 -0700297
Rich Lanec4f071b2012-07-11 17:25:57 -0700298 if not handled: # Not handled, enqueue
Rich Laned7b0ffa2013-03-08 15:53:42 -0800299 self.logger.debug("Enqueuing pkt type " + ofp.ofp_type_map[hdr.type])
Rich Lanec4f071b2012-07-11 17:25:57 -0700300 with self.packets_cv:
301 if len(self.packets) >= self.max_pkts:
302 self.packets.pop(0)
303 self.packets_expired += 1
304 self.packets.append((msg, rawmsg))
305 self.packets_cv.notify_all()
306 self.packets_total += 1
307 else:
308 self.packets_handled += 1
309 self.logger.debug("Message handled by callback")
Glen Gibb6d467062010-07-08 16:15:08 -0700310
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800311 # end of 'while offset < len(pkt)'
312 # note that if offset = len(pkt), this is
313 # appends a harmless empty string
314 self.buffered_input += pkt[offset:]
Dan Talaycod12b6612010-03-07 22:00:46 -0800315
Dan Talayco710438c2010-02-18 15:16:07 -0800316 def _socket_ready_handle(self, s):
317 """
318 Handle an input-ready socket
Dan Talaycof8de5182012-04-12 22:38:41 -0700319
Dan Talayco710438c2010-02-18 15:16:07 -0800320 @param s The socket object that is ready
Dan Talaycof8de5182012-04-12 22:38:41 -0700321 @returns 0 on success, -1 on error
Dan Talayco710438c2010-02-18 15:16:07 -0800322 """
323
Dan Talayco69ca4d62012-11-15 11:50:22 -0800324 if self.passive and s and s == self.listen_socket:
Dan Talayco710438c2010-02-18 15:16:07 -0800325 if self.switch_socket:
Rich Lanee1da7ea2012-07-26 15:58:45 -0700326 self.logger.warning("Ignoring incoming connection; already connected to switch")
Rich Laneb4f8ecb2012-09-25 09:36:26 -0700327 (sock, addr) = self.listen_socket.accept()
328 sock.close()
Rich Lanee1da7ea2012-07-26 15:58:45 -0700329 return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800330
Ken Chiange875baf2012-10-09 15:24:40 -0700331 try:
332 (sock, addr) = self.listen_socket.accept()
333 except:
334 self.logger.warning("Error on listen socket accept")
335 return -1
Ken Chiang77173992012-10-30 15:44:39 -0700336 self.logger.info(self.host+":"+str(self.port)+": Incoming connection from "+str(addr))
Rich Lanee1da7ea2012-07-26 15:58:45 -0700337
Rich Laneee3586c2012-07-11 17:26:02 -0700338 with self.connect_cv:
Rich Lanee1da7ea2012-07-26 15:58:45 -0700339 (self.switch_socket, self.switch_addr) = (sock, addr)
Rich Lane82ef1832012-12-22 17:04:35 -0800340 self.switch_socket.setsockopt(socket.IPPROTO_TCP,
341 socket.TCP_NODELAY, True)
Rich Lane1a8d5aa2012-10-08 15:40:03 -0700342 if self.initial_hello:
Rich Lane78ef8b92013-01-10 12:19:23 -0800343 self.message_send(ofp.message.hello())
Rich Lanee1da7ea2012-07-26 15:58:45 -0700344 self.connect_cv.notify() # Notify anyone waiting
Dan Talaycof8de5182012-04-12 22:38:41 -0700345 elif s and s == self.switch_socket:
346 for idx in range(3): # debug: try a couple of times
347 try:
348 pkt = self.switch_socket.recv(self.rcv_size)
349 except:
350 self.logger.warning("Error on switch read")
351 return -1
352
353 if not self.active:
354 return 0
355
356 if len(pkt) == 0:
357 self.logger.warning("Zero-length switch read, %d" % idx)
358 else:
359 break
Dan Talayco710438c2010-02-18 15:16:07 -0800360
Dan Talaycof8de5182012-04-12 22:38:41 -0700361 if len(pkt) == 0: # Still no packet
Dan Talayco0fc08bd2012-04-09 16:56:18 -0700362 self.logger.warning("Zero-length switch read; closing cxn")
Dan Talaycof8de5182012-04-12 22:38:41 -0700363 self.logger.info(str(self))
364 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800365
Dan Talaycod12b6612010-03-07 22:00:46 -0800366 self._pkt_handle(pkt)
Rich Lane4dfd5e12012-12-22 19:48:01 -0800367 elif s and s == self.waker:
368 self.waker.wait()
Dan Talayco710438c2010-02-18 15:16:07 -0800369 else:
Dan Talayco48370102010-03-03 15:17:33 -0800370 self.logger.error("Unknown socket ready: " + str(s))
Dan Talaycof8de5182012-04-12 22:38:41 -0700371 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800372
Dan Talaycof8de5182012-04-12 22:38:41 -0700373 return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800374
Dan Talayco69ca4d62012-11-15 11:50:22 -0800375 def active_connect(self):
376 """
377 Actively connect to a switch IP addr
378 """
379 try:
380 self.logger.info("Trying active connection to %s" % self.switch)
381 soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
382 soc.connect((self.switch, self.port))
383 self.logger.info("Connected to " + self.switch + " on " +
384 str(self.port))
Rich Lane82ef1832012-12-22 17:04:35 -0800385 soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800386 self.switch_addr = (self.switch, self.port)
387 return soc
388 except (StandardError, socket.error), e:
389 self.logger.error("Could not connect to %s at %d:: %s" %
390 (self.switch, self.port, str(e)))
391 return None
392
Rich Lane32797542012-12-22 17:46:05 -0800393 def wakeup(self):
394 """
395 Wake up the event loop, presumably from another thread.
396 """
Rich Lane4dfd5e12012-12-22 19:48:01 -0800397 self.waker.notify()
Rich Lane32797542012-12-22 17:46:05 -0800398
399 def sockets(self):
400 """
401 Return list of sockets to select on.
402 """
Rich Lane4dfd5e12012-12-22 19:48:01 -0800403 socs = [self.listen_socket, self.switch_socket, self.waker]
Rich Lane32797542012-12-22 17:46:05 -0800404 return [x for x in socs if x]
405
Dan Talayco1b3f6902010-02-15 14:14:19 -0800406 def run(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800407 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800408 Activity function for class
Dan Talayco21c75c72010-02-12 22:59:24 -0800409
Dan Talayco1b3f6902010-02-15 14:14:19 -0800410 Assumes connection to switch already exists. Listens on
411 switch_socket for messages until an error (or zero len pkt)
412 occurs.
Dan Talayco21c75c72010-02-12 22:59:24 -0800413
Dan Talayco1b3f6902010-02-15 14:14:19 -0800414 When there is a message on the socket, check for handlers; queue the
415 packet if no one handles the packet.
416
417 See note for controller describing the limitation of a single
418 connection for now.
419 """
420
Rich Lane207502e2012-12-31 14:29:12 -0800421 self.dbg_state = "running"
Ken Chiangadc950f2012-10-05 13:50:03 -0700422
Dan Talayco710438c2010-02-18 15:16:07 -0800423 while self.active:
Dan Talayco710438c2010-02-18 15:16:07 -0800424 try:
425 sel_in, sel_out, sel_err = \
Rich Lane32797542012-12-22 17:46:05 -0800426 select.select(self.sockets(), [], self.sockets(), 1)
Dan Talayco710438c2010-02-18 15:16:07 -0800427 except:
428 print sys.exc_info()
Ken Chiangadc950f2012-10-05 13:50:03 -0700429 self.logger.error("Select error, disconnecting")
430 self.disconnect()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800431
Dan Talayco710438c2010-02-18 15:16:07 -0800432 for s in sel_err:
Ken Chiangadc950f2012-10-05 13:50:03 -0700433 self.logger.error("Got socket error on: " + str(s) + ", disconnecting")
434 self.disconnect()
Dan Talaycof8de5182012-04-12 22:38:41 -0700435
436 for s in sel_in:
437 if self._socket_ready_handle(s) == -1:
Ken Chiangadc950f2012-10-05 13:50:03 -0700438 self.disconnect()
Dan Talayco710438c2010-02-18 15:16:07 -0800439
Dan Talayco710438c2010-02-18 15:16:07 -0800440 # End of main loop
441 self.dbg_state = "closing"
Dan Talayco48370102010-03-03 15:17:33 -0800442 self.logger.info("Exiting controller thread")
Dan Talayco710438c2010-02-18 15:16:07 -0800443 self.shutdown()
444
Rich Lane8806bc42012-07-26 19:18:37 -0700445 def connect(self, timeout=-1):
Dan Talayco710438c2010-02-18 15:16:07 -0800446 """
447 Connect to the switch
448
Rich Lane8806bc42012-07-26 19:18:37 -0700449 @param timeout Block for up to timeout seconds. Pass -1 for the default.
Dan Talayco710438c2010-02-18 15:16:07 -0800450 @return Boolean, True if connected
451 """
452
Dan Talayco69ca4d62012-11-15 11:50:22 -0800453 if not self.passive: # Do active connection now
454 self.logger.info("Attempting to connect to %s on port %s" %
455 (self.switch, str(self.port)))
456 soc = self.active_connect()
457 if soc:
458 self.logger.info("Connected to %s", self.switch)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800459 self.dbg_state = "running"
460 self.switch_socket = soc
Rich Lane32797542012-12-22 17:46:05 -0800461 self.wakeup()
Dan Talayco69ca4d62012-11-15 11:50:22 -0800462 with self.connect_cv:
463 if self.initial_hello:
464 self.message_send(hello())
465 self.connect_cv.notify() # Notify anyone waiting
466 else:
467 self.logger.error("Could not actively connect to switch %s",
468 self.switch)
469 self.active = False
470 else:
471 with self.connect_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800472 ofutils.timed_wait(self.connect_cv, lambda: self.switch_socket,
473 timeout=timeout)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800474
Dan Talayco710438c2010-02-18 15:16:07 -0800475 return self.switch_socket is not None
476
Ken Chiangadc950f2012-10-05 13:50:03 -0700477 def disconnect(self, timeout=-1):
478 """
479 If connected to a switch, disconnect.
480 """
481 if self.switch_socket:
Ken Chiangadc950f2012-10-05 13:50:03 -0700482 self.switch_socket.close()
483 self.switch_socket = None
484 self.switch_addr = None
Ken Chiang74be4722012-12-21 13:07:03 -0800485 with self.packets_cv:
486 self.packets = []
Ken Chiange875baf2012-10-09 15:24:40 -0700487 with self.connect_cv:
488 self.connect_cv.notifyAll()
Ken Chiangadc950f2012-10-05 13:50:03 -0700489
490 def wait_disconnected(self, timeout=-1):
491 """
492 @param timeout Block for up to timeout seconds. Pass -1 for the default.
493 @return Boolean, True if disconnected
494 """
495
Ken Chiange875baf2012-10-09 15:24:40 -0700496 with self.connect_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800497 ofutils.timed_wait(self.connect_cv,
498 lambda: True if not self.switch_socket else None,
499 timeout=timeout)
Ken Chiangadc950f2012-10-05 13:50:03 -0700500 return self.switch_socket is None
501
Dan Talayco710438c2010-02-18 15:16:07 -0800502 def kill(self):
503 """
504 Force the controller thread to quit
Dan Talayco710438c2010-02-18 15:16:07 -0800505 """
506 self.active = False
Rich Lane32797542012-12-22 17:46:05 -0800507 self.wakeup()
Rich Lane376bb402012-12-31 15:20:16 -0800508 self.join()
Dan Talayco21c75c72010-02-12 22:59:24 -0800509
Dan Talayco1b3f6902010-02-15 14:14:19 -0800510 def shutdown(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800511 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800512 Shutdown the controller closing all sockets
Dan Talayco21c75c72010-02-12 22:59:24 -0800513
Dan Talayco1b3f6902010-02-15 14:14:19 -0800514 @todo Might want to synchronize shutdown with self.sync...
515 """
Dan Talaycof8de5182012-04-12 22:38:41 -0700516
Dan Talayco710438c2010-02-18 15:16:07 -0800517 self.active = False
518 try:
519 self.switch_socket.shutdown(socket.SHUT_RDWR)
520 except:
Dan Talayco48370102010-03-03 15:17:33 -0800521 self.logger.info("Ignoring switch soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800522 self.switch_socket = None
Dan Talayco1b3f6902010-02-15 14:14:19 -0800523
Dan Talayco710438c2010-02-18 15:16:07 -0800524 try:
525 self.listen_socket.shutdown(socket.SHUT_RDWR)
526 except:
Dan Talayco48370102010-03-03 15:17:33 -0800527 self.logger.info("Ignoring listen soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800528 self.listen_socket = None
Dan Talaycof8de5182012-04-12 22:38:41 -0700529
Rich Laneee3586c2012-07-11 17:26:02 -0700530 # Wakeup condition variables on which controller may be wait
531 with self.xid_cv:
532 self.xid_cv.notifyAll()
Dan Talaycof8de5182012-04-12 22:38:41 -0700533
Rich Laneee3586c2012-07-11 17:26:02 -0700534 with self.connect_cv:
535 self.connect_cv.notifyAll()
Dan Talaycof8de5182012-04-12 22:38:41 -0700536
Rich Lane32797542012-12-22 17:46:05 -0800537 self.wakeup()
Dan Talayco710438c2010-02-18 15:16:07 -0800538 self.dbg_state = "down"
539
Dan Talayco34089522010-02-07 23:07:41 -0800540 def register(self, msg_type, handler):
541 """
542 Register a callback to receive a specific message type.
543
544 Only one handler may be registered for a given message type.
Dan Talaycod12b6612010-03-07 22:00:46 -0800545
546 WARNING: A lock is held during the handler call back, so
547 the handler should not make any blocking calls
548
Dan Talayco34089522010-02-07 23:07:41 -0800549 @param msg_type The type of message to receive. May be DEFAULT
Dan Talayco21c75c72010-02-12 22:59:24 -0800550 for all non-handled packets. The special type, the string "all"
551 will send all packets to the handler.
Dan Talayco34089522010-02-07 23:07:41 -0800552 @param handler The function to call when a message of the given
553 type is received.
554 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800555 # Should check type is valid
556 if not handler and msg_type in self.handlers.keys():
557 del self.handlers[msg_type]
558 return
559 self.handlers[msg_type] = handler
Dan Talayco34089522010-02-07 23:07:41 -0800560
Rich Laneb64ce3d2012-07-26 15:37:57 -0700561 def poll(self, exp_msg=None, timeout=-1):
Dan Talayco34089522010-02-07 23:07:41 -0800562 """
563 Wait for the next OF message received from the switch.
564
565 @param exp_msg If set, return only when this type of message
Dan Talayco48370102010-03-03 15:17:33 -0800566 is received (unless timeout occurs).
Rich Laneb64ce3d2012-07-26 15:37:57 -0700567
568 @param timeout Maximum number of seconds to wait for the message.
569 Pass -1 for the default timeout.
Dan Talayco34089522010-02-07 23:07:41 -0800570
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800571 @retval A pair (msg, pkt) where msg is a message object and pkt
572 the string representing the packet as received from the socket.
573 This allows additional parsing by the receiver if necessary.
574
Dan Talayco34089522010-02-07 23:07:41 -0800575 The data members in the message are in host endian order.
Dan Talayco48370102010-03-03 15:17:33 -0800576 If an error occurs, (None, None) is returned
Dan Talayco34089522010-02-07 23:07:41 -0800577 """
Dan Talayco34089522010-02-07 23:07:41 -0800578
Ken Chiang77173992012-10-30 15:44:39 -0700579 if exp_msg is not None:
Rich Laned7b0ffa2013-03-08 15:53:42 -0800580 self.logger.debug("Poll for %s" % ofp.ofp_type_map[exp_msg])
Ed Swierk9e55e282012-08-22 06:57:28 -0700581 else:
582 self.logger.debug("Poll for any OF message")
Rich Laneb64ce3d2012-07-26 15:37:57 -0700583
584 # Take the packet from the queue
Rich Lanec4f071b2012-07-11 17:25:57 -0700585 def grab():
586 if len(self.packets) > 0:
Ken Chiang77173992012-10-30 15:44:39 -0700587 if exp_msg is None:
Rich Lanec4f071b2012-07-11 17:25:57 -0700588 self.logger.debug("Looking for any packet")
589 (msg, pkt) = self.packets.pop(0)
590 return (msg, pkt)
591 else:
Rich Laned7b0ffa2013-03-08 15:53:42 -0800592 self.logger.debug("Looking for %s" % ofp.ofp_type_map[exp_msg])
Rich Lanec4f071b2012-07-11 17:25:57 -0700593 for i in range(len(self.packets)):
594 msg = self.packets[i][0]
Rich Laned7b0ffa2013-03-08 15:53:42 -0800595 self.logger.debug("Checking packets[%d] (%s)" % (i, ofp.ofp_type_map[msg.header.type]))
Rich Lanec4f071b2012-07-11 17:25:57 -0700596 if msg.header.type == exp_msg:
597 (msg, pkt) = self.packets.pop(i)
598 return (msg, pkt)
599 # Not found
600 self.logger.debug("Packet not in queue")
Rich Laneb64ce3d2012-07-26 15:37:57 -0700601 return None
Dan Talayco21c75c72010-02-12 22:59:24 -0800602
Rich Lanec4f071b2012-07-11 17:25:57 -0700603 with self.packets_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800604 ret = ofutils.timed_wait(self.packets_cv, grab, timeout=timeout)
Rich Lanec4f071b2012-07-11 17:25:57 -0700605
Rich Laneb64ce3d2012-07-26 15:37:57 -0700606 if ret != None:
607 (msg, pkt) = ret
608 self.logger.debug("Got message %s" % str(msg))
609 return (msg, pkt)
610 else:
611 return (None, None)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800612
Dan Talayco0fc08bd2012-04-09 16:56:18 -0700613 def transact(self, msg, timeout=-1, zero_xid=False):
Dan Talayco34089522010-02-07 23:07:41 -0800614 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800615 Run a message transaction with the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800616
617 Send the message in msg and wait for a reply with a matching
Dan Talayco21c75c72010-02-12 22:59:24 -0800618 transaction id. Transactions have the highest priority in
619 received message handling.
Dan Talaycoe37999f2010-02-09 15:27:12 -0800620
Dan Talayco21c75c72010-02-12 22:59:24 -0800621 @param msg The message object to send; must not be a string
Rich Lanee1da7ea2012-07-26 15:58:45 -0700622 @param timeout The timeout in seconds; if -1 use default.
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800623 @param zero_xid Normally, if the XID is 0 an XID will be generated
Ed Swierk9e55e282012-08-22 06:57:28 -0700624 for the message. Set zero_xid to override this behavior
Dan Talayco21c75c72010-02-12 22:59:24 -0800625 @return The matching message object or None if unsuccessful
Dan Talaycoe37999f2010-02-09 15:27:12 -0800626
Dan Talayco34089522010-02-07 23:07:41 -0800627 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800628
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800629 if not zero_xid and msg.header.xid == 0:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800630 msg.header.xid = ofutils.gen_xid()
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800631
Dan Talayco0fc08bd2012-04-09 16:56:18 -0700632 self.logger.debug("Running transaction %d" % msg.header.xid)
Dan Talayco21c75c72010-02-12 22:59:24 -0800633
Rich Lane9aca1992012-07-11 17:26:31 -0700634 with self.xid_cv:
635 if self.xid:
636 self.logger.error("Can only run one transaction at a time")
637 return (None, None)
Dan Talaycof8de5182012-04-12 22:38:41 -0700638
Rich Lane9aca1992012-07-11 17:26:31 -0700639 self.xid = msg.header.xid
Dan Talaycod12b6612010-03-07 22:00:46 -0800640 self.xid_response = None
Rich Lane5c3151c2013-01-03 17:15:41 -0800641 self.message_send(msg.pack())
Rich Lane9aca1992012-07-11 17:26:31 -0700642
Rich Lane8806bc42012-07-26 19:18:37 -0700643 self.logger.debug("Waiting for transaction %d" % msg.header.xid)
Rich Lanecd97d3d2013-01-07 18:50:06 -0800644 ofutils.timed_wait(self.xid_cv, lambda: self.xid_response, timeout=timeout)
Rich Lane9aca1992012-07-11 17:26:31 -0700645
646 if self.xid_response:
647 (resp, pkt) = self.xid_response
648 self.xid_response = None
649 else:
650 (resp, pkt) = (None, None)
651
Dan Talayco09c2c592010-05-13 14:21:52 -0700652 if resp is None:
653 self.logger.warning("No response for xid " + str(self.xid))
654 return (resp, pkt)
Dan Talayco34089522010-02-07 23:07:41 -0800655
Dan Talayco710438c2010-02-18 15:16:07 -0800656 def message_send(self, msg, zero_xid=False):
Dan Talayco34089522010-02-07 23:07:41 -0800657 """
658 Send the message to the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800659
Dan Talayco11c26e72010-03-07 22:03:57 -0800660 @param msg A string or OpenFlow message object to be forwarded to
661 the switch.
662 @param zero_xid If msg is an OpenFlow object (not a string) and if
Dan Talayco710438c2010-02-18 15:16:07 -0800663 the XID in the header is 0, then an XID will be generated
Ed Swierk9e55e282012-08-22 06:57:28 -0700664 for the message. Set zero_xid to override this behavior (and keep an
Dan Talayco710438c2010-02-18 15:16:07 -0800665 existing 0 xid)
Dan Talayco21c75c72010-02-12 22:59:24 -0800666 """
667
Dan Talayco1b3f6902010-02-15 14:14:19 -0800668 if not self.switch_socket:
669 # Sending a string indicates the message is ready to go
Ed Swierk9e55e282012-08-22 06:57:28 -0700670 raise Exception("no socket")
Dan Talayco710438c2010-02-18 15:16:07 -0800671 #@todo If not string, try to pack
Dan Talayco21c75c72010-02-12 22:59:24 -0800672 if type(msg) != type(""):
Ed Swierk9e55e282012-08-22 06:57:28 -0700673 if msg.header.xid == 0 and not zero_xid:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800674 msg.header.xid = ofutils.gen_xid()
Ed Swierk9e55e282012-08-22 06:57:28 -0700675 outpkt = msg.pack()
Dan Talayco710438c2010-02-18 15:16:07 -0800676 else:
677 outpkt = msg
Dan Talayco21c75c72010-02-12 22:59:24 -0800678
Rich Lanef18980d2012-12-31 17:11:41 -0800679 msg_version, msg_type, msg_len, msg_xid = struct.unpack_from("!BBHL", outpkt)
Rich Lane5d63b9c2013-01-11 14:12:37 -0800680 self.logger.debug("Msg out: buf len %d. hdr.type %s. hdr.len %d hdr.version %d hdr.xid %d",
Rich Lanef18980d2012-12-31 17:11:41 -0800681 len(outpkt),
Rich Laned7b0ffa2013-03-08 15:53:42 -0800682 ofp.ofp_type_map.get(msg_type, "unknown (%d)" % msg_type),
Rich Lanecd97d3d2013-01-07 18:50:06 -0800683 msg_len,
Rich Lane5d63b9c2013-01-11 14:12:37 -0800684 msg_version,
685 msg_xid)
Ed Swierk9e55e282012-08-22 06:57:28 -0700686 if self.switch_socket.sendall(outpkt) is not None:
Rich Lane5c3151c2013-01-03 17:15:41 -0800687 raise AssertionError("failed to send message to switch")
Dan Talayco710438c2010-02-18 15:16:07 -0800688
Rich Lane5c3151c2013-01-03 17:15:41 -0800689 return 0 # for backwards compatibility
Dan Talayco21c75c72010-02-12 22:59:24 -0800690
691 def __str__(self):
692 string = "Controller:\n"
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800693 string += " state " + self.dbg_state + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800694 string += " switch_addr " + str(self.switch_addr) + "\n"
695 string += " pending pkts " + str(len(self.packets)) + "\n"
696 string += " total pkts " + str(self.packets_total) + "\n"
697 string += " expired pkts " + str(self.packets_expired) + "\n"
698 string += " handled pkts " + str(self.packets_handled) + "\n"
Dan Talaycoe226eb12010-02-18 23:06:30 -0800699 string += " poll discards " + str(self.poll_discards) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800700 string += " parse errors " + str(self.parse_errors) + "\n"
701 string += " sock errrors " + str(self.socket_errors) + "\n"
702 string += " max pkts " + str(self.max_pkts) + "\n"
Dan Talayco69ca4d62012-11-15 11:50:22 -0800703 string += " target switch " + str(self.switch) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800704 string += " host " + str(self.host) + "\n"
705 string += " port " + str(self.port) + "\n"
706 string += " keep_alive " + str(self.keep_alive) + "\n"
Dan Talaycof8de5182012-04-12 22:38:41 -0700707 string += " pkt_in_run " + str(self.pkt_in_run) + "\n"
708 string += " pkt_in_dropped " + str(self.pkt_in_dropped) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800709 return string
710
711 def show(self):
712 print str(self)
713
714def sample_handler(controller, msg, pkt):
715 """
716 Sample message handler
717
718 This is the prototype for functions registered with the controller
719 class for packet reception
720
721 @param controller The controller calling the handler
722 @param msg The parsed message object
723 @param pkt The raw packet that was received on the socket. This is
724 in case the packet contains extra unparsed data.
725 @returns Boolean value indicating if the packet was handled. If
726 not handled, the packet is placed in the queue for pollers to received
727 """
728 pass