blob: b23eefa841d5e11c795cc588bf993fac66328700 [file] [log] [blame]
"""
OpenFlow Test Framework

Controller class

Provide the interface to the control channel to the switch under test.

Class inherits from thread so as to run in background allowing
asynchronous callbacks (if needed, not required). Also supports
polling.

The controller thread maintains a queue. Incoming messages that
are not handled by a callback function are placed in this queue for
poll calls.

Callbacks and polling support specifying the message type.

@todo Support transaction semantics via xid
@todo Support select and listen on an administrative socket (or
use a timeout to support clean shutdown).

Currently only one connection is accepted during the life of
the controller. There seems to be no clean way to interrupt an
accept call. Using select that also listens on an administrative
socket and can shut down the socket might work.

"""
28
Rich Lane720eaf22013-08-09 18:00:45 -070029import sys
Dan Talayco34089522010-02-07 23:07:41 -080030import os
31import socket
32import time
Rich Lanecd97d3d2013-01-07 18:50:06 -080033import struct
34import select
35import logging
Dan Talayco34089522010-02-07 23:07:41 -080036from threading import Thread
37from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080038from threading import Condition
Rich Lane9fd05682013-01-10 15:30:38 -080039import ofp
Rich Lanecd97d3d2013-01-07 18:50:06 -080040import ofutils
Dan Talayco48370102010-03-03 15:17:33 -080041
Dan Talaycof8de5182012-04-12 22:38:41 -070042
# Lookup table indexed by character code: the character itself when its
# repr() is exactly three characters long (quote, glyph, quote), else '.'
FILTER = ''.join([chr(x) if len(repr(chr(x))) == 3 else '.'
                  for x in range(256)])


def hex_dump_buffer(src, length=16):
    """
    Convert src to a hex dump string and return the string
    @param src The source buffer; a str, bytes or bytearray
    @param length The number of bytes shown in each line
    @returns A string showing the hex dump; it always starts with a
    newline, so an empty buffer yields just that newline
    """
    result = ["\n"]
    for i in range(0, len(src), length):
        # Iterating bytes/bytearray yields ints (Python 3), iterating a
        # str yields one-character strings; normalise both to integers
        codes = [x if isinstance(x, int) else ord(x)
                 for x in src[i:i+length]]
        hex_part = ' '.join(["%02x" % c for c in codes])
        # Only 7-bit codes are looked up in FILTER; the rest print as '.'
        printable = ''.join([FILTER[c] if c <= 127 else '.' for c in codes])
        result.append("%04x %-*s %s\n" % (i, length*3, hex_part, printable))
    return ''.join(result)
61
##@todo Find a better home for these identifiers (controller)
# Default number of bytes requested per recv() on the switch socket
RCV_SIZE_DEFAULT = 32768
# Backlog passed to listen() on the controller's listening socket
LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080065
66class Controller(Thread):
67 """
68 Class abstracting the control interface to the switch.
69
    For receiving messages, two mechanisms are provided. First,
    query the interface with poll. Second, register to have a
    function called by message type. The callback is passed the
    controller, the parsed message object and the raw packet.
74
75 One of the main purposes of this object is to translate between network
76 and host byte order. 'Above' this object, things should be in host
77 byte order.
Dan Talayco21c75c72010-02-12 22:59:24 -080078
79 @todo Consider using SocketServer for listening socket
80 @todo Test transaction code
81
82 @var rcv_size The receive size to use for receive calls
83 @var max_pkts The max size of the receive queue
84 @var keep_alive If true, listen for echo requests and respond w/
85 echo replies
Dan Talayco710438c2010-02-18 15:16:07 -080086 @var initial_hello If true, will send a hello message immediately
87 upon connecting to the switch
Dan Talayco69ca4d62012-11-15 11:50:22 -080088 @var switch If not None, do an active connection to the switch
Dan Talayco21c75c72010-02-12 22:59:24 -080089 @var host The host to use for connect
90 @var port The port to connect on
91 @var packets_total Total number of packets received
92 @var packets_expired Number of packets popped from queue as queue full
93 @var packets_handled Number of packets handled by something
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080094 @var dbg_state Debug indication of state
Dan Talayco34089522010-02-07 23:07:41 -080095 """
96
def __init__(self, switch=None, host='127.0.0.1', port=6633, max_pkts=1024):
    """
    Initialise controller state and, in passive mode, the listen socket

    @param switch If not None, do an active connection to this switch
    address (performed later by connect()); otherwise listen for it
    @param host The local address the listening socket is bound to
    @param port The port to listen on (passive) or connect to (active)
    @param max_pkts The max size of the receive queue
    """
    Thread.__init__(self)
    # Socket related
    self.rcv_size = RCV_SIZE_DEFAULT
    self.listen_socket = None
    self.switch_socket = None
    self.switch_addr = None
    self.connect_cv = Condition()
    self.message_cv = Condition()

    # Used to wake up the event loop from another thread
    self.waker = ofutils.EventDescriptor()

    # Counters
    self.socket_errors = 0
    self.parse_errors = 0
    self.packets_total = 0
    self.packets_expired = 0
    self.packets_handled = 0
    self.poll_discards = 0

    # State
    self.sync = Lock()
    self.handlers = {}
    self.keep_alive = False
    self.active = True
    self.initial_hello = True

    # OpenFlow message/packet queue
    # Protected by the packets_cv lock / condition variable
    self.packets = []
    self.packets_cv = Condition()
    self.packet_in_count = 0

    # Settings
    self.max_pkts = max_pkts
    self.switch = switch
    # Passive (listening) mode is used whenever no switch address is given
    self.passive = not self.switch
    self.host = host
    self.port = port
    self.dbg_state = "init"
    self.logger = logging.getLogger("controller")
    self.filter_packet_in = False # Drop "excessive" packet ins
    self.pkt_in_run = 0 # Count on run of packet ins
    self.pkt_in_filter_limit = 50 # Run length threshold for packet-in filtering
    self.pkt_in_dropped = 0 # Total dropped packet ins
    self.transact_to = 15 # Transact timeout default value; add to config

    # Transaction and message type waiting variables
    #   xid_cv: Condition variable (semaphore) for packet waiters
    #   xid: Transaction ID being waited on
    #   xid_response: Transaction response message
    self.xid_cv = Condition()
    self.xid = None
    self.xid_response = None

    # Bytes of a partially received message, kept until the rest arrives
    self.buffered_input = ""

    # Create listen socket
    if self.passive:
        self.logger.info("Create/listen at " + self.host + ":" +
                         str(self.port))
        self.listen_socket = socket.socket(socket.AF_INET,
                                           socket.SOCK_STREAM)
        try:
            self.listen_socket.setsockopt(socket.SOL_SOCKET,
                                          socket.SO_REUSEADDR, 1)
            self.listen_socket.bind((self.host, self.port))
            self.listen_socket.listen(LISTEN_QUEUE_SIZE)
        except Exception:
            # Do not leak the descriptor when the address is unusable;
            # the original exception still reaches the caller
            self.listen_socket.close()
            self.listen_socket = None
            raise
164
def filter_packet(self, rawmsg, hdr):
    """
    Check if packet should be filtered

    Filtering is currently disabled, so no packet is ever dropped. The
    packet-in rate limiting that used to follow the unconditional return
    was unreachable and never inspected the message type; it has been
    removed. The filter_packet_in / pkt_in_* attributes it consulted
    are still set in the constructor.

    @param rawmsg The raw message bytes (currently unused)
    @param hdr The parsed message header (currently unused)
    @return Boolean, True if packet should be dropped
    """
    # XXX didn't actually check for packet-in...
    return False
186
def _pkt_handle(self, pkt):
    """
    Check for all packet handling conditions

    For every complete OpenFlow message found in the data:
        Parse and verify message
        Check if XID matches something waiting
        Check if keep alive is on and message is an echo request
        Check if any registered handler wants the packet
        Enqueue (for poll) if none of those conditions is met

    Trailing bytes that do not yet form a complete message are kept in
    self.buffered_input and prepended to the next call's data.

    @param pkt The raw packet (string) which may contain multiple OF msgs
    """

    # snag any left over data from last read()
    pkt = self.buffered_input + pkt
    self.buffered_input = ""

    # Process each of the OF msgs inside the pkt
    offset = 0
    while offset < len(pkt):
        # Need a full 8-byte header before anything can be parsed
        if offset + 8 > len(pkt):
            break

        # Parse the header to get type
        hdr_version, hdr_type, hdr_length, hdr_xid = \
            ofp.message.parse_header(pkt[offset:])

        if hdr_length < 8:
            # A length shorter than the header itself can never be valid
            # and would stall or desynchronise this loop
            self.parse_errors += 1
            self.logger.error("Invalid OpenFlow message length %d; "
                              "disconnecting", hdr_length)
            self.disconnect()
            return

        # Extract the raw message bytes
        if (offset + hdr_length) > len(pkt):
            break
        rawmsg = pkt[offset : offset + hdr_length]
        offset += hdr_length

        #if self.filter_packet(rawmsg, hdr):
        #    continue

        self.logger.debug("Msg in: version %d type %s (%d) len %d xid %d",
                          hdr_version,
                          ofp.ofp_type_map.get(hdr_type, "unknown"), hdr_type,
                          hdr_length, hdr_xid)
        if hdr_version < ofp.OFP_VERSION:
            self.logger.error("Switch only supports up to OpenFlow version %d (OFTest version is %d)",
                              hdr_version, ofp.OFP_VERSION)
            print("Switch only supports up to OpenFlow version %d (OFTest version is %d)" %
                  (hdr_version, ofp.OFP_VERSION))
            # Remaining buffered data is dropped along with the connection
            self.disconnect()
            return

        msg = ofp.message.parse_message(rawmsg)
        if not msg:
            self.parse_errors += 1
            self.logger.warning("Could not parse message")
            continue

        with self.sync:
            # Check if transaction is waiting
            with self.xid_cv:
                if self.xid and hdr_xid == self.xid:
                    self.logger.debug("Matched expected XID " + str(hdr_xid))
                    self.xid_response = (msg, rawmsg)
                    self.xid = None
                    self.xid_cv.notify()
                    continue

            # Check if keep alive is set; if so, respond to echo requests
            if self.keep_alive:
                if hdr_type == ofp.OFPT_ECHO_REQUEST:
                    self.logger.debug("Responding to echo request")
                    rep = ofp.message.echo_reply()
                    rep.xid = hdr_xid
                    # Ignoring additional data
                    self.message_send(rep.pack())
                    continue

            # Generalize to counters for all packet types?
            if msg.type == ofp.OFPT_PACKET_IN:
                self.packet_in_count += 1

            # Log error messages
            if hdr_type == ofp.OFPT_ERROR:
                if msg.err_type in ofp.ofp_error_type_map:
                    type_str = ofp.ofp_error_type_map[msg.err_type]
                    if msg.err_type == ofp.OFPET_HELLO_FAILED:
                        code_map = ofp.ofp_hello_failed_code_map
                    elif msg.err_type == ofp.OFPET_BAD_REQUEST:
                        code_map = ofp.ofp_bad_request_code_map
                    elif msg.err_type == ofp.OFPET_BAD_ACTION:
                        code_map = ofp.ofp_bad_action_code_map
                    elif msg.err_type == ofp.OFPET_FLOW_MOD_FAILED:
                        code_map = ofp.ofp_flow_mod_failed_code_map
                    elif msg.err_type == ofp.OFPET_PORT_MOD_FAILED:
                        code_map = ofp.ofp_port_mod_failed_code_map
                    elif msg.err_type == ofp.OFPET_QUEUE_OP_FAILED:
                        code_map = ofp.ofp_queue_op_failed_code_map
                    else:
                        code_map = None

                    if code_map and msg.code in code_map:
                        code_str = code_map[msg.code]
                    else:
                        code_str = "unknown"
                else:
                    type_str = "unknown"
                    code_str = "unknown"
                self.logger.warning("Received error message: xid=%d type=%s (%d) code=%s (%d)",
                                    hdr_xid, type_str, msg.err_type, code_str, msg.code)

            # Now check for message handlers; preference is given to
            # handlers for a specific packet
            handled = False
            if hdr_type in self.handlers:
                handled = self.handlers[hdr_type](self, msg, rawmsg)
            if not handled and ("all" in self.handlers):
                handled = self.handlers["all"](self, msg, rawmsg)

            if not handled: # Not handled, enqueue
                self.logger.debug("Enqueuing pkt type %s (%d)",
                                  ofp.ofp_type_map.get(hdr_type, "unknown"),
                                  hdr_type)
                with self.packets_cv:
                    # Queue is bounded: expire the oldest entry when full
                    if len(self.packets) >= self.max_pkts:
                        self.packets.pop(0)
                        self.packets_expired += 1
                    self.packets.append((msg, rawmsg))
                    self.packets_cv.notify_all()
                self.packets_total += 1
            else:
                self.packets_handled += 1
                self.logger.debug("Message handled by callback")

    # end of 'while offset < len(pkt)'
    #   note that if offset = len(pkt), this
    #   appends a harmless empty string
    self.buffered_input += pkt[offset:]
Dan Talaycod12b6612010-03-07 22:00:46 -0800323
def _socket_ready_handle(self, s):
    """
    Handle an input-ready socket

    Dispatches on which descriptor became readable: the listening socket
    (accept the switch connection), the switch socket (read and process
    OpenFlow data) or the waker (drain the wake-up event).

    @param s The socket object that is ready
    @returns 0 on success, -1 on error
    """

    if self.passive and s and s == self.listen_socket:
        if self.switch_socket:
            self.logger.warning("Ignoring incoming connection; already connected to switch")
            try:
                (sock, addr) = self.listen_socket.accept()
                sock.close()
            except Exception:
                # The extra connection is unwanted anyway; a failure to
                # accept it must not tear down the existing connection
                self.logger.warning("Error on listen socket accept")
            return 0

        try:
            (sock, addr) = self.listen_socket.accept()
        except Exception:
            self.logger.warning("Error on listen socket accept")
            return -1
        self.logger.info(self.host+":"+str(self.port)+": Incoming connection from "+str(addr))

        with self.connect_cv:
            (self.switch_socket, self.switch_addr) = (sock, addr)
            self.switch_socket.setsockopt(socket.IPPROTO_TCP,
                                          socket.TCP_NODELAY, True)
            if self.initial_hello:
                self.message_send(ofp.message.hello())
            self.connect_cv.notify() # Notify anyone waiting

        # Prevent further connections
        self.listen_socket.close()
        self.listen_socket = None
    elif s and s == self.switch_socket:
        for idx in range(3): # debug: try a couple of times
            try:
                pkt = self.switch_socket.recv(self.rcv_size)
            except Exception:
                self.logger.warning("Error on switch read")
                return -1

            # Controller is being stopped; drop whatever was read
            if not self.active:
                return 0

            if len(pkt) == 0:
                self.logger.warning("Zero-length switch read, %d", idx)
            else:
                break

        if len(pkt) == 0: # Still no packet
            self.logger.warning("Zero-length switch read; closing cxn")
            self.logger.info(str(self))
            return -1

        self._pkt_handle(pkt)
    elif s and s == self.waker:
        self.waker.wait()
    else:
        self.logger.error("Unknown socket ready: " + str(s))
        return -1

    return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800386
def active_connect(self):
    """
    Actively connect to a switch IP addr

    Connects a new TCP socket to (self.switch, self.port). On success
    self.switch_addr is updated.

    @returns The connected socket, or None if the connection failed
    (the failure is logged and the socket is closed)
    """
    soc = None
    try:
        self.logger.info("Trying active connection to %s" % self.switch)
        soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        soc.connect((self.switch, self.port))
        self.logger.info("Connected to " + self.switch + " on " +
                         str(self.port))
        soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
        self.switch_addr = (self.switch, self.port)
        return soc
    except Exception as e:
        self.logger.error("Could not connect to %s at %d:: %s" %
                          (self.switch, self.port, str(e)))
        # Release the descriptor of the half-set-up socket
        if soc is not None:
            soc.close()
        return None
404
def wakeup(self):
    """
    Wake up the event loop, presumably from another thread.

    Signals self.waker, which is one of the descriptors the event loop
    selects on (see sockets()).
    """
    self.waker.notify()
Rich Lane32797542012-12-22 17:46:05 -0800410
def sockets(self):
    """
    Return list of sockets to select on.

    @returns The listen socket, switch socket and waker, in that order,
    leaving out any that are currently unset
    """
    socs = [self.listen_socket, self.switch_socket, self.waker]
    return [x for x in socs if x]
417
def run(self):
    """
    Activity function for class

    Event loop of the controller thread: select on the descriptors
    returned by sockets() with a one second timeout and hand every
    readable one to _socket_ready_handle. A select failure, a descriptor
    reported in error, or a handler returning -1 disconnects the switch.
    The loop runs until self.active is cleared, then calls shutdown().

    When there is a message on the socket, check for handlers; queue the
    packet if no one handles the packet.

    See note for controller describing the limitation of a single
    connection for now.
    """

    self.dbg_state = "running"

    while self.active:
        try:
            socs = self.sockets()
            sel_in, sel_out, sel_err = \
                select.select(socs, [], socs, 1)
        except Exception:
            print(sys.exc_info())
            self.logger.error("Select error, disconnecting")
            self.disconnect()
            # No select results exist for this pass; start over rather
            # than touch unbound or stale sel_in / sel_err
            continue

        for s in sel_err:
            self.logger.error("Got socket error on: " + str(s) + ", disconnecting")
            self.disconnect()

        for s in sel_in:
            if self._socket_ready_handle(s) == -1:
                self.disconnect()

    # End of main loop
    self.dbg_state = "closing"
    self.logger.info("Exiting controller thread")
    self.shutdown()
456
def connect(self, timeout=-1):
    """
    Connect to the switch

    In active mode the connection is attempted immediately; if it fails
    the controller is marked inactive. In passive mode this waits on
    connect_cv until a switch socket is set.

    @param timeout Block for up to timeout seconds. Pass -1 for the default.
    Only used in passive mode.
    @return Boolean, True if connected
    """

    if not self.passive:  # Do active connection now
        self.logger.info("Attempting to connect to %s on port %s",
                         self.switch, self.port)
        soc = self.active_connect()
        if soc:
            self.logger.info("Connected to %s", self.switch)
            self.dbg_state = "running"
            self.switch_socket = soc
            # Have the event loop pick up the new socket
            self.wakeup()
            with self.connect_cv:
                if self.initial_hello:
                    self.message_send(ofp.message.hello())
                self.connect_cv.notify() # Notify anyone waiting
        else:
            self.logger.error("Could not actively connect to switch %s",
                              self.switch)
            self.active = False
    else:
        with self.connect_cv:
            ofutils.timed_wait(self.connect_cv, lambda: self.switch_socket,
                               timeout=timeout)

    return self.switch_socket is not None
488
def disconnect(self, timeout=-1):
    """
    If connected to a switch, disconnect.

    Closes the switch socket, discards all queued messages and wakes
    everything waiting on connect_cv. Does nothing when there is no
    switch socket.

    @param timeout Unused; accepted for interface compatibility
    """
    if self.switch_socket:
        self.switch_socket.close()
        self.switch_socket = None
        self.switch_addr = None
        with self.packets_cv:
            self.packets = []
        with self.connect_cv:
            self.connect_cv.notify_all()
Ken Chiangadc950f2012-10-05 13:50:03 -0700501
def wait_disconnected(self, timeout=-1):
    """
    Wait on connect_cv until there is no switch socket

    @param timeout Block for up to timeout seconds. Pass -1 for the default.
    @return Boolean, True if disconnected
    """

    with self.connect_cv:
        # The predicate returns None (not False) while still connected
        ofutils.timed_wait(self.connect_cv,
                           lambda: True if not self.switch_socket else None,
                           timeout=timeout)
    return self.switch_socket is None
513
def kill(self):
    """
    Force the controller thread to quit

    Clears the active flag, wakes the event loop so it notices, and
    blocks until the thread has finished.
    """
    self.active = False
    self.wakeup()
    self.join()
Dan Talayco21c75c72010-02-12 22:59:24 -0800521
def shutdown(self):
    """
    Shutdown the controller closing all sockets

    Marks the controller inactive, shuts down and closes the switch and
    listen sockets (errors from socket shutdown are logged and ignored),
    then wakes all waiters on xid_cv and connect_cv and the event loop.

    @todo Might want to synchronize shutdown with self.sync...
    """

    self.active = False

    sock = self.switch_socket
    if sock is not None:
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            self.logger.info("Ignoring switch soc shutdown error")
        finally:
            # shutdown() alone does not release the descriptor
            sock.close()
    self.switch_socket = None

    sock = self.listen_socket
    if sock is not None:
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            self.logger.info("Ignoring listen soc shutdown error")
        finally:
            sock.close()
    self.listen_socket = None

    # Wakeup condition variables on which controller may be wait
    with self.xid_cv:
        self.xid_cv.notify_all()

    with self.connect_cv:
        self.connect_cv.notify_all()

    self.wakeup()
    self.dbg_state = "down"
551
def register(self, msg_type, handler):
    """
    Register a callback to receive a specific message type.

    Only one handler may be registered for a given message type.
    Passing a false handler (e.g. None) removes the registration for
    msg_type, if there is one.

    WARNING: A lock is held during the handler call back, so
    the handler should not make any blocking calls

    @param msg_type The type of message to receive.  May be DEFAULT
    for all non-handled packets.  The special type, the string "all"
    will send all packets to the handler.
    @param handler The function to call when a message of the given
    type is received.
    """
    # Should check type is valid
    if not handler:
        # Never store a non-callable: the receive path calls whatever
        # is found in self.handlers
        self.handlers.pop(msg_type, None)
        return
    self.handlers[msg_type] = handler
Dan Talayco34089522010-02-07 23:07:41 -0800572
def poll(self, exp_msg=None, timeout=-1):
    """
    Wait for the next OF message received from the switch.

    @param exp_msg If set, return only when this type of message
    is received (unless timeout occurs).

    @param timeout Maximum number of seconds to wait for the message.
    Pass -1 for the default timeout.

    @retval A pair (msg, pkt) where msg is a message object and pkt
    the string representing the packet as received from the socket.
    This allows additional parsing by the receiver if necessary.

    The data members in the message are in host endian order.
    (None, None) is returned when ofutils.timed_wait yields no packet
    (presumably on timeout -- confirm against ofutils).
    """

    if exp_msg is not None:
        exp_msg_str = ofp.ofp_type_map.get(exp_msg, "unknown (%d)" %
                                           exp_msg)
        self.logger.debug("Poll for %s", exp_msg_str)
    else:
        exp_msg_str = "unspecified"
        self.logger.debug("Poll for any OF message")

    # Take the packet from the queue
    def grab():
        # Runs with packets_cv held; returns a (msg, pkt) pair or None
        if self.packets:
            if exp_msg is None:
                self.logger.debug("Looking for any packet")
                return self.packets.pop(0)

            self.logger.debug("Looking for %s", exp_msg_str)
            for i, (msg, _) in enumerate(self.packets):
                msg_str = ofp.ofp_type_map.get(msg.type, "unknown (%d)" % msg.type)
                self.logger.debug("Checking packets[%d] %s) against %s", i, msg_str, exp_msg_str)
                if msg.type == exp_msg:
                    return self.packets.pop(i)
        # Not found
        self.logger.debug("Packet not in queue")
        return None

    with self.packets_cv:
        ret = ofutils.timed_wait(self.packets_cv, grab, timeout=timeout)

    if ret is not None:
        (msg, pkt) = ret
        # Lazy formatting: str(msg) is only built if debug is enabled
        self.logger.debug("Got message %s", msg)
        return (msg, pkt)
    else:
        return (None, None)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800630
def transact(self, msg, timeout=-1):
    """
    Run a message transaction with the switch

    Send the message in msg and wait for a reply with a matching
    transaction id.  Transactions have the highest priority in
    received message handling.

    Only one transaction may be outstanding at a time; a call made
    while another is in progress is rejected.

    @param msg The message object to send; must not be a string
    @param timeout The timeout in seconds; if -1 use default.
    @return A pair (response message, raw packet), or (None, None) if
    no response was received or another transaction was in progress
    """

    if msg.xid is None:
        msg.xid = ofutils.gen_xid()

    self.logger.debug("Running transaction %d" % msg.xid)

    with self.xid_cv:
        if self.xid:
            self.logger.error("Can only run one transaction at a time")
            return (None, None)

        self.xid = msg.xid
        self.xid_response = None
        try:
            self.message_send(msg.pack())

            self.logger.debug("Waiting for transaction %d" % msg.xid)
            ofutils.timed_wait(self.xid_cv, lambda: self.xid_response,
                               timeout=timeout)

            if self.xid_response:
                (resp, pkt) = self.xid_response
                self.xid_response = None
            else:
                (resp, pkt) = (None, None)
        finally:
            # The receive path clears self.xid only when a reply matches;
            # clear it here too so a timeout or send failure does not
            # block every later transaction
            self.xid = None

    if resp is None:
        self.logger.warning("No response for xid " + str(msg.xid))
    return (resp, pkt)
Dan Talayco34089522010-02-07 23:07:41 -0800669
def message_send(self, msg):
    """
    Send the message to the switch

    @param msg A packed buffer (str/bytes/bytearray) or an OpenFlow
    message object to be forwarded to the switch. A message object
    with xid None is given a generated xid and then packed.
    @returns 0, for backwards compatibility

    Raises Exception("no socket") when there is no switch socket and
    AssertionError when sendall() returns anything other than None.
    """

    if not self.switch_socket:
        raise Exception("no socket")

    if isinstance(msg, (bytes, bytearray, str)):
        # Sending a string indicates the message is ready to go
        outpkt = msg
    else:
        if msg.xid is None:
            msg.xid = ofutils.gen_xid()
        outpkt = msg.pack()

    # The header is decoded only for the debug log, so skip the work when
    # debug logging is off and never let a truncated buffer abort the send.
    if self.logger.isEnabledFor(logging.DEBUG):
        hdr_fmt = "!BBHL"
        if len(outpkt) >= struct.calcsize(hdr_fmt):
            msg_version, msg_type, msg_len, msg_xid = struct.unpack_from(
                hdr_fmt, outpkt)
            self.logger.debug("Msg out: buf len %d. hdr.type %s. hdr.len %d hdr.version %d hdr.xid %d",
                              len(outpkt),
                              ofp.ofp_type_map.get(msg_type, "unknown (%d)" % msg_type),
                              msg_len,
                              msg_version,
                              msg_xid)
        else:
            self.logger.debug("Msg out: buf len %d (shorter than a message header)",
                              len(outpkt))

    if self.switch_socket.sendall(outpkt) is not None:
        raise AssertionError("failed to send message to switch")

    return 0 # for backwards compatibility
Dan Talayco21c75c72010-02-12 22:59:24 -0800700
def clear_queue(self):
    """
    Clear the input queue and report the number of messages
    that were in it

    @returns The number of entries that were in self.packets when it
    was emptied
    """
    with self.packets_cv:
        # Count under the same lock that guards the reset so the number
        # reported is exactly the number discarded.
        enqueued_pkt_count = len(self.packets)
        self.packets = []
    return enqueued_pkt_count
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700710
def __str__(self):
    """
    Build a multi-line, human readable summary of the controller

    The first line is a "Controller:" banner; each following line is a
    label followed by the current value of one attribute.

    @returns The summary string, one attribute per line
    """
    # dbg_state is concatenated directly (not via str()), as a string.
    lines = ["Controller:\n", " state " + self.dbg_state + "\n"]
    labelled_values = (
        (" switch_addr ", self.switch_addr),
        (" pending pkts ", len(self.packets)),
        (" total pkts ", self.packets_total),
        (" expired pkts ", self.packets_expired),
        (" handled pkts ", self.packets_handled),
        (" poll discards ", self.poll_discards),
        (" parse errors ", self.parse_errors),
        (" sock errrors ", self.socket_errors),
        (" max pkts ", self.max_pkts),
        (" target switch ", self.switch),
        (" host ", self.host),
        (" port ", self.port),
        (" keep_alive ", self.keep_alive),
        (" pkt_in_run ", self.pkt_in_run),
        (" pkt_in_dropped ", self.pkt_in_dropped),
    )
    for label, value in labelled_values:
        lines.append(label + str(value) + "\n")
    return "".join(lines)
730
def show(self):
    """
    Print the string form of the controller to standard output
    """
    # Parenthesised form prints the same text under Python 2 and is also
    # valid syntax under Python 3.
    print(str(self))
733
def sample_handler(controller, msg, pkt):
    """
    Sample message handler

    Shows the signature expected of functions registered with the
    controller class for packet reception. This sample does nothing
    and returns None.

    @param controller The controller calling the handler
    @param msg The parsed message object
    @param pkt The raw packet that was received on the socket. This is
    in case the packet contains extra unparsed data.
    @returns Boolean value indicating if the packet was handled. If
    not handled, the packet is placed in the queue for pollers to receive
    """
    return None