blob: f26b2d4f7840947cb381560a09034f5ad58e5378 [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
4Controller class
5
6Provide the interface to the control channel to the switch under test.
7
8Class inherits from thread so as to run in background allowing
9asynchronous callbacks (if needed, not required). Also supports
10polling.
11
12The controller thread maintains a queue. Incoming messages that
13are not handled by a callback function are placed in this queue for
14poll calls.
15
16Callbacks and polling support specifying the message type
17
18@todo Support transaction semantics via xid
Dan Talayco1b3f6902010-02-15 14:14:19 -080019@todo Support select and listen on an administrative socket (or
20use a timeout to support clean shutdown).
21
22Currently only one connection is accepted during the life of
23the controller. There seems
24to be no clean way to interrupt an accept call. Using select that also listens
25on an administrative socket and can shut down the socket might work.
26
Dan Talayco34089522010-02-07 23:07:41 -080027"""
28
Dan Talayco34089522010-02-07 23:07:41 -080029import os
30import socket
31import time
Rich Lanecd97d3d2013-01-07 18:50:06 -080032import struct
33import select
34import logging
Dan Talayco34089522010-02-07 23:07:41 -080035from threading import Thread
36from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080037from threading import Condition
Rich Lane9fd05682013-01-10 15:30:38 -080038import ofp
Rich Lanecd97d3d2013-01-07 18:50:06 -080039import ofutils
Dan Talayco48370102010-03-03 15:17:33 -080040
Dan Talaycof8de5182012-04-12 22:38:41 -070041
# Translation table indexed by character code: a character whose repr() is
# exactly three characters long (quote, character, quote) maps to itself,
# every other code maps to '.'.
FILTER = ''.join([chr(x) if len(repr(chr(x))) == 3 else '.'
                  for x in range(256)])


def hex_dump_buffer(src, length=16):
    """
    Convert src to a hex dump string and return the string

    Each output line holds the offset of its first item, the items in hex
    and their printable form. Codes above 127 and characters that FILTER
    does not keep are shown as '.'.

    @param src The source buffer; a string, or a bytes/bytearray object
    whose items are integers
    @param length The number of bytes shown in each line
    @returns A string showing the hex dump; it always starts with a newline
    """
    result = ["\n"]
    for i in range(0, len(src), length):
        # Iterating a string yields characters while bytes/bytearray may
        # yield integers, so normalise every item to its integer code.
        codes = [c if isinstance(c, int) else ord(c)
                 for c in src[i:i+length]]
        hex_part = ' '.join(["%02x" % c for c in codes])
        printable = ''.join([FILTER[c] if c <= 127 else '.' for c in codes])
        result.append("%04x %-*s %s\n" % (i, length*3, hex_part, printable))
    return ''.join(result)
60
##@todo Find a better home for these identifiers (controller)
# Default value of Controller.rcv_size, the size passed to recv() when
# reading from the switch socket.
RCV_SIZE_DEFAULT = 32768
# Backlog passed to listen() on the controller's listening socket.
LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080064
65class Controller(Thread):
66 """
67 Class abstracting the control interface to the switch.
68
    For receiving messages, two mechanisms are implemented. First,
    query the interface with poll. Second, register to have a
    function called by message type. The callback is passed the
    controller, the parsed message object and the raw packet.
73
74 One of the main purposes of this object is to translate between network
75 and host byte order. 'Above' this object, things should be in host
76 byte order.
Dan Talayco21c75c72010-02-12 22:59:24 -080077
78 @todo Consider using SocketServer for listening socket
79 @todo Test transaction code
80
81 @var rcv_size The receive size to use for receive calls
82 @var max_pkts The max size of the receive queue
83 @var keep_alive If true, listen for echo requests and respond w/
84 echo replies
Dan Talayco710438c2010-02-18 15:16:07 -080085 @var initial_hello If true, will send a hello message immediately
86 upon connecting to the switch
Dan Talayco69ca4d62012-11-15 11:50:22 -080087 @var switch If not None, do an active connection to the switch
Dan Talayco21c75c72010-02-12 22:59:24 -080088 @var host The host to use for connect
89 @var port The port to connect on
90 @var packets_total Total number of packets received
91 @var packets_expired Number of packets popped from queue as queue full
92 @var packets_handled Number of packets handled by something
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080093 @var dbg_state Debug indication of state
Dan Talayco34089522010-02-07 23:07:41 -080094 """
95
def __init__(self, switch=None, host='127.0.0.1', port=6633, max_pkts=1024):
    """
    Initialise controller state; in passive mode also open the listen socket

    @param switch If set, the address of a switch to connect to actively;
    otherwise the controller is passive and waits for the switch
    @param host The local address the listen socket is bound to
    @param port The port to listen on (passive) or connect to (active)
    @param max_pkts The max size of the receive queue
    """
    Thread.__init__(self)

    # Settings
    self.switch = switch
    self.passive = not switch
    self.host = host
    self.port = port
    self.max_pkts = max_pkts
    self.rcv_size = RCV_SIZE_DEFAULT
    self.transact_to = 15 # Transact timeout default value; add to config
    self.logger = logging.getLogger("controller")

    # Socket related
    self.listen_socket = None
    self.switch_socket = None
    self.switch_addr = None
    self.connect_cv = Condition()
    self.message_cv = Condition()
    self.buffered_input = ""

    # Used to wake up the event loop from another thread
    self.waker = ofutils.EventDescriptor()

    # Counters
    self.socket_errors = self.parse_errors = 0
    self.packets_total = self.packets_expired = self.packets_handled = 0
    self.poll_discards = 0

    # State
    self.sync = Lock()
    self.handlers = {}
    self.keep_alive = False
    self.active = True
    self.initial_hello = True
    self.dbg_state = "init"

    # OpenFlow message/packet queue
    # Protected by the packets_cv lock / condition variable
    self.packets = []
    self.packets_cv = Condition()

    # Packet-in filtering
    self.filter_packet_in = False # Drop "excessive" packet ins
    self.pkt_in_run = 0 # Count on run of packet ins
    self.pkt_in_filter_limit = 50 # Run length above which drops are reported
    self.pkt_in_dropped = 0 # Total dropped packet ins

    # Transaction and message type waiting variables
    #   xid_cv: Condition variable (semaphore) for packet waiters
    #   xid: Transaction ID being waited on
    #   xid_response: Transaction response message
    self.xid_cv = Condition()
    self.xid = None
    self.xid_response = None

    # Create listen socket
    if self.passive:
        self.logger.info("Create/listen at " + self.host + ":" +
                         str(self.port))
        self.listen_socket = listener = socket.socket(socket.AF_INET,
                                                      socket.SOCK_STREAM)
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind((self.host, self.port))
        listener.listen(LISTEN_QUEUE_SIZE)
162
def filter_packet(self, rawmsg, hdr):
    """
    Check if packet should be filtered

    Filtering is currently disabled: the packet-in rate limiter that used
    to follow never checked for packet-in messages, so it was
    short-circuited and could not run. Every message is accepted.

    @param rawmsg The raw message (unused while filtering is disabled)
    @param hdr The message header (unused while filtering is disabled)
    @return Boolean, True if packet should be dropped; always False for now
    """
    # XXX didn't actually check for packet-in...
    # TODO reinstate rate limiting of packet ins (filter_packet_in,
    # pkt_in_run, pkt_in_filter_limit, pkt_in_dropped) without dropping
    # expected packet ins.
    return False
184
def _pkt_handle(self, pkt):
    """
    Check for all packet handling conditions

    For every complete OpenFlow message found in the input:
      Parse and verify message
      Check if XID matches something waiting
      Check if keep alive is on and message is an echo request
      Log the message if it is an error
      Check if any registered handler wants the packet
      Enqueue if none of those conditions is met

    Bytes of a trailing, incomplete message are kept in buffered_input
    and prepended to the next call's data.

    The connection is dropped when the switch reports an older protocol
    version or when a header carries a length smaller than the header.

    @param pkt The raw packet (string) which may contain multiple OF msgs
    """
    header_bytes = 8

    # snag any left over data from last read()
    pkt = self.buffered_input + pkt
    self.buffered_input = ""

    # Process each of the OF msgs inside the pkt
    offset = 0
    while offset < len(pkt):
        if offset + header_bytes > len(pkt):
            break

        # Parse the header to get type
        hdr_version, hdr_type, hdr_length, hdr_xid = \
            ofp.message.parse_header(pkt[offset:])

        # A length below the header size would never advance offset and
        # the loop would spin forever; the stream cannot be resynchronised.
        if hdr_length < header_bytes:
            self.parse_errors += 1
            self.logger.error("Invalid message length %d, disconnecting",
                              hdr_length)
            self.disconnect()
            return

        # Extract the raw message bytes
        if (offset + hdr_length) > len(pkt):
            break
        rawmsg = pkt[offset : offset + hdr_length]
        offset += hdr_length

        # Unknown types must not raise while building log text
        if hdr_type in ofp.ofp_type_map:
            type_name = ofp.ofp_type_map[hdr_type]
        else:
            type_name = "unknown (%d)" % hdr_type

        self.logger.debug("Msg in: buf len %d. hdr_type %s. hdr_len %d hdr_version %d hdr_xid %d" %
                          (len(pkt), type_name, hdr_length, hdr_version, hdr_xid))
        if hdr_version < ofp.OFP_VERSION:
            self.logger.error("Switch only supports up to OpenFlow version %d (OFTest version is %d)",
                              hdr_version, ofp.OFP_VERSION)
            print("Switch only supports up to OpenFlow version %d (OFTest version is %d)" %
                  (hdr_version, ofp.OFP_VERSION))
            self.disconnect()
            return

        msg = ofp.message.parse_message(rawmsg)
        if not msg:
            self.parse_errors += 1
            self.logger.warning("Could not parse message")
            continue

        with self.sync:
            # Check if transaction is waiting
            with self.xid_cv:
                if self.xid and hdr_xid == self.xid:
                    self.logger.debug("Matched expected XID " + str(hdr_xid))
                    self.xid_response = (msg, rawmsg)
                    self.xid = None
                    self.xid_cv.notify()
                    continue

            # Check if keep alive is set; if so, respond to echo requests
            if self.keep_alive:
                if hdr_type == ofp.OFPT_ECHO_REQUEST:
                    self.logger.debug("Responding to echo request")
                    rep = ofp.message.echo_reply()
                    rep.xid = hdr_xid
                    # Ignoring additional data
                    self.message_send(rep.pack())
                    continue

            # Log error messages
            if hdr_type == ofp.OFPT_ERROR:
                # Defaults cover error types and codes missing from the maps
                type_str = "unknown"
                code_str = "unknown"
                if msg.err_type in ofp.ofp_error_type_map:
                    type_str = ofp.ofp_error_type_map[msg.err_type]
                    if msg.err_type == ofp.OFPET_HELLO_FAILED:
                        code_map = ofp.ofp_hello_failed_code_map
                    elif msg.err_type == ofp.OFPET_BAD_REQUEST:
                        code_map = ofp.ofp_bad_request_code_map
                    elif msg.err_type == ofp.OFPET_BAD_ACTION:
                        code_map = ofp.ofp_bad_action_code_map
                    elif msg.err_type == ofp.OFPET_FLOW_MOD_FAILED:
                        code_map = ofp.ofp_flow_mod_failed_code_map
                    elif msg.err_type == ofp.OFPET_PORT_MOD_FAILED:
                        code_map = ofp.ofp_port_mod_failed_code_map
                    elif msg.err_type == ofp.OFPET_QUEUE_OP_FAILED:
                        code_map = ofp.ofp_queue_op_failed_code_map
                    else:
                        code_map = None

                    if code_map and msg.code in code_map:
                        code_str = code_map[msg.code]
                self.logger.warning("Received error message: xid=%d type=%s (%d) code=%s (%d)",
                                    hdr_xid, type_str, msg.err_type, code_str, msg.code)

            # Now check for message handlers; preference is given to
            # handlers for a specific packet
            handled = False
            if hdr_type in self.handlers:
                handled = self.handlers[hdr_type](self, msg, rawmsg)
            if not handled and ("all" in self.handlers):
                handled = self.handlers["all"](self, msg, rawmsg)

            if not handled: # Not handled, enqueue
                self.logger.debug("Enqueuing pkt type " + type_name)
                with self.packets_cv:
                    if len(self.packets) >= self.max_pkts:
                        # Queue full: expire the oldest message
                        self.packets.pop(0)
                        self.packets_expired += 1
                    self.packets.append((msg, rawmsg))
                    self.packets_cv.notify_all()
                self.packets_total += 1
            else:
                self.packets_handled += 1
                self.logger.debug("Message handled by callback")

    # end of 'while offset < len(pkt)'
    #   note that if offset = len(pkt), this
    #   appends a harmless empty string
    self.buffered_input += pkt[offset:]
Dan Talaycod12b6612010-03-07 22:00:46 -0800312
def _socket_ready_handle(self, s):
    """
    Handle an input-ready socket

    A ready listen socket yields a new switch connection (or a rejected
    one if a switch is already connected), a ready switch socket yields
    data for _pkt_handle, and a ready waker is simply drained.

    @param s The socket object that is ready
    @returns 0 on success, -1 on error
    """

    if self.passive and s and s == self.listen_socket:
        if self.switch_socket:
            self.logger.warning("Ignoring incoming connection; already connected to switch")
            # A failure while rejecting the extra connection must not
            # take down the event loop or the established connection.
            try:
                (sock, addr) = self.listen_socket.accept()
                sock.close()
            except Exception:
                self.logger.warning("Error on listen socket accept")
            return 0

        try:
            (sock, addr) = self.listen_socket.accept()
        except Exception:
            self.logger.warning("Error on listen socket accept")
            return -1
        self.logger.info(self.host+":"+str(self.port)+": Incoming connection from "+str(addr))

        with self.connect_cv:
            (self.switch_socket, self.switch_addr) = (sock, addr)
            self.switch_socket.setsockopt(socket.IPPROTO_TCP,
                                          socket.TCP_NODELAY, True)
            if self.initial_hello:
                self.message_send(ofp.message.hello())
            self.connect_cv.notify() # Notify anyone waiting
    elif s and s == self.switch_socket:
        for idx in range(3): # debug: try a couple of times
            try:
                pkt = self.switch_socket.recv(self.rcv_size)
            except Exception:
                self.logger.warning("Error on switch read")
                return -1

            if not self.active:
                return 0

            if len(pkt) == 0:
                self.logger.warning("Zero-length switch read, %d" % idx)
            else:
                break

        if len(pkt) == 0: # Still no packet
            self.logger.warning("Zero-length switch read; closing cxn")
            self.logger.info(str(self))
            return -1

        self._pkt_handle(pkt)
    elif s and s == self.waker:
        self.waker.wait()
    else:
        self.logger.error("Unknown socket ready: " + str(s))
        return -1

    return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800371
def active_connect(self):
    """
    Actively connect to a switch IP addr

    On success switch_addr is set to (switch, port). On failure the
    half-open socket is closed so that no descriptor is leaked.

    @return The connected socket, or None if the connection failed
    """
    soc = None
    try:
        self.logger.info("Trying active connection to %s" % self.switch)
        soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        soc.connect((self.switch, self.port))
        self.logger.info("Connected to " + self.switch + " on " +
                         str(self.port))
        soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
        self.switch_addr = (self.switch, self.port)
        return soc
    except Exception as e:
        # Release the descriptor before reporting
        if soc is not None:
            soc.close()
        self.logger.error("Could not connect to %s at %d:: %s" %
                          (self.switch, self.port, str(e)))
        return None
389
def wakeup(self):
    """
    Interrupt the event loop's select call; meant for use by other threads.
    """
    # The waker is one of the descriptors the event loop selects on
    waker = self.waker
    waker.notify()
Rich Lane32797542012-12-22 17:46:05 -0800395
def sockets(self):
    """
    Build the list of objects the event loop should select on.

    @return The listen socket, switch socket and waker, in that order,
    leaving out any that are unset
    """
    ready = []
    for candidate in (self.listen_socket, self.switch_socket, self.waker):
        if candidate:
            ready.append(candidate)
    return ready
402
def run(self):
    """
    Activity function for class

    Selects on the objects returned by sockets() with a one second
    timeout until self.active is cleared. Ready objects are passed to
    _socket_ready_handle. A select failure, a socket reported in error,
    or a handler error drops the switch connection.

    When there is a message on the socket, check for handlers; queue the
    packet if no one handles the packet.

    See note for controller describing the limitation of a single
    connection for now.
    """

    self.dbg_state = "running"

    while self.active:
        # Use a single snapshot for both the read and the error set
        socs = self.sockets()
        try:
            sel_in, sel_out, sel_err = select.select(socs, [], socs, 1)
        except Exception:
            self.logger.exception("Select error, disconnecting")
            self.disconnect()
            # Nothing was selected; start over with a fresh socket list
            continue

        for s in sel_err:
            self.logger.error("Got socket error on: " + str(s) + ", disconnecting")
            self.disconnect()

        for s in sel_in:
            if self._socket_ready_handle(s) == -1:
                self.disconnect()

    # End of main loop
    self.dbg_state = "closing"
    self.logger.info("Exiting controller thread")
    self.shutdown()
441
def connect(self, timeout=-1):
    """
    Connect to the switch

    In active mode a connection to the switch is attempted right away;
    if it fails the controller is marked inactive. In passive mode the
    call waits for the switch socket to be set.

    @param timeout Block for up to timeout seconds. Pass -1 for the default.
    Only used in passive mode.
    @return Boolean, True if connected
    """

    if not self.passive:  # Do active connection now
        self.logger.info("Attempting to connect to %s on port %s" %
                         (self.switch, str(self.port)))
        soc = self.active_connect()
        if soc:
            self.logger.info("Connected to %s", self.switch)
            self.dbg_state = "running"
            self.switch_socket = soc
            # Wake the event loop so that it selects on the new socket
            self.wakeup()
            with self.connect_cv:
                if self.initial_hello:
                    self.message_send(ofp.message.hello())
                self.connect_cv.notify() # Notify anyone waiting
        else:
            self.logger.error("Could not actively connect to switch %s",
                              self.switch)
            self.active = False
    else:
        with self.connect_cv:
            ofutils.timed_wait(self.connect_cv, lambda: self.switch_socket,
                               timeout=timeout)

    return self.switch_socket is not None
473
def disconnect(self, timeout=-1):
    """
    Drop the switch connection, if there is one.

    Closes the switch socket, forgets the switch address, empties the
    message queue and notifies everything waiting on connect_cv.

    @param timeout Not used
    """
    if not self.switch_socket:
        return

    self.switch_socket.close()
    self.switch_socket = None
    self.switch_addr = None

    with self.packets_cv:
        self.packets = []

    with self.connect_cv:
        self.connect_cv.notifyAll()
Ken Chiangadc950f2012-10-05 13:50:03 -0700486
def wait_disconnected(self, timeout=-1):
    """
    Wait on connect_cv for the switch socket to be cleared.

    @param timeout Block for up to timeout seconds. Pass -1 for the default.
    @return Boolean, True if disconnected
    """

    def is_down():
        # True once the socket is gone, None while still connected
        # NOTE(review): presumably timed_wait keeps waiting while this
        # yields None -- confirm against ofutils.timed_wait
        if self.switch_socket:
            return None
        return True

    with self.connect_cv:
        ofutils.timed_wait(self.connect_cv, is_down, timeout=timeout)

    return self.switch_socket is None
498
def kill(self):
    """
    Stop the controller thread and wait until it has exited
    """
    # Clear the run flag first; the event loop checks it on every pass
    self.active = False
    # Interrupt the select call instead of waiting for its timeout
    self.wakeup()
    # Block until run() has returned
    self.join()
Dan Talayco21c75c72010-02-12 22:59:24 -0800506
def shutdown(self):
    """
    Shutdown the controller closing all sockets

    Marks the controller inactive, shuts down and closes the switch and
    listen sockets, then notifies the waiters on xid_cv and connect_cv
    and wakes the event loop.

    @todo Might want to synchronize shutdown with self.sync...
    """

    self.active = False

    for attr, label in (("switch_socket", "switch"),
                        ("listen_socket", "listen")):
        soc = getattr(self, attr)
        if soc is not None:
            try:
                soc.shutdown(socket.SHUT_RDWR)
            except Exception:
                # Best effort: the socket may not be connected
                self.logger.info("Ignoring %s soc shutdown error" % label)
            finally:
                # shutdown() alone does not release the descriptor
                soc.close()
        setattr(self, attr, None)

    # Wakeup condition variables on which controller may be wait
    with self.xid_cv:
        self.xid_cv.notify_all()

    with self.connect_cv:
        self.connect_cv.notify_all()

    self.wakeup()
    self.dbg_state = "down"
536
def register(self, msg_type, handler):
    """
    Register a callback to receive a specific message type.

    Only one handler may be registered for a given message type;
    registering again replaces the previous handler. Passing a false
    handler (such as None) removes the handler for msg_type, if any.

    WARNING: A lock is held during the handler call back, so
    the handler should not make any blocking calls

    @param msg_type The type of message to receive. The special type,
    the string "all", is offered every message that no type-specific
    handler has handled.
    @param handler The function to call when a message of the given
    type is received. It is called with the controller, the message
    object and the raw message; a true return value marks the message
    as handled so that it is not queued.
    """
    # Should check type is valid
    if not handler:
        # Never store a non-callable: the dispatcher calls every entry
        self.handlers.pop(msg_type, None)
        return
    self.handlers[msg_type] = handler
Dan Talayco34089522010-02-07 23:07:41 -0800557
def poll(self, exp_msg=None, timeout=-1):
    """
    Fetch the next queued OF message from the switch, waiting if needed.

    @param exp_msg If set, only a message of this type is returned;
    other queued messages are left in the queue.

    @param timeout Maximum number of seconds to wait for the message.
    Pass -1 for the default timeout.

    @retval A pair (msg, pkt) where msg is a message object and pkt
    the string representing the packet as received from the socket.
    This allows additional parsing by the receiver if necessary.

    The data members in the message are in host endian order.
    If no message is obtained, (None, None) is returned
    """

    if exp_msg is None:
        self.logger.debug("Poll for any OF message")
    else:
        self.logger.debug("Poll for %s" % ofp.ofp_type_map[exp_msg])

    def grab():
        # Take the wanted packet from the queue; None if it is not there
        queue = self.packets
        if queue:
            if exp_msg is None:
                self.logger.debug("Looking for any packet")
                (msg, pkt) = queue.pop(0)
                return (msg, pkt)

            self.logger.debug("Looking for %s" % ofp.ofp_type_map[exp_msg])
            for idx, entry in enumerate(queue):
                candidate = entry[0]
                self.logger.debug("Checking packets[%d] (%s)" % (idx, ofp.ofp_type_map[candidate.type]))
                if candidate.type == exp_msg:
                    (msg, pkt) = queue.pop(idx)
                    return (msg, pkt)
        # Not found
        self.logger.debug("Packet not in queue")
        return None

    with self.packets_cv:
        ret = ofutils.timed_wait(self.packets_cv, grab, timeout=timeout)

    if ret is None:
        return (None, None)

    (msg, pkt) = ret
    self.logger.debug("Got message %s" % str(msg))
    return (msg, pkt)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800609
def transact(self, msg, timeout=-1):
    """
    Run a message transaction with the switch

    Send the message in msg and wait for a reply with a matching
    transaction id. Transactions have the highest priority in
    received message handling.

    Only one transaction may be outstanding; a call made while another
    one is pending returns (None, None) without sending anything. The
    pending transaction id is always cleared when the call ends, so a
    timeout or a send failure does not block later transactions.

    @param msg The message object to send; must not be a string. If its
    xid is None a new one is generated.
    @param timeout The timeout in seconds; if -1 use default.
    @return A pair (response message, raw packet), or (None, None) if no
    response was received
    """

    if msg.xid is None:
        msg.xid = ofutils.gen_xid()

    self.logger.debug("Running transaction %d" % msg.xid)

    with self.xid_cv:
        if self.xid:
            self.logger.error("Can only run one transaction at a time")
            return (None, None)

        self.xid = msg.xid
        self.xid_response = None
        try:
            self.message_send(msg.pack())

            self.logger.debug("Waiting for transaction %d" % msg.xid)
            ofutils.timed_wait(self.xid_cv, lambda: self.xid_response, timeout=timeout)

            if self.xid_response:
                (resp, pkt) = self.xid_response
            else:
                (resp, pkt) = (None, None)
        finally:
            # Without this a timed-out or failed transaction would leave
            # its xid behind and every later call would be refused.
            self.xid = None
            self.xid_response = None

    if resp is None:
        self.logger.warning("No response for xid " + str(msg.xid))
    return (resp, pkt)
Dan Talayco34089522010-02-07 23:07:41 -0800648
def message_send(self, msg):
    """
    Send the message to the switch

    @param msg A string or OpenFlow message object to be forwarded to
    the switch. A string (or bytes) buffer is sent unchanged; any
    other object is given an xid if it has none and is serialized
    with its pack() method.
    @returns 0, kept for backwards compatibility

    Raises Exception if there is no switch socket, and AssertionError
    if the socket's sendall() returns anything other than None.
    """

    if not self.switch_socket:
        raise Exception("no socket")

    if isinstance(msg, (str, bytes)):
        # Sending a string indicates the message is ready to go
        outpkt = msg
    else:
        if msg.xid is None:
            msg.xid = ofutils.gen_xid()
        outpkt = msg.pack()

    # Decode the leading header fields only so they can be logged
    msg_version, msg_type, msg_len, msg_xid = struct.unpack_from("!BBHL", outpkt)
    self.logger.debug("Msg out: buf len %d. hdr.type %s. hdr.len %d hdr.version %d hdr.xid %d",
                      len(outpkt),
                      ofp.ofp_type_map.get(msg_type, "unknown (%d)" % msg_type),
                      msg_len,
                      msg_version,
                      msg_xid)
    # socket.sendall() returns None on success
    if self.switch_socket.sendall(outpkt) is not None:
        raise AssertionError("failed to send message to switch")

    return 0 # for backwards compatibility
Dan Talayco21c75c72010-02-12 22:59:24 -0800679
def __str__(self):
    """
    Build a multi-line, human readable summary of the controller

    @returns A string made of a "Controller:" heading followed by one
    line per status field or counter
    """
    # (label, text) pairs in display order. dbg_state is concatenated
    # as-is; every other value is converted with str().
    fields = (
        (" state ", self.dbg_state),
        (" switch_addr ", str(self.switch_addr)),
        (" pending pkts ", str(len(self.packets))),
        (" total pkts ", str(self.packets_total)),
        (" expired pkts ", str(self.packets_expired)),
        (" handled pkts ", str(self.packets_handled)),
        (" poll discards ", str(self.poll_discards)),
        (" parse errors ", str(self.parse_errors)),
        (" sock errrors ", str(self.socket_errors)),
        (" max pkts ", str(self.max_pkts)),
        (" target switch ", str(self.switch)),
        (" host ", str(self.host)),
        (" port ", str(self.port)),
        (" keep_alive ", str(self.keep_alive)),
        (" pkt_in_run ", str(self.pkt_in_run)),
        (" pkt_in_dropped ", str(self.pkt_in_dropped)),
    )
    pieces = ["Controller:\n"]
    for label, text in fields:
        pieces.append(label + text + "\n")
    return "".join(pieces)
699
def show(self):
    """
    Print the string form of the controller (str(self)) to stdout
    """
    # Parenthesized form gives the same output under the Python 2
    # print statement and also parses as a Python 3 function call.
    print(str(self))
702
def sample_handler(controller, msg, pkt):
    """
    Template for a message handler

    Functions registered with the controller class for packet
    reception are expected to follow this prototype.

    @param controller The controller invoking the handler
    @param msg The parsed message object
    @param pkt The raw packet as received on the socket, supplied in
    case it carries extra unparsed data
    @returns Boolean value indicating if the packet was handled. If
    not handled, the packet is placed in the queue for pollers to
    receive. This sample does nothing and returns None.
    """
    return None