blob: 1191c37f9fe6c943c3ded09d70efe4ec0dcd6b0d [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
4Controller class
5
6Provide the interface to the control channel to the switch under test.
7
8Class inherits from thread so as to run in background allowing
9asynchronous callbacks (if needed, not required). Also supports
10polling.
11
12The controller thread maintains a queue. Incoming messages that
13are not handled by a callback function are placed in this queue for
14poll calls.
15
16Callbacks and polling support specifying the message type
17
18@todo Support transaction semantics via xid
Dan Talayco1b3f6902010-02-15 14:14:19 -080019@todo Support select and listen on an administrative socket (or
20use a timeout to support clean shutdown).
21
22Currently only one connection is accepted during the life of
23the controller. There seems
24to be no clean way to interrupt an accept call. Using select that also listens
25on an administrative socket and can shut down the socket might work.
26
Dan Talayco34089522010-02-07 23:07:41 -080027"""
28
Dan Talayco34089522010-02-07 23:07:41 -080029import os
30import socket
31import time
Rich Lanecd97d3d2013-01-07 18:50:06 -080032import struct
33import select
34import logging
Dan Talayco34089522010-02-07 23:07:41 -080035from threading import Thread
36from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080037from threading import Condition
Rich Lane9fd05682013-01-10 15:30:38 -080038import ofp
Rich Lanecd97d3d2013-01-07 18:50:06 -080039import ofutils
Dan Talayco48370102010-03-03 15:17:33 -080040
Dan Talaycof8de5182012-04-12 22:38:41 -070041
42FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.'
43 for x in range(256)])
44
45def hex_dump_buffer(src, length=16):
46 """
47 Convert src to a hex dump string and return the string
48 @param src The source buffer
49 @param length The number of bytes shown in each line
50 @returns A string showing the hex dump
51 """
52 result = ["\n"]
53 for i in xrange(0, len(src), length):
54 chars = src[i:i+length]
55 hex = ' '.join(["%02x" % ord(x) for x in chars])
56 printable = ''.join(["%s" % ((ord(x) <= 127 and
57 FILTER[ord(x)]) or '.') for x in chars])
58 result.append("%04x %-*s %s\n" % (i, length*3, hex, printable))
59 return ''.join(result)
60
Dan Talayco48370102010-03-03 15:17:33 -080061##@todo Find a better home for these identifiers (controller)
Glen Gibb741b1182010-07-08 16:43:58 -070062RCV_SIZE_DEFAULT = 32768
Dan Talayco48370102010-03-03 15:17:33 -080063LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080064
65class Controller(Thread):
66 """
67 Class abstracting the control interface to the switch.
68
69 For receiving messages, two mechanism will be implemented. First,
70 query the interface with poll. Second, register to have a
71 function called by message type. The callback is passed the
72 message type as well as the raw packet (or message object)
73
74 One of the main purposes of this object is to translate between network
75 and host byte order. 'Above' this object, things should be in host
76 byte order.
Dan Talayco21c75c72010-02-12 22:59:24 -080077
78 @todo Consider using SocketServer for listening socket
79 @todo Test transaction code
80
81 @var rcv_size The receive size to use for receive calls
82 @var max_pkts The max size of the receive queue
83 @var keep_alive If true, listen for echo requests and respond w/
84 echo replies
Dan Talayco710438c2010-02-18 15:16:07 -080085 @var initial_hello If true, will send a hello message immediately
86 upon connecting to the switch
Dan Talayco69ca4d62012-11-15 11:50:22 -080087 @var switch If not None, do an active connection to the switch
Dan Talayco21c75c72010-02-12 22:59:24 -080088 @var host The host to use for connect
89 @var port The port to connect on
90 @var packets_total Total number of packets received
91 @var packets_expired Number of packets popped from queue as queue full
92 @var packets_handled Number of packets handled by something
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080093 @var dbg_state Debug indication of state
Dan Talayco34089522010-02-07 23:07:41 -080094 """
95
Dan Talayco69ca4d62012-11-15 11:50:22 -080096 def __init__(self, switch=None, host='127.0.0.1', port=6633, max_pkts=1024):
Dan Talayco21c75c72010-02-12 22:59:24 -080097 Thread.__init__(self)
Dan Talayco1b3f6902010-02-15 14:14:19 -080098 # Socket related
Dan Talayco21c75c72010-02-12 22:59:24 -080099 self.rcv_size = RCV_SIZE_DEFAULT
Dan Talayco1b3f6902010-02-15 14:14:19 -0800100 self.listen_socket = None
101 self.switch_socket = None
102 self.switch_addr = None
Dan Talayco710438c2010-02-18 15:16:07 -0800103 self.connect_cv = Condition()
Dan Talaycoe226eb12010-02-18 23:06:30 -0800104 self.message_cv = Condition()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800105
Rich Lane4dfd5e12012-12-22 19:48:01 -0800106 # Used to wake up the event loop from another thread
Rich Lanecd97d3d2013-01-07 18:50:06 -0800107 self.waker = ofutils.EventDescriptor()
Rich Lane32797542012-12-22 17:46:05 -0800108
Dan Talayco1b3f6902010-02-15 14:14:19 -0800109 # Counters
Dan Talayco21c75c72010-02-12 22:59:24 -0800110 self.socket_errors = 0
111 self.parse_errors = 0
Dan Talayco21c75c72010-02-12 22:59:24 -0800112 self.packets_total = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -0800113 self.packets_expired = 0
114 self.packets_handled = 0
Dan Talaycoe226eb12010-02-18 23:06:30 -0800115 self.poll_discards = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -0800116
117 # State
Dan Talayco21c75c72010-02-12 22:59:24 -0800118 self.sync = Lock()
119 self.handlers = {}
120 self.keep_alive = False
Dan Talayco710438c2010-02-18 15:16:07 -0800121 self.active = True
122 self.initial_hello = True
Dan Talayco1b3f6902010-02-15 14:14:19 -0800123
Rich Lanec4f071b2012-07-11 17:25:57 -0700124 # OpenFlow message/packet queue
125 # Protected by the packets_cv lock / condition variable
126 self.packets = []
127 self.packets_cv = Condition()
128
Dan Talayco1b3f6902010-02-15 14:14:19 -0800129 # Settings
130 self.max_pkts = max_pkts
Dan Talayco69ca4d62012-11-15 11:50:22 -0800131 self.switch = switch
132 self.passive = not self.switch
Dan Talayco48370102010-03-03 15:17:33 -0800133 self.host = host
134 self.port = port
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800135 self.dbg_state = "init"
Dan Talayco48370102010-03-03 15:17:33 -0800136 self.logger = logging.getLogger("controller")
Dan Talaycof8de5182012-04-12 22:38:41 -0700137 self.filter_packet_in = False # Drop "excessive" packet ins
138 self.pkt_in_run = 0 # Count on run of packet ins
139 self.pkt_in_filter_limit = 50 # Count on run of packet ins
140 self.pkt_in_dropped = 0 # Total dropped packet ins
141 self.transact_to = 15 # Transact timeout default value; add to config
Dan Talayco1b3f6902010-02-15 14:14:19 -0800142
Dan Talaycoe226eb12010-02-18 23:06:30 -0800143 # Transaction and message type waiting variables
144 # xid_cv: Condition variable (semaphore) for packet waiters
Dan Talayco21c75c72010-02-12 22:59:24 -0800145 # xid: Transaction ID being waited on
146 # xid_response: Transaction response message
147 self.xid_cv = Condition()
148 self.xid = None
149 self.xid_response = None
150
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800151 self.buffered_input = ""
Dan Talaycoe226eb12010-02-18 23:06:30 -0800152
Rich Lane207502e2012-12-31 14:29:12 -0800153 # Create listen socket
154 if self.passive:
155 self.logger.info("Create/listen at " + self.host + ":" +
156 str(self.port))
157 self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
158 self.listen_socket.setsockopt(socket.SOL_SOCKET,
159 socket.SO_REUSEADDR, 1)
160 self.listen_socket.bind((self.host, self.port))
161 self.listen_socket.listen(LISTEN_QUEUE_SIZE)
162
Dan Talaycof8de5182012-04-12 22:38:41 -0700163 def filter_packet(self, rawmsg, hdr):
164 """
165 Check if packet should be filtered
166
167 Currently filters packet in messages
168 @return Boolean, True if packet should be dropped
169 """
Rich Lane1622bbb2013-03-11 17:11:53 -0700170 # XXX didn't actually check for packet-in...
171 return False
Dan Talaycof8de5182012-04-12 22:38:41 -0700172 # Add check for packet in and rate limit
173 if self.filter_packet_in:
Rich Lanec4f071b2012-07-11 17:25:57 -0700174 # If we were dropping packets, report number dropped
175 # TODO dont drop expected packet ins
176 if self.pkt_in_run > self.pkt_in_filter_limit:
177 self.logger.debug("Dropped %d packet ins (%d total)"
178 % ((self.pkt_in_run -
179 self.pkt_in_filter_limit),
180 self.pkt_in_dropped))
181 self.pkt_in_run = 0
Dan Talaycof8de5182012-04-12 22:38:41 -0700182
183 return False
184
Dan Talaycod12b6612010-03-07 22:00:46 -0800185 def _pkt_handle(self, pkt):
186 """
187 Check for all packet handling conditions
188
189 Parse and verify message
190 Check if XID matches something waiting
191 Check if message is being expected for a poll operation
192 Check if keep alive is on and message is an echo request
193 Check if any registered handler wants the packet
194 Enqueue if none of those conditions is met
195
196 an echo request in case keep_alive is true, followed by
197 registered message handlers.
Glen Gibb6d467062010-07-08 16:15:08 -0700198 @param pkt The raw packet (string) which may contain multiple OF msgs
Dan Talaycod12b6612010-03-07 22:00:46 -0800199 """
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800200
201 # snag any left over data from last read()
202 pkt = self.buffered_input + pkt
203 self.buffered_input = ""
204
Glen Gibb6d467062010-07-08 16:15:08 -0700205 # Process each of the OF msgs inside the pkt
206 offset = 0
207 while offset < len(pkt):
Rich Lane1622bbb2013-03-11 17:11:53 -0700208 if offset + 8 > len(pkt):
209 break
210
Glen Gibb6d467062010-07-08 16:15:08 -0700211 # Parse the header to get type
Rich Lane1622bbb2013-03-11 17:11:53 -0700212 hdr_version, hdr_type, hdr_length, hdr_xid = ofp.message.parse_header(pkt[offset:])
Dan Talaycod12b6612010-03-07 22:00:46 -0800213
Glen Gibb6d467062010-07-08 16:15:08 -0700214 # Extract the raw message bytes
Rich Lane1622bbb2013-03-11 17:11:53 -0700215 if (offset + hdr_length) > len(pkt):
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800216 break
Rich Lane1622bbb2013-03-11 17:11:53 -0700217 rawmsg = pkt[offset : offset + hdr_length]
218 offset += hdr_length
Dan Talaycof8de5182012-04-12 22:38:41 -0700219
Rich Lane1622bbb2013-03-11 17:11:53 -0700220 #if self.filter_packet(rawmsg, hdr):
221 # continue
Dan Talaycof8de5182012-04-12 22:38:41 -0700222
Rich Lane1879dc72013-03-11 22:08:51 -0700223 self.logger.debug("Msg in: version %d type %s (%d) len %d xid %d",
224 hdr_version,
225 ofp.ofp_type_map.get(hdr_type, "unknown"), hdr_type,
226 hdr_length, hdr_version)
Rich Lane1622bbb2013-03-11 17:11:53 -0700227 if hdr_version < ofp.OFP_VERSION:
Rich Lanec44b6242013-01-10 12:23:54 -0800228 self.logger.error("Switch only supports up to OpenFlow version %d (OFTest version is %d)",
Rich Lane1622bbb2013-03-11 17:11:53 -0700229 hdr_version, ofp.OFP_VERSION)
Rich Lanec44b6242013-01-10 12:23:54 -0800230 print "Switch only supports up to OpenFlow version %d (OFTest version is %d)" % \
Rich Lane1622bbb2013-03-11 17:11:53 -0700231 (hdr_version, ofp.OFP_VERSION)
Ken Chiangadc950f2012-10-05 13:50:03 -0700232 self.disconnect()
Dan Talaycof8de5182012-04-12 22:38:41 -0700233 return
Dan Talaycod12b6612010-03-07 22:00:46 -0800234
Rich Lanef6883512013-03-11 17:00:09 -0700235 msg = ofp.message.parse_message(rawmsg)
Glen Gibb6d467062010-07-08 16:15:08 -0700236 if not msg:
237 self.parse_errors += 1
238 self.logger.warn("Could not parse message")
239 continue
240
Rich Lanec4f071b2012-07-11 17:25:57 -0700241 with self.sync:
242 # Check if transaction is waiting
243 with self.xid_cv:
Rich Lane1622bbb2013-03-11 17:11:53 -0700244 if self.xid and hdr_xid == self.xid:
245 self.logger.debug("Matched expected XID " + str(hdr_xid))
Rich Lanec4f071b2012-07-11 17:25:57 -0700246 self.xid_response = (msg, rawmsg)
247 self.xid = None
248 self.xid_cv.notify()
249 continue
Glen Gibb6d467062010-07-08 16:15:08 -0700250
Rich Lanec4f071b2012-07-11 17:25:57 -0700251 # Check if keep alive is set; if so, respond to echo requests
252 if self.keep_alive:
Rich Lane1622bbb2013-03-11 17:11:53 -0700253 if hdr_type == ofp.OFPT_ECHO_REQUEST:
Rich Lanec4f071b2012-07-11 17:25:57 -0700254 self.logger.debug("Responding to echo request")
Rich Lane78ef8b92013-01-10 12:19:23 -0800255 rep = ofp.message.echo_reply()
Rich Lane1622bbb2013-03-11 17:11:53 -0700256 rep.xid = hdr_xid
Rich Lanec4f071b2012-07-11 17:25:57 -0700257 # Ignoring additional data
Rich Lane8fbfd662013-03-11 15:30:44 -0700258 self.message_send(rep.pack())
Rich Lanec4f071b2012-07-11 17:25:57 -0700259 continue
Glen Gibb6d467062010-07-08 16:15:08 -0700260
Rich Lane5d63b9c2013-01-11 14:12:37 -0800261 # Log error messages
Rich Lane1622bbb2013-03-11 17:11:53 -0700262 if hdr_type == ofp.OFPT_ERROR:
Rich Laneb73808c2013-03-11 15:22:23 -0700263 if msg.err_type in ofp.ofp_error_type_map:
264 type_str = ofp.ofp_error_type_map[msg.err_type]
265 if msg.err_type == ofp.OFPET_HELLO_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800266 code_map = ofp.ofp_hello_failed_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700267 elif msg.err_type == ofp.OFPET_BAD_REQUEST:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800268 code_map = ofp.ofp_bad_request_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700269 elif msg.err_type == ofp.OFPET_BAD_ACTION:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800270 code_map = ofp.ofp_bad_action_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700271 elif msg.err_type == ofp.OFPET_FLOW_MOD_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800272 code_map = ofp.ofp_flow_mod_failed_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700273 elif msg.err_type == ofp.OFPET_PORT_MOD_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800274 code_map = ofp.ofp_port_mod_failed_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700275 elif msg.err_type == ofp.OFPET_QUEUE_OP_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800276 code_map = ofp.ofp_queue_op_failed_code_map
277 else:
278 code_map = None
279
280 if code_map and msg.code in code_map:
281 code_str = code_map[msg.code]
282 else:
283 code_str = "unknown"
284 else:
285 type_str = "unknown"
Rich Lane1879dc72013-03-11 22:08:51 -0700286 code_str = "unknown"
Rich Lane5d63b9c2013-01-11 14:12:37 -0800287 self.logger.warn("Received error message: xid=%d type=%s (%d) code=%s (%d)",
Rich Lane1622bbb2013-03-11 17:11:53 -0700288 hdr_xid, type_str, msg.err_type, code_str, msg.code)
Rich Lane5d63b9c2013-01-11 14:12:37 -0800289
Rich Lanec4f071b2012-07-11 17:25:57 -0700290 # Now check for message handlers; preference is given to
291 # handlers for a specific packet
292 handled = False
Rich Lane1622bbb2013-03-11 17:11:53 -0700293 if hdr_type in self.handlers.keys():
294 handled = self.handlers[hdr_type](self, msg, rawmsg)
Rich Lanec4f071b2012-07-11 17:25:57 -0700295 if not handled and ("all" in self.handlers.keys()):
296 handled = self.handlers["all"](self, msg, rawmsg)
Glen Gibb6d467062010-07-08 16:15:08 -0700297
Rich Lanec4f071b2012-07-11 17:25:57 -0700298 if not handled: # Not handled, enqueue
Rich Lane1879dc72013-03-11 22:08:51 -0700299 self.logger.debug("Enqueuing pkt type %s (%d)",
300 ofp.ofp_type_map.get(hdr_type, "unknown"),
301 hdr_type)
Rich Lanec4f071b2012-07-11 17:25:57 -0700302 with self.packets_cv:
303 if len(self.packets) >= self.max_pkts:
304 self.packets.pop(0)
305 self.packets_expired += 1
306 self.packets.append((msg, rawmsg))
307 self.packets_cv.notify_all()
308 self.packets_total += 1
309 else:
310 self.packets_handled += 1
311 self.logger.debug("Message handled by callback")
Glen Gibb6d467062010-07-08 16:15:08 -0700312
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800313 # end of 'while offset < len(pkt)'
314 # note that if offset = len(pkt), this is
315 # appends a harmless empty string
316 self.buffered_input += pkt[offset:]
Dan Talaycod12b6612010-03-07 22:00:46 -0800317
Dan Talayco710438c2010-02-18 15:16:07 -0800318 def _socket_ready_handle(self, s):
319 """
320 Handle an input-ready socket
Dan Talaycof8de5182012-04-12 22:38:41 -0700321
Dan Talayco710438c2010-02-18 15:16:07 -0800322 @param s The socket object that is ready
Dan Talaycof8de5182012-04-12 22:38:41 -0700323 @returns 0 on success, -1 on error
Dan Talayco710438c2010-02-18 15:16:07 -0800324 """
325
Dan Talayco69ca4d62012-11-15 11:50:22 -0800326 if self.passive and s and s == self.listen_socket:
Dan Talayco710438c2010-02-18 15:16:07 -0800327 if self.switch_socket:
Rich Lanee1da7ea2012-07-26 15:58:45 -0700328 self.logger.warning("Ignoring incoming connection; already connected to switch")
Rich Laneb4f8ecb2012-09-25 09:36:26 -0700329 (sock, addr) = self.listen_socket.accept()
330 sock.close()
Rich Lanee1da7ea2012-07-26 15:58:45 -0700331 return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800332
Ken Chiange875baf2012-10-09 15:24:40 -0700333 try:
334 (sock, addr) = self.listen_socket.accept()
335 except:
336 self.logger.warning("Error on listen socket accept")
337 return -1
Ken Chiang77173992012-10-30 15:44:39 -0700338 self.logger.info(self.host+":"+str(self.port)+": Incoming connection from "+str(addr))
Rich Lanee1da7ea2012-07-26 15:58:45 -0700339
Rich Laneee3586c2012-07-11 17:26:02 -0700340 with self.connect_cv:
Rich Lanee1da7ea2012-07-26 15:58:45 -0700341 (self.switch_socket, self.switch_addr) = (sock, addr)
Rich Lane82ef1832012-12-22 17:04:35 -0800342 self.switch_socket.setsockopt(socket.IPPROTO_TCP,
343 socket.TCP_NODELAY, True)
Rich Lane1a8d5aa2012-10-08 15:40:03 -0700344 if self.initial_hello:
Rich Lane78ef8b92013-01-10 12:19:23 -0800345 self.message_send(ofp.message.hello())
Rich Lanee1da7ea2012-07-26 15:58:45 -0700346 self.connect_cv.notify() # Notify anyone waiting
Rich Laned929b8d2013-04-15 15:59:14 -0700347
348 # Prevent further connections
349 self.listen_socket.close()
350 self.listen_socket = None
Dan Talaycof8de5182012-04-12 22:38:41 -0700351 elif s and s == self.switch_socket:
352 for idx in range(3): # debug: try a couple of times
353 try:
354 pkt = self.switch_socket.recv(self.rcv_size)
355 except:
356 self.logger.warning("Error on switch read")
357 return -1
358
359 if not self.active:
360 return 0
361
362 if len(pkt) == 0:
363 self.logger.warning("Zero-length switch read, %d" % idx)
364 else:
365 break
Dan Talayco710438c2010-02-18 15:16:07 -0800366
Dan Talaycof8de5182012-04-12 22:38:41 -0700367 if len(pkt) == 0: # Still no packet
Dan Talayco0fc08bd2012-04-09 16:56:18 -0700368 self.logger.warning("Zero-length switch read; closing cxn")
Dan Talaycof8de5182012-04-12 22:38:41 -0700369 self.logger.info(str(self))
370 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800371
Dan Talaycod12b6612010-03-07 22:00:46 -0800372 self._pkt_handle(pkt)
Rich Lane4dfd5e12012-12-22 19:48:01 -0800373 elif s and s == self.waker:
374 self.waker.wait()
Dan Talayco710438c2010-02-18 15:16:07 -0800375 else:
Dan Talayco48370102010-03-03 15:17:33 -0800376 self.logger.error("Unknown socket ready: " + str(s))
Dan Talaycof8de5182012-04-12 22:38:41 -0700377 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800378
Dan Talaycof8de5182012-04-12 22:38:41 -0700379 return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800380
Dan Talayco69ca4d62012-11-15 11:50:22 -0800381 def active_connect(self):
382 """
383 Actively connect to a switch IP addr
384 """
385 try:
386 self.logger.info("Trying active connection to %s" % self.switch)
387 soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
388 soc.connect((self.switch, self.port))
389 self.logger.info("Connected to " + self.switch + " on " +
390 str(self.port))
Rich Lane82ef1832012-12-22 17:04:35 -0800391 soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800392 self.switch_addr = (self.switch, self.port)
393 return soc
394 except (StandardError, socket.error), e:
395 self.logger.error("Could not connect to %s at %d:: %s" %
396 (self.switch, self.port, str(e)))
397 return None
398
Rich Lane32797542012-12-22 17:46:05 -0800399 def wakeup(self):
400 """
401 Wake up the event loop, presumably from another thread.
402 """
Rich Lane4dfd5e12012-12-22 19:48:01 -0800403 self.waker.notify()
Rich Lane32797542012-12-22 17:46:05 -0800404
405 def sockets(self):
406 """
407 Return list of sockets to select on.
408 """
Rich Lane4dfd5e12012-12-22 19:48:01 -0800409 socs = [self.listen_socket, self.switch_socket, self.waker]
Rich Lane32797542012-12-22 17:46:05 -0800410 return [x for x in socs if x]
411
Dan Talayco1b3f6902010-02-15 14:14:19 -0800412 def run(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800413 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800414 Activity function for class
Dan Talayco21c75c72010-02-12 22:59:24 -0800415
Dan Talayco1b3f6902010-02-15 14:14:19 -0800416 Assumes connection to switch already exists. Listens on
417 switch_socket for messages until an error (or zero len pkt)
418 occurs.
Dan Talayco21c75c72010-02-12 22:59:24 -0800419
Dan Talayco1b3f6902010-02-15 14:14:19 -0800420 When there is a message on the socket, check for handlers; queue the
421 packet if no one handles the packet.
422
423 See note for controller describing the limitation of a single
424 connection for now.
425 """
426
Rich Lane207502e2012-12-31 14:29:12 -0800427 self.dbg_state = "running"
Ken Chiangadc950f2012-10-05 13:50:03 -0700428
Dan Talayco710438c2010-02-18 15:16:07 -0800429 while self.active:
Dan Talayco710438c2010-02-18 15:16:07 -0800430 try:
431 sel_in, sel_out, sel_err = \
Rich Lane32797542012-12-22 17:46:05 -0800432 select.select(self.sockets(), [], self.sockets(), 1)
Dan Talayco710438c2010-02-18 15:16:07 -0800433 except:
434 print sys.exc_info()
Ken Chiangadc950f2012-10-05 13:50:03 -0700435 self.logger.error("Select error, disconnecting")
436 self.disconnect()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800437
Dan Talayco710438c2010-02-18 15:16:07 -0800438 for s in sel_err:
Ken Chiangadc950f2012-10-05 13:50:03 -0700439 self.logger.error("Got socket error on: " + str(s) + ", disconnecting")
440 self.disconnect()
Dan Talaycof8de5182012-04-12 22:38:41 -0700441
442 for s in sel_in:
443 if self._socket_ready_handle(s) == -1:
Ken Chiangadc950f2012-10-05 13:50:03 -0700444 self.disconnect()
Dan Talayco710438c2010-02-18 15:16:07 -0800445
Dan Talayco710438c2010-02-18 15:16:07 -0800446 # End of main loop
447 self.dbg_state = "closing"
Dan Talayco48370102010-03-03 15:17:33 -0800448 self.logger.info("Exiting controller thread")
Dan Talayco710438c2010-02-18 15:16:07 -0800449 self.shutdown()
450
Rich Lane8806bc42012-07-26 19:18:37 -0700451 def connect(self, timeout=-1):
Dan Talayco710438c2010-02-18 15:16:07 -0800452 """
453 Connect to the switch
454
Rich Lane8806bc42012-07-26 19:18:37 -0700455 @param timeout Block for up to timeout seconds. Pass -1 for the default.
Dan Talayco710438c2010-02-18 15:16:07 -0800456 @return Boolean, True if connected
457 """
458
Dan Talayco69ca4d62012-11-15 11:50:22 -0800459 if not self.passive: # Do active connection now
460 self.logger.info("Attempting to connect to %s on port %s" %
461 (self.switch, str(self.port)))
462 soc = self.active_connect()
463 if soc:
464 self.logger.info("Connected to %s", self.switch)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800465 self.dbg_state = "running"
466 self.switch_socket = soc
Rich Lane32797542012-12-22 17:46:05 -0800467 self.wakeup()
Dan Talayco69ca4d62012-11-15 11:50:22 -0800468 with self.connect_cv:
469 if self.initial_hello:
470 self.message_send(hello())
471 self.connect_cv.notify() # Notify anyone waiting
472 else:
473 self.logger.error("Could not actively connect to switch %s",
474 self.switch)
475 self.active = False
476 else:
477 with self.connect_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800478 ofutils.timed_wait(self.connect_cv, lambda: self.switch_socket,
479 timeout=timeout)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800480
Dan Talayco710438c2010-02-18 15:16:07 -0800481 return self.switch_socket is not None
482
Ken Chiangadc950f2012-10-05 13:50:03 -0700483 def disconnect(self, timeout=-1):
484 """
485 If connected to a switch, disconnect.
486 """
487 if self.switch_socket:
Ken Chiangadc950f2012-10-05 13:50:03 -0700488 self.switch_socket.close()
489 self.switch_socket = None
490 self.switch_addr = None
Ken Chiang74be4722012-12-21 13:07:03 -0800491 with self.packets_cv:
492 self.packets = []
Ken Chiange875baf2012-10-09 15:24:40 -0700493 with self.connect_cv:
494 self.connect_cv.notifyAll()
Ken Chiangadc950f2012-10-05 13:50:03 -0700495
496 def wait_disconnected(self, timeout=-1):
497 """
498 @param timeout Block for up to timeout seconds. Pass -1 for the default.
499 @return Boolean, True if disconnected
500 """
501
Ken Chiange875baf2012-10-09 15:24:40 -0700502 with self.connect_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800503 ofutils.timed_wait(self.connect_cv,
504 lambda: True if not self.switch_socket else None,
505 timeout=timeout)
Ken Chiangadc950f2012-10-05 13:50:03 -0700506 return self.switch_socket is None
507
Dan Talayco710438c2010-02-18 15:16:07 -0800508 def kill(self):
509 """
510 Force the controller thread to quit
Dan Talayco710438c2010-02-18 15:16:07 -0800511 """
512 self.active = False
Rich Lane32797542012-12-22 17:46:05 -0800513 self.wakeup()
Rich Lane376bb402012-12-31 15:20:16 -0800514 self.join()
Dan Talayco21c75c72010-02-12 22:59:24 -0800515
Dan Talayco1b3f6902010-02-15 14:14:19 -0800516 def shutdown(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800517 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800518 Shutdown the controller closing all sockets
Dan Talayco21c75c72010-02-12 22:59:24 -0800519
Dan Talayco1b3f6902010-02-15 14:14:19 -0800520 @todo Might want to synchronize shutdown with self.sync...
521 """
Dan Talaycof8de5182012-04-12 22:38:41 -0700522
Dan Talayco710438c2010-02-18 15:16:07 -0800523 self.active = False
524 try:
525 self.switch_socket.shutdown(socket.SHUT_RDWR)
526 except:
Dan Talayco48370102010-03-03 15:17:33 -0800527 self.logger.info("Ignoring switch soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800528 self.switch_socket = None
Dan Talayco1b3f6902010-02-15 14:14:19 -0800529
Dan Talayco710438c2010-02-18 15:16:07 -0800530 try:
531 self.listen_socket.shutdown(socket.SHUT_RDWR)
532 except:
Dan Talayco48370102010-03-03 15:17:33 -0800533 self.logger.info("Ignoring listen soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800534 self.listen_socket = None
Dan Talaycof8de5182012-04-12 22:38:41 -0700535
Rich Laneee3586c2012-07-11 17:26:02 -0700536 # Wakeup condition variables on which controller may be wait
537 with self.xid_cv:
538 self.xid_cv.notifyAll()
Dan Talaycof8de5182012-04-12 22:38:41 -0700539
Rich Laneee3586c2012-07-11 17:26:02 -0700540 with self.connect_cv:
541 self.connect_cv.notifyAll()
Dan Talaycof8de5182012-04-12 22:38:41 -0700542
Rich Lane32797542012-12-22 17:46:05 -0800543 self.wakeup()
Dan Talayco710438c2010-02-18 15:16:07 -0800544 self.dbg_state = "down"
545
Dan Talayco34089522010-02-07 23:07:41 -0800546 def register(self, msg_type, handler):
547 """
548 Register a callback to receive a specific message type.
549
550 Only one handler may be registered for a given message type.
Dan Talaycod12b6612010-03-07 22:00:46 -0800551
552 WARNING: A lock is held during the handler call back, so
553 the handler should not make any blocking calls
554
Dan Talayco34089522010-02-07 23:07:41 -0800555 @param msg_type The type of message to receive. May be DEFAULT
Dan Talayco21c75c72010-02-12 22:59:24 -0800556 for all non-handled packets. The special type, the string "all"
557 will send all packets to the handler.
Dan Talayco34089522010-02-07 23:07:41 -0800558 @param handler The function to call when a message of the given
559 type is received.
560 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800561 # Should check type is valid
562 if not handler and msg_type in self.handlers.keys():
563 del self.handlers[msg_type]
564 return
565 self.handlers[msg_type] = handler
Dan Talayco34089522010-02-07 23:07:41 -0800566
Rich Laneb64ce3d2012-07-26 15:37:57 -0700567 def poll(self, exp_msg=None, timeout=-1):
Dan Talayco34089522010-02-07 23:07:41 -0800568 """
569 Wait for the next OF message received from the switch.
570
571 @param exp_msg If set, return only when this type of message
Dan Talayco48370102010-03-03 15:17:33 -0800572 is received (unless timeout occurs).
Rich Laneb64ce3d2012-07-26 15:37:57 -0700573
574 @param timeout Maximum number of seconds to wait for the message.
575 Pass -1 for the default timeout.
Dan Talayco34089522010-02-07 23:07:41 -0800576
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800577 @retval A pair (msg, pkt) where msg is a message object and pkt
578 the string representing the packet as received from the socket.
579 This allows additional parsing by the receiver if necessary.
580
Dan Talayco34089522010-02-07 23:07:41 -0800581 The data members in the message are in host endian order.
Dan Talayco48370102010-03-03 15:17:33 -0800582 If an error occurs, (None, None) is returned
Dan Talayco34089522010-02-07 23:07:41 -0800583 """
Dan Talayco34089522010-02-07 23:07:41 -0800584
Rich Lane1879dc72013-03-11 22:08:51 -0700585 exp_msg_str = ofp.ofp_type_map.get(exp_msg, "unknown (%d)" % exp_msg)
586
Ken Chiang77173992012-10-30 15:44:39 -0700587 if exp_msg is not None:
Rich Lane1879dc72013-03-11 22:08:51 -0700588 self.logger.debug("Poll for %s", exp_msg_str)
Ed Swierk9e55e282012-08-22 06:57:28 -0700589 else:
590 self.logger.debug("Poll for any OF message")
Rich Laneb64ce3d2012-07-26 15:37:57 -0700591
592 # Take the packet from the queue
Rich Lanec4f071b2012-07-11 17:25:57 -0700593 def grab():
594 if len(self.packets) > 0:
Ken Chiang77173992012-10-30 15:44:39 -0700595 if exp_msg is None:
Rich Lanec4f071b2012-07-11 17:25:57 -0700596 self.logger.debug("Looking for any packet")
597 (msg, pkt) = self.packets.pop(0)
598 return (msg, pkt)
599 else:
Rich Lane1879dc72013-03-11 22:08:51 -0700600 self.logger.debug("Looking for %s", exp_msg_str)
Rich Lanec4f071b2012-07-11 17:25:57 -0700601 for i in range(len(self.packets)):
602 msg = self.packets[i][0]
Rich Lane1879dc72013-03-11 22:08:51 -0700603 self.logger.debug("Checking packets[%d] (%s)", i, exp_msg_str)
Rich Laneb73808c2013-03-11 15:22:23 -0700604 if msg.type == exp_msg:
Rich Lanec4f071b2012-07-11 17:25:57 -0700605 (msg, pkt) = self.packets.pop(i)
606 return (msg, pkt)
607 # Not found
608 self.logger.debug("Packet not in queue")
Rich Laneb64ce3d2012-07-26 15:37:57 -0700609 return None
Dan Talayco21c75c72010-02-12 22:59:24 -0800610
Rich Lanec4f071b2012-07-11 17:25:57 -0700611 with self.packets_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800612 ret = ofutils.timed_wait(self.packets_cv, grab, timeout=timeout)
Rich Lanec4f071b2012-07-11 17:25:57 -0700613
Rich Laneb64ce3d2012-07-26 15:37:57 -0700614 if ret != None:
615 (msg, pkt) = ret
616 self.logger.debug("Got message %s" % str(msg))
617 return (msg, pkt)
618 else:
619 return (None, None)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800620
Rich Lane8fbfd662013-03-11 15:30:44 -0700621 def transact(self, msg, timeout=-1):
Dan Talayco34089522010-02-07 23:07:41 -0800622 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800623 Run a message transaction with the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800624
625 Send the message in msg and wait for a reply with a matching
Dan Talayco21c75c72010-02-12 22:59:24 -0800626 transaction id. Transactions have the highest priority in
627 received message handling.
Dan Talaycoe37999f2010-02-09 15:27:12 -0800628
Dan Talayco21c75c72010-02-12 22:59:24 -0800629 @param msg The message object to send; must not be a string
Rich Lanee1da7ea2012-07-26 15:58:45 -0700630 @param timeout The timeout in seconds; if -1 use default.
Dan Talayco34089522010-02-07 23:07:41 -0800631 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800632
Rich Lane8fbfd662013-03-11 15:30:44 -0700633 if msg.xid == None:
Rich Laneb73808c2013-03-11 15:22:23 -0700634 msg.xid = ofutils.gen_xid()
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800635
Rich Laneb73808c2013-03-11 15:22:23 -0700636 self.logger.debug("Running transaction %d" % msg.xid)
Dan Talayco21c75c72010-02-12 22:59:24 -0800637
Rich Lane9aca1992012-07-11 17:26:31 -0700638 with self.xid_cv:
639 if self.xid:
640 self.logger.error("Can only run one transaction at a time")
641 return (None, None)
Dan Talaycof8de5182012-04-12 22:38:41 -0700642
Rich Laneb73808c2013-03-11 15:22:23 -0700643 self.xid = msg.xid
Dan Talaycod12b6612010-03-07 22:00:46 -0800644 self.xid_response = None
Rich Lane5c3151c2013-01-03 17:15:41 -0800645 self.message_send(msg.pack())
Rich Lane9aca1992012-07-11 17:26:31 -0700646
Rich Laneb73808c2013-03-11 15:22:23 -0700647 self.logger.debug("Waiting for transaction %d" % msg.xid)
Rich Lanecd97d3d2013-01-07 18:50:06 -0800648 ofutils.timed_wait(self.xid_cv, lambda: self.xid_response, timeout=timeout)
Rich Lane9aca1992012-07-11 17:26:31 -0700649
650 if self.xid_response:
651 (resp, pkt) = self.xid_response
652 self.xid_response = None
653 else:
654 (resp, pkt) = (None, None)
655
Dan Talayco09c2c592010-05-13 14:21:52 -0700656 if resp is None:
657 self.logger.warning("No response for xid " + str(self.xid))
658 return (resp, pkt)
Dan Talayco34089522010-02-07 23:07:41 -0800659
Rich Lane8fbfd662013-03-11 15:30:44 -0700660 def message_send(self, msg):
Dan Talayco34089522010-02-07 23:07:41 -0800661 """
662 Send the message to the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800663
Dan Talayco11c26e72010-03-07 22:03:57 -0800664 @param msg A string or OpenFlow message object to be forwarded to
665 the switch.
Dan Talayco21c75c72010-02-12 22:59:24 -0800666 """
667
Dan Talayco1b3f6902010-02-15 14:14:19 -0800668 if not self.switch_socket:
669 # Sending a string indicates the message is ready to go
Ed Swierk9e55e282012-08-22 06:57:28 -0700670 raise Exception("no socket")
Dan Talayco710438c2010-02-18 15:16:07 -0800671 #@todo If not string, try to pack
Dan Talayco21c75c72010-02-12 22:59:24 -0800672 if type(msg) != type(""):
Rich Lane8fbfd662013-03-11 15:30:44 -0700673 if msg.xid == None:
Rich Laneb73808c2013-03-11 15:22:23 -0700674 msg.xid = ofutils.gen_xid()
Ed Swierk9e55e282012-08-22 06:57:28 -0700675 outpkt = msg.pack()
Dan Talayco710438c2010-02-18 15:16:07 -0800676 else:
677 outpkt = msg
Dan Talayco21c75c72010-02-12 22:59:24 -0800678
Rich Lanef18980d2012-12-31 17:11:41 -0800679 msg_version, msg_type, msg_len, msg_xid = struct.unpack_from("!BBHL", outpkt)
Rich Lane5d63b9c2013-01-11 14:12:37 -0800680 self.logger.debug("Msg out: buf len %d. hdr.type %s. hdr.len %d hdr.version %d hdr.xid %d",
Rich Lanef18980d2012-12-31 17:11:41 -0800681 len(outpkt),
Rich Laned7b0ffa2013-03-08 15:53:42 -0800682 ofp.ofp_type_map.get(msg_type, "unknown (%d)" % msg_type),
Rich Lanecd97d3d2013-01-07 18:50:06 -0800683 msg_len,
Rich Lane5d63b9c2013-01-11 14:12:37 -0800684 msg_version,
685 msg_xid)
Ed Swierk9e55e282012-08-22 06:57:28 -0700686 if self.switch_socket.sendall(outpkt) is not None:
Rich Lane5c3151c2013-01-03 17:15:41 -0800687 raise AssertionError("failed to send message to switch")
Dan Talayco710438c2010-02-18 15:16:07 -0800688
Rich Lane5c3151c2013-01-03 17:15:41 -0800689 return 0 # for backwards compatibility
Dan Talayco21c75c72010-02-12 22:59:24 -0800690
691 def __str__(self):
692 string = "Controller:\n"
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800693 string += " state " + self.dbg_state + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800694 string += " switch_addr " + str(self.switch_addr) + "\n"
695 string += " pending pkts " + str(len(self.packets)) + "\n"
696 string += " total pkts " + str(self.packets_total) + "\n"
697 string += " expired pkts " + str(self.packets_expired) + "\n"
698 string += " handled pkts " + str(self.packets_handled) + "\n"
Dan Talaycoe226eb12010-02-18 23:06:30 -0800699 string += " poll discards " + str(self.poll_discards) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800700 string += " parse errors " + str(self.parse_errors) + "\n"
701 string += " sock errrors " + str(self.socket_errors) + "\n"
702 string += " max pkts " + str(self.max_pkts) + "\n"
Dan Talayco69ca4d62012-11-15 11:50:22 -0800703 string += " target switch " + str(self.switch) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800704 string += " host " + str(self.host) + "\n"
705 string += " port " + str(self.port) + "\n"
706 string += " keep_alive " + str(self.keep_alive) + "\n"
Dan Talaycof8de5182012-04-12 22:38:41 -0700707 string += " pkt_in_run " + str(self.pkt_in_run) + "\n"
708 string += " pkt_in_dropped " + str(self.pkt_in_dropped) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800709 return string
710
711 def show(self):
712 print str(self)
713
714def sample_handler(controller, msg, pkt):
715 """
716 Sample message handler
717
718 This is the prototype for functions registered with the controller
719 class for packet reception
720
721 @param controller The controller calling the handler
722 @param msg The parsed message object
723 @param pkt The raw packet that was received on the socket. This is
724 in case the packet contains extra unparsed data.
725 @returns Boolean value indicating if the packet was handled. If
726 not handled, the packet is placed in the queue for pollers to received
727 """
728 pass