blob: 404b185e3eb9815198cbd5653a9b34d59b72da21 [file] [log] [blame]
"""
OpenFlow Test Framework

Controller class

Provide the interface to the control channel to the switch under test.

Class inherits from thread so as to run in background allowing
asynchronous callbacks (if needed, not required). Also supports
polling.

The controller thread maintains a queue. Incoming messages that
are not handled by a callback function are placed in this queue for
poll calls.

Callbacks and polling support specifying the message type.

@todo Support transaction semantics via xid
@todo Support select and listen on an administrative socket (or
use a timeout to support clean shutdown).

Currently only one connection is accepted during the life of
the controller. There seems to be no clean way to interrupt an
accept call. Using select that also listens on an administrative
socket and can shut down the socket might work.

"""
28
Dan Talayco34089522010-02-07 23:07:41 -080029import os
30import socket
31import time
Rich Lanecd97d3d2013-01-07 18:50:06 -080032import struct
33import select
34import logging
Dan Talayco34089522010-02-07 23:07:41 -080035from threading import Thread
36from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080037from threading import Condition
Rich Lane9fd05682013-01-10 15:30:38 -080038import ofp
Rich Lanecd97d3d2013-01-07 18:50:06 -080039import ofutils
Dan Talayco48370102010-03-03 15:17:33 -080040
Dan Talaycof8de5182012-04-12 22:38:41 -070041
# Lookup table indexed by character code: the character itself when its
# repr() is a plain three-character quoted form, otherwise '.'.
FILTER = ''.join([chr(x) if len(repr(chr(x))) == 3 else '.'
                  for x in range(256)])


def hex_dump_buffer(src, length=16):
    """
    Convert src to a hex dump string and return the string

    Each output line holds the offset of its first byte, the hex values
    of up to length bytes and their printable rendering ('.' is shown for
    anything above 127 or not printable according to FILTER).

    Both character strings and integer-yielding buffers (bytes on
    Python 3, bytearray) are accepted.

    @param src The source buffer
    @param length The number of bytes shown in each line
    @returns A string showing the hex dump
    """
    result = ["\n"]
    for i in range(0, len(src), length):
        # Iterating a str yields 1-char strings, iterating bytes/bytearray
        # yields ints; normalize everything to integer codes first.
        codes = [x if isinstance(x, int) else ord(x)
                 for x in src[i:i+length]]
        hex_part = ' '.join(["%02x" % c for c in codes])
        printable = ''.join([FILTER[c] if c <= 127 else '.' for c in codes])
        result.append("%04x %-*s %s\n" % (i, length*3, hex_part, printable))
    return ''.join(result)
60
##@todo Find a better home for these identifiers (controller)
# Default byte count handed to recv() when reading from the switch socket
RCV_SIZE_DEFAULT = 32 * 1024
# Backlog value handed to listen() on the controller's listening socket
LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080064
65class Controller(Thread):
66 """
    Class abstracting the control interface to the switch.

    For receiving messages, two mechanisms are provided. First,
    query the interface with poll. Second, register to have a
    function called by message type. The callback is passed the
    message object as well as the raw packet.

    One of the main purposes of this object is to translate between network
    and host byte order. 'Above' this object, things should be in host
    byte order.

    @todo Consider using SocketServer for listening socket
    @todo Test transaction code

    @var rcv_size The receive size to use for receive calls
    @var max_pkts The max size of the receive queue
    @var keep_alive If true, listen for echo requests and respond w/
    echo replies
    @var initial_hello If true, will send a hello message immediately
    upon connecting to the switch
    @var switch If not None, do an active connection to the switch
    @var host The host to use for connect
    @var port The port to connect on
    @var packets_total Total number of packets received
    @var packets_expired Number of packets popped from queue as queue full
    @var packets_handled Number of packets handled by something
    @var dbg_state Debug indication of state
Dan Talayco34089522010-02-07 23:07:41 -080094 """
95
def __init__(self, switch=None, host='127.0.0.1', port=6633, max_pkts=1024):
    """
    Initialize controller state and, in passive mode, the listen socket

    The controller is passive (waits for the switch to connect) when
    switch is not set; in that case a listening socket is created and
    bound to host:port right away.

    @param switch If not None, do an active connection to this switch
    @param host The local address the listening socket is bound to
    @param port The port to listen on (passive) or connect to (active)
    @param max_pkts The max size of the receive queue
    """
    Thread.__init__(self)

    # Settings
    self.switch = switch
    self.passive = not self.switch
    self.host = host
    self.port = port
    self.max_pkts = max_pkts
    self.rcv_size = RCV_SIZE_DEFAULT
    self.keep_alive = False
    self.initial_hello = True
    self.transact_to = 15 # Transact timeout default value; add to config
    self.logger = logging.getLogger("controller")

    # Socket related
    self.listen_socket = None
    self.switch_socket = None
    self.switch_addr = None
    self.buffered_input = ""

    # Locks and condition variables
    self.sync = Lock()
    self.connect_cv = Condition()
    self.message_cv = Condition()

    # Used to wake up the event loop from another thread
    self.waker = ofutils.EventDescriptor()

    # Counters
    self.socket_errors = 0
    self.parse_errors = 0
    self.packets_total = 0
    self.packets_expired = 0
    self.packets_handled = 0
    self.poll_discards = 0
    self.packet_in_count = 0

    # Packet-in filtering
    self.filter_packet_in = False # Drop "excessive" packet ins
    self.pkt_in_run = 0 # Count on run of packet ins
    self.pkt_in_filter_limit = 50 # Count on run of packet ins
    self.pkt_in_dropped = 0 # Total dropped packet ins

    # State
    self.handlers = {}
    self.active = True
    self.dbg_state = "init"

    # OpenFlow message/packet queue
    # Protected by the packets_cv lock / condition variable
    self.packets = []
    self.packets_cv = Condition()

    # Transaction waiting variables
    # xid_cv: Condition variable (semaphore) for packet waiters
    # xid: Transaction ID being waited on
    # xid_response: Transaction response message
    self.xid_cv = Condition()
    self.xid = None
    self.xid_response = None

    # Create listen socket
    if self.passive:
        self.logger.info("Create/listen at " + self.host + ":" +
                         str(self.port))
        lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.listen_socket = lsock
        lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        lsock.bind((self.host, self.port))
        lsock.listen(LISTEN_QUEUE_SIZE)
163
def filter_packet(self, rawmsg, hdr):
    """
    Check if packet should be filtered

    Filtering is currently disabled: nothing is ever dropped and no
    controller state is touched.

    @param rawmsg The raw message (unused while filtering is disabled)
    @param hdr The message header (unused while filtering is disabled)
    @return Boolean, True if packet should be dropped; always False for now
    """
    # XXX The packet-in rate limiter that used to follow never checked
    # that the message actually was a packet-in, so it was short-circuited
    # by an unconditional return. The unreachable remainder was removed;
    # a real implementation should be driven by filter_packet_in,
    # pkt_in_run, pkt_in_filter_limit and pkt_in_dropped.
    return False
185
def _pkt_handle(self, pkt):
    """
    Check for all packet handling conditions

    For every complete OpenFlow message found in the data:
    Parse and verify message
    Check if XID matches something waiting
    Check if keep alive is on and message is an echo request
    Check if any registered handler wants the packet
    Enqueue if none of those conditions is met

    Bytes of an incomplete trailing message are saved in buffered_input
    and prepended to the data passed to the next call.

    @param pkt The raw packet (string) which may contain multiple OF msgs
    """

    # snag any left over data from last read()
    pkt = self.buffered_input + pkt
    self.buffered_input = ""

    # Process each of the OF msgs inside the pkt
    offset = 0
    while offset < len(pkt):
        # Need at least the 8 header bytes before the header can be parsed
        if offset + 8 > len(pkt):
            break

        # Parse the header to get type
        hdr_version, hdr_type, hdr_length, hdr_xid = ofp.message.parse_header(pkt[offset:])

        # A length shorter than the header itself can never be valid and
        # would keep offset from advancing (endless loop for length 0).
        if hdr_length < 8:
            self.parse_errors += 1
            self.logger.error("Invalid message length %d in header; disconnecting",
                              hdr_length)
            self.disconnect()
            return

        # Extract the raw message bytes; wait for more data if incomplete
        if (offset + hdr_length) > len(pkt):
            break
        rawmsg = pkt[offset : offset + hdr_length]
        offset += hdr_length

        self.logger.debug("Msg in: version %d type %s (%d) len %d xid %d",
                          hdr_version,
                          ofp.ofp_type_map.get(hdr_type, "unknown"), hdr_type,
                          hdr_length, hdr_xid)
        if hdr_version < ofp.OFP_VERSION:
            self.logger.error("Switch only supports up to OpenFlow version %d (OFTest version is %d)",
                              hdr_version, ofp.OFP_VERSION)
            print("Switch only supports up to OpenFlow version %d (OFTest version is %d)" %
                  (hdr_version, ofp.OFP_VERSION))
            self.disconnect()
            return

        msg = ofp.message.parse_message(rawmsg)
        if not msg:
            self.parse_errors += 1
            self.logger.warning("Could not parse message")
            continue

        # The sync lock is held for the rest of the message processing,
        # including any handler callbacks.
        with self.sync:
            # Check if transaction is waiting
            with self.xid_cv:
                if self.xid and hdr_xid == self.xid:
                    self.logger.debug("Matched expected XID " + str(hdr_xid))
                    self.xid_response = (msg, rawmsg)
                    self.xid = None
                    self.xid_cv.notify()
                    continue

            # Check if keep alive is set; if so, respond to echo requests
            if self.keep_alive:
                if hdr_type == ofp.OFPT_ECHO_REQUEST:
                    self.logger.debug("Responding to echo request")
                    rep = ofp.message.echo_reply()
                    rep.xid = hdr_xid
                    # Ignoring additional data
                    self.message_send(rep.pack())
                    continue

            # Generalize to counters for all packet types?
            if msg.type == ofp.OFPT_PACKET_IN:
                self.packet_in_count += 1

            # Log error messages
            if hdr_type == ofp.OFPT_ERROR:
                if msg.err_type in ofp.ofp_error_type_map:
                    type_str = ofp.ofp_error_type_map[msg.err_type]
                    if msg.err_type == ofp.OFPET_HELLO_FAILED:
                        code_map = ofp.ofp_hello_failed_code_map
                    elif msg.err_type == ofp.OFPET_BAD_REQUEST:
                        code_map = ofp.ofp_bad_request_code_map
                    elif msg.err_type == ofp.OFPET_BAD_ACTION:
                        code_map = ofp.ofp_bad_action_code_map
                    elif msg.err_type == ofp.OFPET_FLOW_MOD_FAILED:
                        code_map = ofp.ofp_flow_mod_failed_code_map
                    elif msg.err_type == ofp.OFPET_PORT_MOD_FAILED:
                        code_map = ofp.ofp_port_mod_failed_code_map
                    elif msg.err_type == ofp.OFPET_QUEUE_OP_FAILED:
                        code_map = ofp.ofp_queue_op_failed_code_map
                    else:
                        code_map = None

                    if code_map and msg.code in code_map:
                        code_str = code_map[msg.code]
                    else:
                        code_str = "unknown"
                else:
                    type_str = "unknown"
                    code_str = "unknown"
                self.logger.warning("Received error message: xid=%d type=%s (%d) code=%s (%d)",
                                    hdr_xid, type_str, msg.err_type, code_str, msg.code)

            # Now check for message handlers; preference is given to
            # handlers for a specific packet
            handled = False
            if hdr_type in self.handlers:
                handled = self.handlers[hdr_type](self, msg, rawmsg)
            if not handled and ("all" in self.handlers):
                handled = self.handlers["all"](self, msg, rawmsg)

            if not handled: # Not handled, enqueue
                self.logger.debug("Enqueuing pkt type %s (%d)",
                                  ofp.ofp_type_map.get(hdr_type, "unknown"),
                                  hdr_type)
                with self.packets_cv:
                    # Queue full: expire the oldest entry to make room
                    if len(self.packets) >= self.max_pkts:
                        self.packets.pop(0)
                        self.packets_expired += 1
                    self.packets.append((msg, rawmsg))
                    self.packets_cv.notify_all()
                self.packets_total += 1
            else:
                self.packets_handled += 1
                self.logger.debug("Message handled by callback")

    # end of 'while offset < len(pkt)'
    #   note that if offset = len(pkt), this
    #   appends a harmless empty string
    self.buffered_input += pkt[offset:]
Dan Talaycod12b6612010-03-07 22:00:46 -0800322
def _socket_ready_handle(self, s):
    """
    Handle an input-ready socket

    Three descriptors are recognized: the listen socket (accept the
    switch connection, or refuse it when one already exists), the switch
    socket (read data and hand it to _pkt_handle) and the waker (drain
    the wakeup notification).

    @param s The socket object that is ready
    @returns 0 on success, -1 on error
    """

    if self.passive and s and s == self.listen_socket:
        if self.switch_socket:
            self.logger.warning("Ignoring incoming connection; already connected to switch")
            (sock, addr) = self.listen_socket.accept()
            sock.close()
            return 0

        # "except Exception" rather than a bare except so that
        # KeyboardInterrupt/SystemExit are not swallowed here.
        try:
            (sock, addr) = self.listen_socket.accept()
        except Exception:
            self.logger.warning("Error on listen socket accept")
            return -1
        self.logger.info(self.host+":"+str(self.port)+": Incoming connection from "+str(addr))

        with self.connect_cv:
            (self.switch_socket, self.switch_addr) = (sock, addr)
            self.switch_socket.setsockopt(socket.IPPROTO_TCP,
                                          socket.TCP_NODELAY, True)
            if self.initial_hello:
                self.message_send(ofp.message.hello())
            self.connect_cv.notify() # Notify anyone waiting
    elif s and s == self.switch_socket:
        for idx in range(3): # debug: try a couple of times
            try:
                pkt = self.switch_socket.recv(self.rcv_size)
            except Exception:
                self.logger.warning("Error on switch read")
                return -1

            # Controller is being stopped; drop whatever was read
            if not self.active:
                return 0

            if len(pkt) == 0:
                self.logger.warning("Zero-length switch read, %d" % idx)
            else:
                break

        if len(pkt) == 0: # Still no packet
            self.logger.warning("Zero-length switch read; closing cxn")
            self.logger.info(str(self))
            return -1

        self._pkt_handle(pkt)
    elif s and s == self.waker:
        self.waker.wait()
    else:
        self.logger.error("Unknown socket ready: " + str(s))
        return -1

    return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800381
def active_connect(self):
    """
    Actively connect to a switch IP addr

    Opens a TCP connection to (self.switch, self.port), enables
    TCP_NODELAY on it and records the address in switch_addr.

    @return The connected socket, or None if the connection failed
    """
    soc = None
    try:
        self.logger.info("Trying active connection to %s" % self.switch)
        soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        soc.connect((self.switch, self.port))
        self.logger.info("Connected to " + self.switch + " on " +
                         str(self.port))
        soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
        self.switch_addr = (self.switch, self.port)
        return soc
    except Exception as e:
        # %s for the port so a non-integer port cannot raise in here
        self.logger.error("Could not connect to %s at %s:: %s" %
                          (self.switch, self.port, str(e)))
        # Do not leak the half-set-up socket
        if soc is not None:
            soc.close()
        return None
399
def wakeup(self):
    """
    Wake up the event loop, presumably from another thread.

    Signals the waker object, which is one of the descriptors
    returned by sockets() for the event loop to select on.
    """
    waker = self.waker
    waker.notify()
Rich Lane32797542012-12-22 17:46:05 -0800405
def sockets(self):
    """
    Return list of sockets to select on.

    @return The listen socket, the switch socket and the waker, in that
    order, leaving out any of them that is not set
    """
    ready = []
    for candidate in (self.listen_socket, self.switch_socket, self.waker):
        if candidate:
            ready.append(candidate)
    return ready
412
def run(self):
    """
    Activity function for class

    Event loop of the controller thread: selects (with a one second
    timeout) on the descriptors returned by sockets() for as long as
    the controller is active. A select failure, a descriptor reported
    in error or a handler returning -1 disconnects the switch.

    When there is a message on the socket, check for handlers; queue the
    packet if no one handles the packet.

    See note for controller describing the limitation of a single
    connection for now.
    """

    self.dbg_state = "running"

    while self.active:
        # Take one snapshot so that the read list and the error list
        # handed to select() always describe the same descriptors.
        socs = self.sockets()
        try:
            sel_in, sel_out, sel_err = select.select(socs, [], socs, 1)
        except Exception:
            self.logger.error("Select error, disconnecting", exc_info=True)
            self.disconnect()
            # Nothing valid was returned by select; start over
            continue

        for s in sel_err:
            self.logger.error("Got socket error on: " + str(s) + ", disconnecting")
            self.disconnect()

        for s in sel_in:
            if self._socket_ready_handle(s) == -1:
                self.disconnect()

    # End of main loop
    self.dbg_state = "closing"
    self.logger.info("Exiting controller thread")
    self.shutdown()
451
def connect(self, timeout=-1):
    """
    Connect to the switch

    In active mode a connection to the switch is opened right away (the
    timeout is not used); a failure clears the active flag. In passive
    mode this blocks until the event loop has accepted a connection from
    the switch or the timeout expires.

    @param timeout Block for up to timeout seconds. Pass -1 for the default.
    @return Boolean, True if connected
    """

    if not self.passive:  # Do active connection now
        self.logger.info("Attempting to connect to %s on port %s" %
                         (self.switch, str(self.port)))
        soc = self.active_connect()
        if soc:
            self.logger.info("Connected to %s", self.switch)
            self.dbg_state = "running"
            self.switch_socket = soc
            # Make the event loop pick up the new switch socket
            self.wakeup()
            with self.connect_cv:
                if self.initial_hello:
                    self.message_send(ofp.message.hello())
                self.connect_cv.notify() # Notify anyone waiting
        else:
            self.logger.error("Could not actively connect to switch %s",
                              self.switch)
            self.active = False
    else:
        with self.connect_cv:
            ofutils.timed_wait(self.connect_cv, lambda: self.switch_socket,
                               timeout=timeout)

    return self.switch_socket is not None
483
def disconnect(self, timeout=-1):
    """
    If connected to a switch, disconnect.

    Closes the switch socket, forgets the switch address and empties
    the message queue. Waiters on connect_cv are notified in every case,
    connected or not.

    @param timeout Not used by this method
    """
    connected = bool(self.switch_socket)
    if connected:
        self.switch_socket.close()
        self.switch_socket = None
        self.switch_addr = None
        # Queued messages belong to the connection that just went away
        with self.packets_cv:
            self.packets = []

    with self.connect_cv:
        self.connect_cv.notifyAll()
Ken Chiangadc950f2012-10-05 13:50:03 -0700496
def wait_disconnected(self, timeout=-1):
    """
    Wait for the switch connection to go away

    @param timeout Block for up to timeout seconds. Pass -1 for the default.
    @return Boolean, True if disconnected
    """

    def gone():
        # True once there is no switch socket, None while still connected
        # (presumably timed_wait keeps waiting on None -- verify in ofutils)
        if self.switch_socket:
            return None
        return True

    with self.connect_cv:
        ofutils.timed_wait(self.connect_cv, gone, timeout=timeout)
    return self.switch_socket is None
508
def kill(self):
    """
    Force the controller thread to quit

    Clears the active flag, wakes the event loop so that it notices the
    change, then joins the controller thread.
    """
    self.active = False
    # The loop may be blocked in select(); poke it before joining
    self.wakeup()
    self.join()
Dan Talayco21c75c72010-02-12 22:59:24 -0800516
def shutdown(self):
    """
    Shutdown the controller closing all sockets

    Clears the active flag, shuts down and closes the switch and listen
    sockets, wakes every waiter on xid_cv and connect_cv and finally
    wakes the event loop.

    @todo Might want to synchronize shutdown with self.sync...
    """

    def close_quietly(sock, what):
        # Best effort: errors are logged and ignored. shutdown() alone
        # does not release the descriptor, hence the explicit close().
        if sock is None:
            return
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            self.logger.info("Ignoring " + what + " soc shutdown error")
        try:
            sock.close()
        except Exception:
            self.logger.info("Ignoring " + what + " soc close error")

    self.active = False

    close_quietly(self.switch_socket, "switch")
    self.switch_socket = None

    close_quietly(self.listen_socket, "listen")
    self.listen_socket = None

    # Wakeup condition variables on which controller may be wait
    with self.xid_cv:
        self.xid_cv.notify_all()

    with self.connect_cv:
        self.connect_cv.notify_all()

    self.wakeup()
    self.dbg_state = "down"
546
def register(self, msg_type, handler):
    """
    Register a callback to receive a specific message type.

    Only one handler may be registered for a given message type;
    registering again replaces the previous handler. Passing a false
    handler (e.g. None) removes the registration for msg_type, if any.

    WARNING: A lock is held during the handler call back, so
    the handler should not make any blocking calls

    @param msg_type The type of message to receive. The special type,
    the string "all", is offered every message that no type-specific
    handler has handled.
    @param handler The function to call when a message of the given
    type is received. It is called as handler(controller, msg, rawmsg)
    and returns true if it handled the message.
    """
    # Should check type is valid
    if not handler:
        # Unregister. Never store a false handler: the receive path
        # would later try to call it.
        self.handlers.pop(msg_type, None)
        return
    self.handlers[msg_type] = handler
Dan Talayco34089522010-02-07 23:07:41 -0800567
def poll(self, exp_msg=None, timeout=-1):
    """
    Wait for the next OF message received from the switch.

    Messages are taken from the queue of messages that were not
    consumed by a transaction or a registered handler.

    @param exp_msg If set, return only when this type of message
    is received (unless timeout occurs).

    @param timeout Maximum number of seconds to wait for the message.
    Pass -1 for the default timeout.

    @retval A pair (msg, pkt) where msg is a message object and pkt
    the string representing the packet as received from the socket.
    This allows additional parsing by the receiver if necessary.

    The data members in the message are in host endian order.
    If no matching message arrives, (None, None) is returned
    """

    # Compare against None: message type 0 is a valid type to wait for
    if exp_msg is not None:
        exp_msg_str = ofp.ofp_type_map.get(exp_msg, "unknown (%d)" %
                                           exp_msg)
        self.logger.debug("Poll for %s", exp_msg_str)
    else:
        exp_msg_str = "unspecified"
        self.logger.debug("Poll for any OF message")

    # Take the packet from the queue
    def grab():
        if len(self.packets) > 0:
            if exp_msg is None:
                self.logger.debug("Looking for any packet")
                (msg, pkt) = self.packets.pop(0)
                return (msg, pkt)
            else:
                self.logger.debug("Looking for %s", exp_msg_str)
                for i, entry in enumerate(self.packets):
                    self.logger.debug("Checking packets[%d] (%s)", i, exp_msg_str)
                    if entry[0].type == exp_msg:
                        (msg, pkt) = self.packets.pop(i)
                        return (msg, pkt)
        # Not found
        self.logger.debug("Packet not in queue")
        return None

    with self.packets_cv:
        ret = ofutils.timed_wait(self.packets_cv, grab, timeout=timeout)

    if ret is not None:
        (msg, pkt) = ret
        self.logger.debug("Got message %s" % str(msg))
        return (msg, pkt)
    else:
        return (None, None)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800624
def transact(self, msg, timeout=-1):
    """
    Run a message transaction with the switch

    Send the message in msg and wait for a reply with a matching
    transaction id. Transactions have the highest priority in
    received message handling.

    Only one transaction can be outstanding at a time; a call made
    while another one is in progress fails immediately.

    @param msg The message object to send; must not be a string.
    It is assigned a generated xid if it has none.
    @param timeout The timeout in seconds; if -1 use default.
    @return A pair (response message, raw response), or (None, None)
    if no response arrived or another transaction was in progress
    """

    if msg.xid is None:
        msg.xid = ofutils.gen_xid()

    self.logger.debug("Running transaction %d" % msg.xid)

    with self.xid_cv:
        if self.xid:
            self.logger.error("Can only run one transaction at a time")
            return (None, None)

        self.xid = msg.xid
        self.xid_response = None
        try:
            self.message_send(msg.pack())

            self.logger.debug("Waiting for transaction %d" % msg.xid)
            ofutils.timed_wait(self.xid_cv, lambda: self.xid_response, timeout=timeout)

            if self.xid_response:
                (resp, pkt) = self.xid_response
            else:
                (resp, pkt) = (None, None)
        finally:
            # Always release the transaction slot. Leaving xid set after
            # a timeout or a failed send would make every later
            # transaction fail with "Can only run one transaction".
            self.xid = None
            self.xid_response = None

    if resp is None:
        self.logger.warning("No response for xid " + str(msg.xid))
    return (resp, pkt)
Dan Talayco34089522010-02-07 23:07:41 -0800663
def message_send(self, msg):
    """
    Send the message to the switch

    @param msg A string or OpenFlow message object to be forwarded to
    the switch. A string is sent unchanged; a message object is given a
    transaction id if its xid is None and is then packed.
    @returns 0 on success (kept for backwards compatibility)
    @raises Exception if there is no switch socket
    @raises AssertionError if sendall returns anything other than None
    """
    if not self.switch_socket:
        raise Exception("no socket")

    if isinstance(msg, str):
        # A string indicates the message is already packed and ready to go
        outpkt = msg
    else:
        if msg.xid is None:
            msg.xid = ofutils.gen_xid()
        outpkt = msg.pack()

    # Decode the leading 8 bytes (version, type, length, xid) for the log only
    msg_version, msg_type, msg_len, msg_xid = struct.unpack_from("!BBHL", outpkt)
    self.logger.debug("Msg out: buf len %d. hdr.type %s. hdr.len %d hdr.version %d hdr.xid %d",
                      len(outpkt),
                      ofp.ofp_type_map.get(msg_type, "unknown (%d)" % msg_type),
                      msg_len,
                      msg_version,
                      msg_xid)

    if self.switch_socket.sendall(outpkt) is not None:
        raise AssertionError("failed to send message to switch")

    return 0 # for backwards compatibility
Dan Talayco21c75c72010-02-12 22:59:24 -0800694
def clear_queue(self):
    """
    Clear the input queue and report the number of messages
    that were in it

    @returns The number of messages that were discarded
    """
    with self.packets_cv:
        # Count while holding the lock so the reported number matches
        # exactly what is thrown away, even if packets are being enqueued
        # concurrently.
        discarded = len(self.packets)
        self.packets = []
    return discarded
704
def __str__(self):
    """
    Build a multi-line, human readable summary of the controller

    The first line is a "Controller:" header; each following line holds
    one label and the current value of the matching attribute.

    @returns The summary string, terminated by a newline
    """
    # dbg_state is concatenated directly (no str()), so it must be a string
    lines = ["Controller:", " state " + self.dbg_state]

    labelled = (
        (" switch_addr ", self.switch_addr),
        (" pending pkts ", len(self.packets)),
        (" total pkts ", self.packets_total),
        (" expired pkts ", self.packets_expired),
        (" handled pkts ", self.packets_handled),
        (" poll discards ", self.poll_discards),
        (" parse errors ", self.parse_errors),
        (" sock errrors ", self.socket_errors),
        (" max pkts ", self.max_pkts),
        (" target switch ", self.switch),
        (" host ", self.host),
        (" port ", self.port),
        (" keep_alive ", self.keep_alive),
        (" pkt_in_run ", self.pkt_in_run),
        (" pkt_in_dropped ", self.pkt_in_dropped),
    )
    for label, value in labelled:
        lines.append(label + str(value))

    return "\n".join(lines) + "\n"
724
def show(self):
    """
    Print the string form of this object to standard output
    """
    # Parenthesized form gives the same output under the Python 2 print
    # statement and is also valid as the Python 3 print function.
    print(str(self))
727
def sample_handler(controller, msg, pkt):
    """
    Sample message handler

    Shows the signature expected of functions registered with the
    controller class for packet reception. A real handler returns a
    Boolean indicating whether it handled the packet; packets that are
    not handled are placed in the queue for pollers to receive. This
    sample does nothing and returns None.

    @param controller The controller calling the handler
    @param msg The parsed message object
    @param pkt The raw packet that was received on the socket. This is
    in case the packet contains extra unparsed data.
    @returns None
    """
    return None