blob: d6d2c7022b04aef22f88ef0a2d57dfbc83a86f5c [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
4Controller class
5
6Provide the interface to the control channel to the switch under test.
7
8Class inherits from thread so as to run in background allowing
9asynchronous callbacks (if needed, not required). Also supports
10polling.
11
12The controller thread maintains a queue. Incoming messages that
13are not handled by a callback function are placed in this queue for
14poll calls.
15
16Callbacks and polling support specifying the message type
17
18@todo Support transaction semantics via xid
Dan Talayco1b3f6902010-02-15 14:14:19 -080019@todo Support select and listen on an administrative socket (or
20use a timeout to support clean shutdown).
21
22Currently only one connection is accepted during the life of
23the controller. There seems
24to be no clean way to interrupt an accept call. Using select that also listens
25on an administrative socket and can shut down the socket might work.
26
Dan Talayco34089522010-02-07 23:07:41 -080027"""
28
Dan Talayco34089522010-02-07 23:07:41 -080029import os
30import socket
31import time
Rich Lanecd97d3d2013-01-07 18:50:06 -080032import struct
33import select
34import logging
Dan Talayco34089522010-02-07 23:07:41 -080035from threading import Thread
36from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080037from threading import Condition
Rich Lane9fd05682013-01-10 15:30:38 -080038import ofp
Rich Lanecd97d3d2013-01-07 18:50:06 -080039import ofutils
Dan Talayco48370102010-03-03 15:17:33 -080040
Dan Talaycof8de5182012-04-12 22:38:41 -070041
42FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.'
43 for x in range(256)])
44
45def hex_dump_buffer(src, length=16):
46 """
47 Convert src to a hex dump string and return the string
48 @param src The source buffer
49 @param length The number of bytes shown in each line
50 @returns A string showing the hex dump
51 """
52 result = ["\n"]
53 for i in xrange(0, len(src), length):
54 chars = src[i:i+length]
55 hex = ' '.join(["%02x" % ord(x) for x in chars])
56 printable = ''.join(["%s" % ((ord(x) <= 127 and
57 FILTER[ord(x)]) or '.') for x in chars])
58 result.append("%04x %-*s %s\n" % (i, length*3, hex, printable))
59 return ''.join(result)
60
Dan Talayco48370102010-03-03 15:17:33 -080061##@todo Find a better home for these identifiers (controller)
Glen Gibb741b1182010-07-08 16:43:58 -070062RCV_SIZE_DEFAULT = 32768
Dan Talayco48370102010-03-03 15:17:33 -080063LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080064
65class Controller(Thread):
66 """
67 Class abstracting the control interface to the switch.
68
69 For receiving messages, two mechanism will be implemented. First,
70 query the interface with poll. Second, register to have a
71 function called by message type. The callback is passed the
72 message type as well as the raw packet (or message object)
73
74 One of the main purposes of this object is to translate between network
75 and host byte order. 'Above' this object, things should be in host
76 byte order.
Dan Talayco21c75c72010-02-12 22:59:24 -080077
78 @todo Consider using SocketServer for listening socket
79 @todo Test transaction code
80
81 @var rcv_size The receive size to use for receive calls
82 @var max_pkts The max size of the receive queue
83 @var keep_alive If true, listen for echo requests and respond w/
84 echo replies
Dan Talayco710438c2010-02-18 15:16:07 -080085 @var initial_hello If true, will send a hello message immediately
86 upon connecting to the switch
Dan Talayco69ca4d62012-11-15 11:50:22 -080087 @var switch If not None, do an active connection to the switch
Dan Talayco21c75c72010-02-12 22:59:24 -080088 @var host The host to use for connect
89 @var port The port to connect on
90 @var packets_total Total number of packets received
91 @var packets_expired Number of packets popped from queue as queue full
92 @var packets_handled Number of packets handled by something
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080093 @var dbg_state Debug indication of state
Dan Talayco34089522010-02-07 23:07:41 -080094 """
95
Dan Talayco69ca4d62012-11-15 11:50:22 -080096 def __init__(self, switch=None, host='127.0.0.1', port=6633, max_pkts=1024):
Dan Talayco21c75c72010-02-12 22:59:24 -080097 Thread.__init__(self)
Dan Talayco1b3f6902010-02-15 14:14:19 -080098 # Socket related
Dan Talayco21c75c72010-02-12 22:59:24 -080099 self.rcv_size = RCV_SIZE_DEFAULT
Dan Talayco1b3f6902010-02-15 14:14:19 -0800100 self.listen_socket = None
101 self.switch_socket = None
102 self.switch_addr = None
Dan Talayco710438c2010-02-18 15:16:07 -0800103 self.connect_cv = Condition()
Dan Talaycoe226eb12010-02-18 23:06:30 -0800104 self.message_cv = Condition()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800105
Rich Lane4dfd5e12012-12-22 19:48:01 -0800106 # Used to wake up the event loop from another thread
Rich Lanecd97d3d2013-01-07 18:50:06 -0800107 self.waker = ofutils.EventDescriptor()
Rich Lane32797542012-12-22 17:46:05 -0800108
Dan Talayco1b3f6902010-02-15 14:14:19 -0800109 # Counters
Dan Talayco21c75c72010-02-12 22:59:24 -0800110 self.socket_errors = 0
111 self.parse_errors = 0
Dan Talayco21c75c72010-02-12 22:59:24 -0800112 self.packets_total = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -0800113 self.packets_expired = 0
114 self.packets_handled = 0
Dan Talaycoe226eb12010-02-18 23:06:30 -0800115 self.poll_discards = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -0800116
117 # State
Dan Talayco21c75c72010-02-12 22:59:24 -0800118 self.sync = Lock()
119 self.handlers = {}
120 self.keep_alive = False
Dan Talayco710438c2010-02-18 15:16:07 -0800121 self.active = True
122 self.initial_hello = True
Dan Talayco1b3f6902010-02-15 14:14:19 -0800123
Rich Lanec4f071b2012-07-11 17:25:57 -0700124 # OpenFlow message/packet queue
125 # Protected by the packets_cv lock / condition variable
126 self.packets = []
127 self.packets_cv = Condition()
128
Dan Talayco1b3f6902010-02-15 14:14:19 -0800129 # Settings
130 self.max_pkts = max_pkts
Dan Talayco69ca4d62012-11-15 11:50:22 -0800131 self.switch = switch
132 self.passive = not self.switch
Dan Talayco48370102010-03-03 15:17:33 -0800133 self.host = host
134 self.port = port
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800135 self.dbg_state = "init"
Dan Talayco48370102010-03-03 15:17:33 -0800136 self.logger = logging.getLogger("controller")
Dan Talaycof8de5182012-04-12 22:38:41 -0700137 self.filter_packet_in = False # Drop "excessive" packet ins
138 self.pkt_in_run = 0 # Count on run of packet ins
139 self.pkt_in_filter_limit = 50 # Count on run of packet ins
140 self.pkt_in_dropped = 0 # Total dropped packet ins
141 self.transact_to = 15 # Transact timeout default value; add to config
Dan Talayco1b3f6902010-02-15 14:14:19 -0800142
Dan Talaycoe226eb12010-02-18 23:06:30 -0800143 # Transaction and message type waiting variables
144 # xid_cv: Condition variable (semaphore) for packet waiters
Dan Talayco21c75c72010-02-12 22:59:24 -0800145 # xid: Transaction ID being waited on
146 # xid_response: Transaction response message
147 self.xid_cv = Condition()
148 self.xid = None
149 self.xid_response = None
150
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800151 self.buffered_input = ""
Dan Talaycoe226eb12010-02-18 23:06:30 -0800152
Rich Lane207502e2012-12-31 14:29:12 -0800153 # Create listen socket
154 if self.passive:
155 self.logger.info("Create/listen at " + self.host + ":" +
156 str(self.port))
157 self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
158 self.listen_socket.setsockopt(socket.SOL_SOCKET,
159 socket.SO_REUSEADDR, 1)
160 self.listen_socket.bind((self.host, self.port))
161 self.listen_socket.listen(LISTEN_QUEUE_SIZE)
162
Dan Talaycof8de5182012-04-12 22:38:41 -0700163 def filter_packet(self, rawmsg, hdr):
164 """
165 Check if packet should be filtered
166
167 Currently filters packet in messages
168 @return Boolean, True if packet should be dropped
169 """
170 # Add check for packet in and rate limit
171 if self.filter_packet_in:
Rich Lanec4f071b2012-07-11 17:25:57 -0700172 # If we were dropping packets, report number dropped
173 # TODO dont drop expected packet ins
174 if self.pkt_in_run > self.pkt_in_filter_limit:
175 self.logger.debug("Dropped %d packet ins (%d total)"
176 % ((self.pkt_in_run -
177 self.pkt_in_filter_limit),
178 self.pkt_in_dropped))
179 self.pkt_in_run = 0
Dan Talaycof8de5182012-04-12 22:38:41 -0700180
181 return False
182
Dan Talaycod12b6612010-03-07 22:00:46 -0800183 def _pkt_handle(self, pkt):
184 """
185 Check for all packet handling conditions
186
187 Parse and verify message
188 Check if XID matches something waiting
189 Check if message is being expected for a poll operation
190 Check if keep alive is on and message is an echo request
191 Check if any registered handler wants the packet
192 Enqueue if none of those conditions is met
193
194 an echo request in case keep_alive is true, followed by
195 registered message handlers.
Glen Gibb6d467062010-07-08 16:15:08 -0700196 @param pkt The raw packet (string) which may contain multiple OF msgs
Dan Talaycod12b6612010-03-07 22:00:46 -0800197 """
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800198
199 # snag any left over data from last read()
200 pkt = self.buffered_input + pkt
201 self.buffered_input = ""
202
Glen Gibb6d467062010-07-08 16:15:08 -0700203 # Process each of the OF msgs inside the pkt
204 offset = 0
205 while offset < len(pkt):
206 # Parse the header to get type
Rich Lane78ef8b92013-01-10 12:19:23 -0800207 hdr = ofp.parse.of_header_parse(pkt[offset:])
Dan Talaycof8de5182012-04-12 22:38:41 -0700208 if not hdr or hdr.length == 0:
209 self.logger.error("Could not parse header")
210 self.logger.error("pkt len %d." % len(pkt))
211 if hdr:
212 self.logger.error("hdr len %d." % hdr.length)
213 self.logger.error("%s" % hex_dump_buffer(pkt[:200]))
Rich Lane376bb402012-12-31 15:20:16 -0800214 self.shutdown()
Dan Talaycod12b6612010-03-07 22:00:46 -0800215 return
216
Glen Gibb6d467062010-07-08 16:15:08 -0700217 # Extract the raw message bytes
Ed Swierk836e5bd2012-03-20 11:08:53 -0700218 if (offset + hdr.length) > len(pkt):
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800219 break
Glen Gibb6d467062010-07-08 16:15:08 -0700220 rawmsg = pkt[offset : offset + hdr.length]
Dan Talayco4306d3e2011-09-07 09:42:26 -0700221 offset += hdr.length
Dan Talaycof8de5182012-04-12 22:38:41 -0700222
223 if self.filter_packet(rawmsg, hdr):
224 continue
225
Rich Lanecd97d3d2013-01-07 18:50:06 -0800226 self.logger.debug("Msg in: buf len %d. hdr.type %s. hdr.len %d hdr.version %d" %
Rich Lane78ef8b92013-01-10 12:19:23 -0800227 (len(pkt), ofp.cstruct.ofp_type_map[hdr.type], hdr.length, hdr.version))
Rich Lanec44b6242013-01-10 12:23:54 -0800228 if hdr.version < ofp.cstruct.OFP_VERSION:
229 self.logger.error("Switch only supports up to OpenFlow version %d (OFTest version is %d)",
230 hdr.version, ofp.cstruct.OFP_VERSION)
231 print "Switch only supports up to OpenFlow version %d (OFTest version is %d)" % \
Rich Lane78ef8b92013-01-10 12:19:23 -0800232 (hdr.version, ofp.cstruct.OFP_VERSION)
Ken Chiangadc950f2012-10-05 13:50:03 -0700233 self.disconnect()
Dan Talaycof8de5182012-04-12 22:38:41 -0700234 return
Dan Talaycod12b6612010-03-07 22:00:46 -0800235
Rich Lane78ef8b92013-01-10 12:19:23 -0800236 msg = ofp.parse.of_message_parse(rawmsg)
Glen Gibb6d467062010-07-08 16:15:08 -0700237 if not msg:
238 self.parse_errors += 1
239 self.logger.warn("Could not parse message")
240 continue
241
Rich Lanec4f071b2012-07-11 17:25:57 -0700242 with self.sync:
243 # Check if transaction is waiting
244 with self.xid_cv:
245 if self.xid and hdr.xid == self.xid:
246 self.logger.debug("Matched expected XID " + str(hdr.xid))
247 self.xid_response = (msg, rawmsg)
248 self.xid = None
249 self.xid_cv.notify()
250 continue
Glen Gibb6d467062010-07-08 16:15:08 -0700251
Rich Lanec4f071b2012-07-11 17:25:57 -0700252 # Check if keep alive is set; if so, respond to echo requests
253 if self.keep_alive:
Rich Lane78ef8b92013-01-10 12:19:23 -0800254 if hdr.type == ofp.cstruct.OFPT_ECHO_REQUEST:
Rich Lanec4f071b2012-07-11 17:25:57 -0700255 self.logger.debug("Responding to echo request")
Rich Lane78ef8b92013-01-10 12:19:23 -0800256 rep = ofp.message.echo_reply()
Rich Lanec4f071b2012-07-11 17:25:57 -0700257 rep.header.xid = hdr.xid
258 # Ignoring additional data
Rich Lane5c3151c2013-01-03 17:15:41 -0800259 self.message_send(rep.pack(), zero_xid=True)
Rich Lanec4f071b2012-07-11 17:25:57 -0700260 continue
Glen Gibb6d467062010-07-08 16:15:08 -0700261
Rich Lanec4f071b2012-07-11 17:25:57 -0700262 # Now check for message handlers; preference is given to
263 # handlers for a specific packet
264 handled = False
265 if hdr.type in self.handlers.keys():
266 handled = self.handlers[hdr.type](self, msg, rawmsg)
267 if not handled and ("all" in self.handlers.keys()):
268 handled = self.handlers["all"](self, msg, rawmsg)
Glen Gibb6d467062010-07-08 16:15:08 -0700269
Rich Lanec4f071b2012-07-11 17:25:57 -0700270 if not handled: # Not handled, enqueue
Rich Lane78ef8b92013-01-10 12:19:23 -0800271 self.logger.debug("Enqueuing pkt type " + ofp.cstruct.ofp_type_map[hdr.type])
Rich Lanec4f071b2012-07-11 17:25:57 -0700272 with self.packets_cv:
273 if len(self.packets) >= self.max_pkts:
274 self.packets.pop(0)
275 self.packets_expired += 1
276 self.packets.append((msg, rawmsg))
277 self.packets_cv.notify_all()
278 self.packets_total += 1
279 else:
280 self.packets_handled += 1
281 self.logger.debug("Message handled by callback")
Glen Gibb6d467062010-07-08 16:15:08 -0700282
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800283 # end of 'while offset < len(pkt)'
284 # note that if offset = len(pkt), this is
285 # appends a harmless empty string
286 self.buffered_input += pkt[offset:]
Dan Talaycod12b6612010-03-07 22:00:46 -0800287
Dan Talayco710438c2010-02-18 15:16:07 -0800288 def _socket_ready_handle(self, s):
289 """
290 Handle an input-ready socket
Dan Talaycof8de5182012-04-12 22:38:41 -0700291
Dan Talayco710438c2010-02-18 15:16:07 -0800292 @param s The socket object that is ready
Dan Talaycof8de5182012-04-12 22:38:41 -0700293 @returns 0 on success, -1 on error
Dan Talayco710438c2010-02-18 15:16:07 -0800294 """
295
Dan Talayco69ca4d62012-11-15 11:50:22 -0800296 if self.passive and s and s == self.listen_socket:
Dan Talayco710438c2010-02-18 15:16:07 -0800297 if self.switch_socket:
Rich Lanee1da7ea2012-07-26 15:58:45 -0700298 self.logger.warning("Ignoring incoming connection; already connected to switch")
Rich Laneb4f8ecb2012-09-25 09:36:26 -0700299 (sock, addr) = self.listen_socket.accept()
300 sock.close()
Rich Lanee1da7ea2012-07-26 15:58:45 -0700301 return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800302
Ken Chiange875baf2012-10-09 15:24:40 -0700303 try:
304 (sock, addr) = self.listen_socket.accept()
305 except:
306 self.logger.warning("Error on listen socket accept")
307 return -1
Ken Chiang77173992012-10-30 15:44:39 -0700308 self.logger.info(self.host+":"+str(self.port)+": Incoming connection from "+str(addr))
Rich Lanee1da7ea2012-07-26 15:58:45 -0700309
Rich Laneee3586c2012-07-11 17:26:02 -0700310 with self.connect_cv:
Rich Lanee1da7ea2012-07-26 15:58:45 -0700311 (self.switch_socket, self.switch_addr) = (sock, addr)
Rich Lane82ef1832012-12-22 17:04:35 -0800312 self.switch_socket.setsockopt(socket.IPPROTO_TCP,
313 socket.TCP_NODELAY, True)
Rich Lane1a8d5aa2012-10-08 15:40:03 -0700314 if self.initial_hello:
Rich Lane78ef8b92013-01-10 12:19:23 -0800315 self.message_send(ofp.message.hello())
Rich Lanee1da7ea2012-07-26 15:58:45 -0700316 self.connect_cv.notify() # Notify anyone waiting
Dan Talaycof8de5182012-04-12 22:38:41 -0700317 elif s and s == self.switch_socket:
318 for idx in range(3): # debug: try a couple of times
319 try:
320 pkt = self.switch_socket.recv(self.rcv_size)
321 except:
322 self.logger.warning("Error on switch read")
323 return -1
324
325 if not self.active:
326 return 0
327
328 if len(pkt) == 0:
329 self.logger.warning("Zero-length switch read, %d" % idx)
330 else:
331 break
Dan Talayco710438c2010-02-18 15:16:07 -0800332
Dan Talaycof8de5182012-04-12 22:38:41 -0700333 if len(pkt) == 0: # Still no packet
Dan Talayco0fc08bd2012-04-09 16:56:18 -0700334 self.logger.warning("Zero-length switch read; closing cxn")
Dan Talaycof8de5182012-04-12 22:38:41 -0700335 self.logger.info(str(self))
336 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800337
Dan Talaycod12b6612010-03-07 22:00:46 -0800338 self._pkt_handle(pkt)
Rich Lane4dfd5e12012-12-22 19:48:01 -0800339 elif s and s == self.waker:
340 self.waker.wait()
Dan Talayco710438c2010-02-18 15:16:07 -0800341 else:
Dan Talayco48370102010-03-03 15:17:33 -0800342 self.logger.error("Unknown socket ready: " + str(s))
Dan Talaycof8de5182012-04-12 22:38:41 -0700343 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800344
Dan Talaycof8de5182012-04-12 22:38:41 -0700345 return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800346
Dan Talayco69ca4d62012-11-15 11:50:22 -0800347 def active_connect(self):
348 """
349 Actively connect to a switch IP addr
350 """
351 try:
352 self.logger.info("Trying active connection to %s" % self.switch)
353 soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
354 soc.connect((self.switch, self.port))
355 self.logger.info("Connected to " + self.switch + " on " +
356 str(self.port))
Rich Lane82ef1832012-12-22 17:04:35 -0800357 soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800358 self.switch_addr = (self.switch, self.port)
359 return soc
360 except (StandardError, socket.error), e:
361 self.logger.error("Could not connect to %s at %d:: %s" %
362 (self.switch, self.port, str(e)))
363 return None
364
Rich Lane32797542012-12-22 17:46:05 -0800365 def wakeup(self):
366 """
367 Wake up the event loop, presumably from another thread.
368 """
Rich Lane4dfd5e12012-12-22 19:48:01 -0800369 self.waker.notify()
Rich Lane32797542012-12-22 17:46:05 -0800370
371 def sockets(self):
372 """
373 Return list of sockets to select on.
374 """
Rich Lane4dfd5e12012-12-22 19:48:01 -0800375 socs = [self.listen_socket, self.switch_socket, self.waker]
Rich Lane32797542012-12-22 17:46:05 -0800376 return [x for x in socs if x]
377
Dan Talayco1b3f6902010-02-15 14:14:19 -0800378 def run(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800379 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800380 Activity function for class
Dan Talayco21c75c72010-02-12 22:59:24 -0800381
Dan Talayco1b3f6902010-02-15 14:14:19 -0800382 Assumes connection to switch already exists. Listens on
383 switch_socket for messages until an error (or zero len pkt)
384 occurs.
Dan Talayco21c75c72010-02-12 22:59:24 -0800385
Dan Talayco1b3f6902010-02-15 14:14:19 -0800386 When there is a message on the socket, check for handlers; queue the
387 packet if no one handles the packet.
388
389 See note for controller describing the limitation of a single
390 connection for now.
391 """
392
Rich Lane207502e2012-12-31 14:29:12 -0800393 self.dbg_state = "running"
Ken Chiangadc950f2012-10-05 13:50:03 -0700394
Dan Talayco710438c2010-02-18 15:16:07 -0800395 while self.active:
Dan Talayco710438c2010-02-18 15:16:07 -0800396 try:
397 sel_in, sel_out, sel_err = \
Rich Lane32797542012-12-22 17:46:05 -0800398 select.select(self.sockets(), [], self.sockets(), 1)
Dan Talayco710438c2010-02-18 15:16:07 -0800399 except:
400 print sys.exc_info()
Ken Chiangadc950f2012-10-05 13:50:03 -0700401 self.logger.error("Select error, disconnecting")
402 self.disconnect()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800403
Dan Talayco710438c2010-02-18 15:16:07 -0800404 for s in sel_err:
Ken Chiangadc950f2012-10-05 13:50:03 -0700405 self.logger.error("Got socket error on: " + str(s) + ", disconnecting")
406 self.disconnect()
Dan Talaycof8de5182012-04-12 22:38:41 -0700407
408 for s in sel_in:
409 if self._socket_ready_handle(s) == -1:
Ken Chiangadc950f2012-10-05 13:50:03 -0700410 self.disconnect()
Dan Talayco710438c2010-02-18 15:16:07 -0800411
Dan Talayco710438c2010-02-18 15:16:07 -0800412 # End of main loop
413 self.dbg_state = "closing"
Dan Talayco48370102010-03-03 15:17:33 -0800414 self.logger.info("Exiting controller thread")
Dan Talayco710438c2010-02-18 15:16:07 -0800415 self.shutdown()
416
Rich Lane8806bc42012-07-26 19:18:37 -0700417 def connect(self, timeout=-1):
Dan Talayco710438c2010-02-18 15:16:07 -0800418 """
419 Connect to the switch
420
Rich Lane8806bc42012-07-26 19:18:37 -0700421 @param timeout Block for up to timeout seconds. Pass -1 for the default.
Dan Talayco710438c2010-02-18 15:16:07 -0800422 @return Boolean, True if connected
423 """
424
Dan Talayco69ca4d62012-11-15 11:50:22 -0800425 if not self.passive: # Do active connection now
426 self.logger.info("Attempting to connect to %s on port %s" %
427 (self.switch, str(self.port)))
428 soc = self.active_connect()
429 if soc:
430 self.logger.info("Connected to %s", self.switch)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800431 self.dbg_state = "running"
432 self.switch_socket = soc
Rich Lane32797542012-12-22 17:46:05 -0800433 self.wakeup()
Dan Talayco69ca4d62012-11-15 11:50:22 -0800434 with self.connect_cv:
435 if self.initial_hello:
436 self.message_send(hello())
437 self.connect_cv.notify() # Notify anyone waiting
438 else:
439 self.logger.error("Could not actively connect to switch %s",
440 self.switch)
441 self.active = False
442 else:
443 with self.connect_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800444 ofutils.timed_wait(self.connect_cv, lambda: self.switch_socket,
445 timeout=timeout)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800446
Dan Talayco710438c2010-02-18 15:16:07 -0800447 return self.switch_socket is not None
448
Ken Chiangadc950f2012-10-05 13:50:03 -0700449 def disconnect(self, timeout=-1):
450 """
451 If connected to a switch, disconnect.
452 """
453 if self.switch_socket:
Ken Chiangadc950f2012-10-05 13:50:03 -0700454 self.switch_socket.close()
455 self.switch_socket = None
456 self.switch_addr = None
Ken Chiang74be4722012-12-21 13:07:03 -0800457 with self.packets_cv:
458 self.packets = []
Ken Chiange875baf2012-10-09 15:24:40 -0700459 with self.connect_cv:
460 self.connect_cv.notifyAll()
Ken Chiangadc950f2012-10-05 13:50:03 -0700461
462 def wait_disconnected(self, timeout=-1):
463 """
464 @param timeout Block for up to timeout seconds. Pass -1 for the default.
465 @return Boolean, True if disconnected
466 """
467
Ken Chiange875baf2012-10-09 15:24:40 -0700468 with self.connect_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800469 ofutils.timed_wait(self.connect_cv,
470 lambda: True if not self.switch_socket else None,
471 timeout=timeout)
Ken Chiangadc950f2012-10-05 13:50:03 -0700472 return self.switch_socket is None
473
Dan Talayco710438c2010-02-18 15:16:07 -0800474 def kill(self):
475 """
476 Force the controller thread to quit
Dan Talayco710438c2010-02-18 15:16:07 -0800477 """
478 self.active = False
Rich Lane32797542012-12-22 17:46:05 -0800479 self.wakeup()
Rich Lane376bb402012-12-31 15:20:16 -0800480 self.join()
Dan Talayco21c75c72010-02-12 22:59:24 -0800481
Dan Talayco1b3f6902010-02-15 14:14:19 -0800482 def shutdown(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800483 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800484 Shutdown the controller closing all sockets
Dan Talayco21c75c72010-02-12 22:59:24 -0800485
Dan Talayco1b3f6902010-02-15 14:14:19 -0800486 @todo Might want to synchronize shutdown with self.sync...
487 """
Dan Talaycof8de5182012-04-12 22:38:41 -0700488
Dan Talayco710438c2010-02-18 15:16:07 -0800489 self.active = False
490 try:
491 self.switch_socket.shutdown(socket.SHUT_RDWR)
492 except:
Dan Talayco48370102010-03-03 15:17:33 -0800493 self.logger.info("Ignoring switch soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800494 self.switch_socket = None
Dan Talayco1b3f6902010-02-15 14:14:19 -0800495
Dan Talayco710438c2010-02-18 15:16:07 -0800496 try:
497 self.listen_socket.shutdown(socket.SHUT_RDWR)
498 except:
Dan Talayco48370102010-03-03 15:17:33 -0800499 self.logger.info("Ignoring listen soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800500 self.listen_socket = None
Dan Talaycof8de5182012-04-12 22:38:41 -0700501
Rich Laneee3586c2012-07-11 17:26:02 -0700502 # Wakeup condition variables on which controller may be wait
503 with self.xid_cv:
504 self.xid_cv.notifyAll()
Dan Talaycof8de5182012-04-12 22:38:41 -0700505
Rich Laneee3586c2012-07-11 17:26:02 -0700506 with self.connect_cv:
507 self.connect_cv.notifyAll()
Dan Talaycof8de5182012-04-12 22:38:41 -0700508
Rich Lane32797542012-12-22 17:46:05 -0800509 self.wakeup()
Dan Talayco710438c2010-02-18 15:16:07 -0800510 self.dbg_state = "down"
511
Dan Talayco34089522010-02-07 23:07:41 -0800512 def register(self, msg_type, handler):
513 """
514 Register a callback to receive a specific message type.
515
516 Only one handler may be registered for a given message type.
Dan Talaycod12b6612010-03-07 22:00:46 -0800517
518 WARNING: A lock is held during the handler call back, so
519 the handler should not make any blocking calls
520
Dan Talayco34089522010-02-07 23:07:41 -0800521 @param msg_type The type of message to receive. May be DEFAULT
Dan Talayco21c75c72010-02-12 22:59:24 -0800522 for all non-handled packets. The special type, the string "all"
523 will send all packets to the handler.
Dan Talayco34089522010-02-07 23:07:41 -0800524 @param handler The function to call when a message of the given
525 type is received.
526 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800527 # Should check type is valid
528 if not handler and msg_type in self.handlers.keys():
529 del self.handlers[msg_type]
530 return
531 self.handlers[msg_type] = handler
Dan Talayco34089522010-02-07 23:07:41 -0800532
Rich Laneb64ce3d2012-07-26 15:37:57 -0700533 def poll(self, exp_msg=None, timeout=-1):
Dan Talayco34089522010-02-07 23:07:41 -0800534 """
535 Wait for the next OF message received from the switch.
536
537 @param exp_msg If set, return only when this type of message
Dan Talayco48370102010-03-03 15:17:33 -0800538 is received (unless timeout occurs).
Rich Laneb64ce3d2012-07-26 15:37:57 -0700539
540 @param timeout Maximum number of seconds to wait for the message.
541 Pass -1 for the default timeout.
Dan Talayco34089522010-02-07 23:07:41 -0800542
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800543 @retval A pair (msg, pkt) where msg is a message object and pkt
544 the string representing the packet as received from the socket.
545 This allows additional parsing by the receiver if necessary.
546
Dan Talayco34089522010-02-07 23:07:41 -0800547 The data members in the message are in host endian order.
Dan Talayco48370102010-03-03 15:17:33 -0800548 If an error occurs, (None, None) is returned
Dan Talayco34089522010-02-07 23:07:41 -0800549 """
Dan Talayco34089522010-02-07 23:07:41 -0800550
Ken Chiang77173992012-10-30 15:44:39 -0700551 if exp_msg is not None:
Rich Lane78ef8b92013-01-10 12:19:23 -0800552 self.logger.debug("Poll for %s" % ofp.cstruct.ofp_type_map[exp_msg])
Ed Swierk9e55e282012-08-22 06:57:28 -0700553 else:
554 self.logger.debug("Poll for any OF message")
Rich Laneb64ce3d2012-07-26 15:37:57 -0700555
556 # Take the packet from the queue
Rich Lanec4f071b2012-07-11 17:25:57 -0700557 def grab():
558 if len(self.packets) > 0:
Ken Chiang77173992012-10-30 15:44:39 -0700559 if exp_msg is None:
Rich Lanec4f071b2012-07-11 17:25:57 -0700560 self.logger.debug("Looking for any packet")
561 (msg, pkt) = self.packets.pop(0)
562 return (msg, pkt)
563 else:
Rich Lane78ef8b92013-01-10 12:19:23 -0800564 self.logger.debug("Looking for %s" % ofp.cstruct.ofp_type_map[exp_msg])
Rich Lanec4f071b2012-07-11 17:25:57 -0700565 for i in range(len(self.packets)):
566 msg = self.packets[i][0]
Rich Lane78ef8b92013-01-10 12:19:23 -0800567 self.logger.debug("Checking packets[%d] (%s)" % (i, ofp.cstruct.ofp_type_map[msg.header.type]))
Rich Lanec4f071b2012-07-11 17:25:57 -0700568 if msg.header.type == exp_msg:
569 (msg, pkt) = self.packets.pop(i)
570 return (msg, pkt)
571 # Not found
572 self.logger.debug("Packet not in queue")
Rich Laneb64ce3d2012-07-26 15:37:57 -0700573 return None
Dan Talayco21c75c72010-02-12 22:59:24 -0800574
Rich Lanec4f071b2012-07-11 17:25:57 -0700575 with self.packets_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800576 ret = ofutils.timed_wait(self.packets_cv, grab, timeout=timeout)
Rich Lanec4f071b2012-07-11 17:25:57 -0700577
Rich Laneb64ce3d2012-07-26 15:37:57 -0700578 if ret != None:
579 (msg, pkt) = ret
580 self.logger.debug("Got message %s" % str(msg))
581 return (msg, pkt)
582 else:
583 return (None, None)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800584
Dan Talayco0fc08bd2012-04-09 16:56:18 -0700585 def transact(self, msg, timeout=-1, zero_xid=False):
Dan Talayco34089522010-02-07 23:07:41 -0800586 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800587 Run a message transaction with the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800588
589 Send the message in msg and wait for a reply with a matching
Dan Talayco21c75c72010-02-12 22:59:24 -0800590 transaction id. Transactions have the highest priority in
591 received message handling.
Dan Talaycoe37999f2010-02-09 15:27:12 -0800592
Dan Talayco21c75c72010-02-12 22:59:24 -0800593 @param msg The message object to send; must not be a string
Rich Lanee1da7ea2012-07-26 15:58:45 -0700594 @param timeout The timeout in seconds; if -1 use default.
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800595 @param zero_xid Normally, if the XID is 0 an XID will be generated
Ed Swierk9e55e282012-08-22 06:57:28 -0700596 for the message. Set zero_xid to override this behavior
Dan Talayco21c75c72010-02-12 22:59:24 -0800597 @return The matching message object or None if unsuccessful
Dan Talaycoe37999f2010-02-09 15:27:12 -0800598
Dan Talayco34089522010-02-07 23:07:41 -0800599 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800600
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800601 if not zero_xid and msg.header.xid == 0:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800602 msg.header.xid = ofutils.gen_xid()
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800603
Dan Talayco0fc08bd2012-04-09 16:56:18 -0700604 self.logger.debug("Running transaction %d" % msg.header.xid)
Dan Talayco21c75c72010-02-12 22:59:24 -0800605
Rich Lane9aca1992012-07-11 17:26:31 -0700606 with self.xid_cv:
607 if self.xid:
608 self.logger.error("Can only run one transaction at a time")
609 return (None, None)
Dan Talaycof8de5182012-04-12 22:38:41 -0700610
Rich Lane9aca1992012-07-11 17:26:31 -0700611 self.xid = msg.header.xid
Dan Talaycod12b6612010-03-07 22:00:46 -0800612 self.xid_response = None
Rich Lane5c3151c2013-01-03 17:15:41 -0800613 self.message_send(msg.pack())
Rich Lane9aca1992012-07-11 17:26:31 -0700614
Rich Lane8806bc42012-07-26 19:18:37 -0700615 self.logger.debug("Waiting for transaction %d" % msg.header.xid)
Rich Lanecd97d3d2013-01-07 18:50:06 -0800616 ofutils.timed_wait(self.xid_cv, lambda: self.xid_response, timeout=timeout)
Rich Lane9aca1992012-07-11 17:26:31 -0700617
618 if self.xid_response:
619 (resp, pkt) = self.xid_response
620 self.xid_response = None
621 else:
622 (resp, pkt) = (None, None)
623
Dan Talayco09c2c592010-05-13 14:21:52 -0700624 if resp is None:
625 self.logger.warning("No response for xid " + str(self.xid))
626 return (resp, pkt)
Dan Talayco34089522010-02-07 23:07:41 -0800627
Dan Talayco710438c2010-02-18 15:16:07 -0800628 def message_send(self, msg, zero_xid=False):
Dan Talayco34089522010-02-07 23:07:41 -0800629 """
630 Send the message to the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800631
Dan Talayco11c26e72010-03-07 22:03:57 -0800632 @param msg A string or OpenFlow message object to be forwarded to
633 the switch.
634 @param zero_xid If msg is an OpenFlow object (not a string) and if
Dan Talayco710438c2010-02-18 15:16:07 -0800635 the XID in the header is 0, then an XID will be generated
Ed Swierk9e55e282012-08-22 06:57:28 -0700636 for the message. Set zero_xid to override this behavior (and keep an
Dan Talayco710438c2010-02-18 15:16:07 -0800637 existing 0 xid)
Dan Talayco21c75c72010-02-12 22:59:24 -0800638 """
639
Dan Talayco1b3f6902010-02-15 14:14:19 -0800640 if not self.switch_socket:
641 # Sending a string indicates the message is ready to go
Ed Swierk9e55e282012-08-22 06:57:28 -0700642 raise Exception("no socket")
Dan Talayco710438c2010-02-18 15:16:07 -0800643 #@todo If not string, try to pack
Dan Talayco21c75c72010-02-12 22:59:24 -0800644 if type(msg) != type(""):
Ed Swierk9e55e282012-08-22 06:57:28 -0700645 if msg.header.xid == 0 and not zero_xid:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800646 msg.header.xid = ofutils.gen_xid()
Ed Swierk9e55e282012-08-22 06:57:28 -0700647 outpkt = msg.pack()
Dan Talayco710438c2010-02-18 15:16:07 -0800648 else:
649 outpkt = msg
Dan Talayco21c75c72010-02-12 22:59:24 -0800650
Rich Lanef18980d2012-12-31 17:11:41 -0800651 msg_version, msg_type, msg_len, msg_xid = struct.unpack_from("!BBHL", outpkt)
Rich Lanecd97d3d2013-01-07 18:50:06 -0800652 self.logger.debug("Msg out: buf len %d. hdr.type %s. hdr.len %d hdr.version %d",
Rich Lanef18980d2012-12-31 17:11:41 -0800653 len(outpkt),
Rich Lane78ef8b92013-01-10 12:19:23 -0800654 ofp.cstruct.ofp_type_map.get(msg_type, "unknown (%d)" % msg_type),
Rich Lanecd97d3d2013-01-07 18:50:06 -0800655 msg_len,
656 msg_version)
Ed Swierk9e55e282012-08-22 06:57:28 -0700657 if self.switch_socket.sendall(outpkt) is not None:
Rich Lane5c3151c2013-01-03 17:15:41 -0800658 raise AssertionError("failed to send message to switch")
Dan Talayco710438c2010-02-18 15:16:07 -0800659
Rich Lane5c3151c2013-01-03 17:15:41 -0800660 return 0 # for backwards compatibility
Dan Talayco21c75c72010-02-12 22:59:24 -0800661
662 def __str__(self):
663 string = "Controller:\n"
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800664 string += " state " + self.dbg_state + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800665 string += " switch_addr " + str(self.switch_addr) + "\n"
666 string += " pending pkts " + str(len(self.packets)) + "\n"
667 string += " total pkts " + str(self.packets_total) + "\n"
668 string += " expired pkts " + str(self.packets_expired) + "\n"
669 string += " handled pkts " + str(self.packets_handled) + "\n"
Dan Talaycoe226eb12010-02-18 23:06:30 -0800670 string += " poll discards " + str(self.poll_discards) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800671 string += " parse errors " + str(self.parse_errors) + "\n"
672 string += " sock errrors " + str(self.socket_errors) + "\n"
673 string += " max pkts " + str(self.max_pkts) + "\n"
Dan Talayco69ca4d62012-11-15 11:50:22 -0800674 string += " target switch " + str(self.switch) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800675 string += " host " + str(self.host) + "\n"
676 string += " port " + str(self.port) + "\n"
677 string += " keep_alive " + str(self.keep_alive) + "\n"
Dan Talaycof8de5182012-04-12 22:38:41 -0700678 string += " pkt_in_run " + str(self.pkt_in_run) + "\n"
679 string += " pkt_in_dropped " + str(self.pkt_in_dropped) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800680 return string
681
682 def show(self):
683 print str(self)
684
685def sample_handler(controller, msg, pkt):
686 """
687 Sample message handler
688
689 This is the prototype for functions registered with the controller
690 class for packet reception
691
692 @param controller The controller calling the handler
693 @param msg The parsed message object
694 @param pkt The raw packet that was received on the socket. This is
695 in case the packet contains extra unparsed data.
696 @returns Boolean value indicating if the packet was handled. If
697 not handled, the packet is placed in the queue for pollers to received
698 """
699 pass