blob: 31da7226a5bf4cd3a4c0b015792e387f9b9cbbca [file] [log] [blame]
"""
OpenFlow Test Framework

Controller class

Provide the interface to the control channel to the switch under test.

Class inherits from thread so as to run in background allowing
asynchronous callbacks (if needed, not required). Also supports
polling.

The controller thread maintains a queue. Incoming messages that
are not handled by a callback function are placed in this queue for
poll calls.

Callbacks and polling support specifying the message type.

@todo Support transaction semantics via xid
@todo Support select and listen on an administrative socket (or
use a timeout to support clean shutdown).

Currently only one connection is accepted during the life of
the controller. There seems to be no clean way to interrupt an
accept call. Using select that also listens on an administrative
socket and can shut down the socket might work.

"""
28
Dan Talayco34089522010-02-07 23:07:41 -080029import os
30import socket
31import time
Rich Lanecd97d3d2013-01-07 18:50:06 -080032import struct
33import select
34import logging
Dan Talayco34089522010-02-07 23:07:41 -080035from threading import Thread
36from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080037from threading import Condition
Rich Lanecd97d3d2013-01-07 18:50:06 -080038import of10.message
39import of10.parse
40import of10.cstruct
41import ofutils
Dan Talayco48370102010-03-03 15:17:33 -080042
Dan Talaycof8de5182012-04-12 22:38:41 -070043
# Lookup table indexed by character code (0..255).  An entry is the
# character itself when it is 7-bit ASCII and its repr() is exactly three
# characters long (quote, char, quote); every other entry is '.'.
FILTER = ''.join([chr(x) if x < 128 and len(repr(chr(x))) == 3 else '.'
                  for x in range(256)])


def hex_dump_buffer(src, length=16):
    """
    Convert src to a hex dump string and return the string

    Each output line holds the offset of its first byte, the bytes as
    two-digit hex values and the same bytes as text, with anything that
    is not a plain printable ASCII character shown as '.'.

    @param src The source buffer; a byte string, bytearray or text string
    @param length The number of bytes shown in each line
    @returns A string showing the hex dump; it always starts with a newline
    """
    # Normalize to a list of integer codes so that byte strings behave the
    # same way whether iterating them yields characters or integers.
    if isinstance(src, (bytes, bytearray)):
        codes = list(bytearray(src))
    else:
        codes = [ord(ch) for ch in src]

    result = ["\n"]
    for start in range(0, len(codes), length):
        row = codes[start:start + length]
        hex_text = ' '.join(["%02x" % code for code in row])
        printable = ''.join([FILTER[code] if code <= 127 else '.'
                             for code in row])
        result.append("%04x %-*s %s\n" % (start, length * 3, hex_text,
                                          printable))
    return ''.join(result)
62
##@todo Find a better home for these identifiers (controller)
# Default number of bytes requested by each recv() on the switch socket
RCV_SIZE_DEFAULT = 32768
# Backlog passed to listen() on the passive (listening) socket
LISTEN_QUEUE_SIZE = 1

# Size in bytes of the fixed OpenFlow header (version, type, length, xid);
# this is the same layout message_send() unpacks with "!BBHL".
_OFP_HEADER_LEN = struct.calcsize("!BBHL")


class Controller(Thread):
    """
    Class abstracting the control interface to the switch.

    For receiving messages, two mechanisms are implemented. First,
    query the interface with poll. Second, register to have a
    function called by message type. The callback is passed the
    controller, the parsed message object and the raw packet.

    One of the main purposes of this object is to translate between network
    and host byte order. 'Above' this object, things should be in host
    byte order.

    @todo Consider using SocketServer for listening socket
    @todo Test transaction code

    @var rcv_size The receive size to use for receive calls
    @var max_pkts The max size of the receive queue
    @var keep_alive If true, listen for echo requests and respond w/
    echo replies
    @var initial_hello If true, will send a hello message immediately
    upon connecting to the switch
    @var switch If not None, do an active connection to the switch
    @var host The host to use for connect
    @var port The port to connect on
    @var packets_total Total number of packets placed in the queue
    @var packets_expired Number of packets popped from queue as queue full
    @var packets_handled Number of packets handled by a registered callback
    @var dbg_state Debug indication of state
    """

    def __init__(self, switch=None, host='127.0.0.1', port=6633, max_pkts=1024):
        """
        Set up controller state and, in passive mode, the listen socket

        @param switch Address of the switch to actively connect to; if
        None (or otherwise false) the controller listens for the switch
        @param host Local address to bind the listen socket to
        @param port TCP port to listen on, or to connect to on the switch
        @param max_pkts Maximum number of messages kept in the poll queue
        """
        Thread.__init__(self)
        # Socket related
        self.rcv_size = RCV_SIZE_DEFAULT
        self.listen_socket = None
        self.switch_socket = None
        self.switch_addr = None
        self.connect_cv = Condition()
        self.message_cv = Condition()

        # Used to wake up the event loop from another thread
        self.waker = ofutils.EventDescriptor()

        # Counters
        self.socket_errors = 0
        self.parse_errors = 0
        self.packets_total = 0
        self.packets_expired = 0
        self.packets_handled = 0
        self.poll_discards = 0

        # State
        self.sync = Lock()
        self.handlers = {}
        self.keep_alive = False
        self.active = True
        self.initial_hello = True

        # OpenFlow message/packet queue
        # Protected by the packets_cv lock / condition variable
        self.packets = []
        self.packets_cv = Condition()

        # Settings
        self.max_pkts = max_pkts
        self.switch = switch
        self.passive = not self.switch
        self.host = host
        self.port = port
        self.dbg_state = "init"
        self.logger = logging.getLogger("controller")
        self.filter_packet_in = False # Drop "excessive" packet ins
        self.pkt_in_run = 0 # Count on run of packet ins
        self.pkt_in_filter_limit = 50 # Count on run of packet ins
        self.pkt_in_dropped = 0 # Total dropped packet ins
        self.transact_to = 15 # Transact timeout default value; add to config

        # Transaction and message type waiting variables
        #   xid_cv: Condition variable (semaphore) for packet waiters
        #   xid: Transaction ID being waited on
        #   xid_response: Transaction response message
        self.xid_cv = Condition()
        self.xid = None
        self.xid_response = None

        # Bytes of an incomplete message left over from the previous read
        self.buffered_input = b""

        # Create listen socket
        if self.passive:
            self.logger.info("Create/listen at " + self.host + ":" +
                             str(self.port))
            self.listen_socket = socket.socket(socket.AF_INET,
                                               socket.SOCK_STREAM)
            try:
                self.listen_socket.setsockopt(socket.SOL_SOCKET,
                                              socket.SO_REUSEADDR, 1)
                self.listen_socket.bind((self.host, self.port))
                self.listen_socket.listen(LISTEN_QUEUE_SIZE)
            except Exception:
                # Do not leak the descriptor if bind/listen fails
                self.listen_socket.close()
                self.listen_socket = None
                raise

    def _type_name(self, msg_type):
        """
        Return a printable name for an OpenFlow message type

        @param msg_type The numeric message type
        @return The name from of10.cstruct.ofp_type_map, or an
        "unknown (N)" placeholder when the type is not in the map
        """
        return of10.cstruct.ofp_type_map.get(msg_type,
                                             "unknown (%s)" % msg_type)

    def filter_packet(self, rawmsg, hdr):
        """
        Check if packet should be filtered

        Currently nothing is dropped: when packet in filtering is enabled
        the run counter is reported (if above the limit) and reset.
        @param rawmsg The raw bytes of the message (currently unused)
        @param hdr The parsed message header (currently unused)
        @return Boolean, True if packet should be dropped
        """
        # Add check for packet in and rate limit
        if self.filter_packet_in:
            # If we were dropping packets, report number dropped
            # TODO dont drop expected packet ins
            if self.pkt_in_run > self.pkt_in_filter_limit:
                self.logger.debug("Dropped %d packet ins (%d total)"
                                  % ((self.pkt_in_run -
                                      self.pkt_in_filter_limit),
                                     self.pkt_in_dropped))
            # NOTE(review): reset assumed to apply whenever filtering is
            # enabled, not only after a report -- confirm against history.
            self.pkt_in_run = 0

        return False

    def _pkt_handle(self, pkt):
        """
        Check for all packet handling conditions

        Parse and verify message
        Check if XID matches something waiting
        Check if keep alive is on and message is an echo request
        Check if any registered handler wants the packet
        Enqueue if none of those conditions is met

        Bytes belonging to a message that has not been completely
        received yet are kept in buffered_input for the next call.
        @param pkt The raw packet (string) which may contain multiple OF msgs
        """

        # snag any left over data from last read()
        pkt = self.buffered_input + pkt
        self.buffered_input = b""

        # Process each of the OF msgs inside the pkt
        offset = 0
        while offset < len(pkt):
            # A header split across two reads is not an error; wait for
            # the rest of it before trying to parse.
            if len(pkt) - offset < _OFP_HEADER_LEN:
                break

            # Parse the header to get type
            hdr = of10.parse.of_header_parse(pkt[offset:])
            if not hdr or hdr.length == 0:
                self.logger.error("Could not parse header")
                self.logger.error("pkt len %d." % len(pkt))
                if hdr:
                    self.logger.error("hdr len %d." % hdr.length)
                    self.logger.error("%s" % hex_dump_buffer(pkt[:200]))
                self.shutdown()
                return

            # Extract the raw message bytes
            if (offset + hdr.length) > len(pkt):
                break
            rawmsg = pkt[offset : offset + hdr.length]
            offset += hdr.length

            if self.filter_packet(rawmsg, hdr):
                continue

            self.logger.debug("Msg in: buf len %d. hdr.type %s. hdr.len %d hdr.version %d" %
                              (len(pkt), self._type_name(hdr.type), hdr.length, hdr.version))
            if hdr.version != of10.cstruct.OFP_VERSION:
                self.logger.error("Version %d does not match OFTest version %d"
                                  % (hdr.version, of10.cstruct.OFP_VERSION))
                print("Version %d does not match OFTest version %d" %
                      (hdr.version, of10.cstruct.OFP_VERSION))
                self.disconnect()
                return

            msg = of10.parse.of_message_parse(rawmsg)
            if not msg:
                self.parse_errors += 1
                self.logger.warning("Could not parse message")
                continue

            with self.sync:
                # Check if transaction is waiting
                with self.xid_cv:
                    if self.xid and hdr.xid == self.xid:
                        self.logger.debug("Matched expected XID " + str(hdr.xid))
                        self.xid_response = (msg, rawmsg)
                        self.xid = None
                        self.xid_cv.notify()
                        continue

                # Check if keep alive is set; if so, respond to echo requests
                if self.keep_alive:
                    if hdr.type == of10.cstruct.OFPT_ECHO_REQUEST:
                        self.logger.debug("Responding to echo request")
                        rep = of10.message.echo_reply()
                        rep.header.xid = hdr.xid
                        # Ignoring additional data
                        self.message_send(rep.pack(), zero_xid=True)
                        continue

                # Now check for message handlers; preference is given to
                # handlers for a specific packet
                handled = False
                handler = self.handlers.get(hdr.type)
                if handler:
                    handled = handler(self, msg, rawmsg)
                if not handled:
                    handler = self.handlers.get("all")
                    if handler:
                        handled = handler(self, msg, rawmsg)

                if not handled: # Not handled, enqueue
                    self.logger.debug("Enqueuing pkt type " +
                                      self._type_name(hdr.type))
                    with self.packets_cv:
                        # Queue full: drop the oldest message
                        if len(self.packets) >= self.max_pkts:
                            self.packets.pop(0)
                            self.packets_expired += 1
                        self.packets.append((msg, rawmsg))
                        self.packets_cv.notify_all()
                    self.packets_total += 1
                else:
                    self.packets_handled += 1
                    self.logger.debug("Message handled by callback")

        # end of 'while offset < len(pkt)'
        #   note that if offset = len(pkt), this
        #   appends a harmless empty string
        self.buffered_input += pkt[offset:]

    def _socket_ready_handle(self, s):
        """
        Handle an input-ready socket

        A ready listen socket yields a new switch connection (or the
        connection is refused if one already exists), a ready switch
        socket is read and the data passed to _pkt_handle, and a ready
        waker is simply drained.

        @param s The socket object that is ready
        @returns 0 on success, -1 on error
        """

        if self.passive and s and s == self.listen_socket:
            if self.switch_socket:
                self.logger.warning("Ignoring incoming connection; already connected to switch")
                (sock, addr) = self.listen_socket.accept()
                sock.close()
                return 0

            try:
                (sock, addr) = self.listen_socket.accept()
            except Exception:
                self.logger.warning("Error on listen socket accept")
                return -1
            self.logger.info(self.host+":"+str(self.port)+": Incoming connection from "+str(addr))

            with self.connect_cv:
                (self.switch_socket, self.switch_addr) = (sock, addr)
                self.switch_socket.setsockopt(socket.IPPROTO_TCP,
                                              socket.TCP_NODELAY, True)
                if self.initial_hello:
                    self.message_send(of10.message.hello())
                self.connect_cv.notify() # Notify anyone waiting
        elif s and s == self.switch_socket:
            for idx in range(3): # debug: try a couple of times
                try:
                    pkt = self.switch_socket.recv(self.rcv_size)
                except Exception:
                    self.logger.warning("Error on switch read")
                    return -1

                if not self.active:
                    return 0

                if len(pkt) == 0:
                    self.logger.warning("Zero-length switch read, %d" % idx)
                else:
                    break

            if len(pkt) == 0: # Still no packet
                self.logger.warning("Zero-length switch read; closing cxn")
                self.logger.info(str(self))
                return -1

            self._pkt_handle(pkt)
        elif s and s == self.waker:
            self.waker.wait()
        else:
            self.logger.error("Unknown socket ready: " + str(s))
            return -1

        return 0

    def active_connect(self):
        """
        Actively connect to a switch IP addr

        @return The connected socket, or None if the connection failed
        """
        soc = None
        try:
            self.logger.info("Trying active connection to %s" % self.switch)
            soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            soc.connect((self.switch, self.port))
            self.logger.info("Connected to " + self.switch + " on " +
                             str(self.port))
            soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
            self.switch_addr = (self.switch, self.port)
            return soc
        except Exception as e:
            self.logger.error("Could not connect to %s at %d:: %s" %
                              (self.switch, self.port, str(e)))
            # Do not leak the descriptor of a socket that never connected
            if soc is not None:
                soc.close()
            return None

    def wakeup(self):
        """
        Wake up the event loop, presumably from another thread.
        """
        self.waker.notify()

    def sockets(self):
        """
        Return list of sockets to select on.

        Entries that are currently unset (None) are left out.
        """
        socs = [self.listen_socket, self.switch_socket, self.waker]
        return [x for x in socs if x]

    def run(self):
        """
        Activity function for class

        Loops while the controller is active, selecting (with a one
        second timeout) on the listen socket, the switch socket and the
        waker. Errors on a socket or while handling input disconnect
        the switch.

        When there is a message on the socket, check for handlers; queue the
        packet if no one handles the packet.

        See note for controller describing the limitation of a single
        connection for now.
        """

        self.dbg_state = "running"

        while self.active:
            try:
                sel_in, sel_out, sel_err = \
                    select.select(self.sockets(), [], self.sockets(), 1)
            except Exception:
                self.logger.error("Select error, disconnecting",
                                  exc_info=True)
                self.disconnect()
                # No valid select results for this pass
                continue

            for s in sel_err:
                self.logger.error("Got socket error on: " + str(s) + ", disconnecting")
                self.disconnect()

            for s in sel_in:
                if self._socket_ready_handle(s) == -1:
                    self.disconnect()

        # End of main loop
        self.dbg_state = "closing"
        self.logger.info("Exiting controller thread")
        self.shutdown()

    def connect(self, timeout=-1):
        """
        Connect to the switch

        In active mode the connection is attempted immediately; in
        passive mode this waits for the controller thread to accept a
        connection from the switch.

        @param timeout Block for up to timeout seconds. Pass -1 for the default.
        Only used in passive mode.
        @return Boolean, True if connected
        """

        if not self.passive:  # Do active connection now
            self.logger.info("Attempting to connect to %s on port %s" %
                             (self.switch, str(self.port)))
            soc = self.active_connect()
            if soc:
                self.logger.info("Connected to %s", self.switch)
                self.dbg_state = "running"
                self.switch_socket = soc
                # Let the event loop pick up the new socket
                self.wakeup()
                with self.connect_cv:
                    if self.initial_hello:
                        self.message_send(of10.message.hello())
                    self.connect_cv.notify() # Notify anyone waiting
            else:
                self.logger.error("Could not actively connect to switch %s",
                                  self.switch)
                self.active = False
        else:
            with self.connect_cv:
                ofutils.timed_wait(self.connect_cv, lambda: self.switch_socket,
                                   timeout=timeout)

        return self.switch_socket is not None

    def disconnect(self, timeout=-1):
        """
        If connected to a switch, disconnect.

        Closes the switch socket, discards queued messages and any
        partially received message, and wakes up connection waiters.

        @param timeout Unused; kept for interface compatibility
        """
        if self.switch_socket:
            self.switch_socket.close()
            self.switch_socket = None
            self.switch_addr = None
            # A partial message from this connection must not be
            # prepended to data from the next one
            self.buffered_input = b""
            with self.packets_cv:
                self.packets = []
            with self.connect_cv:
                self.connect_cv.notifyAll()

    def wait_disconnected(self, timeout=-1):
        """
        Wait until the switch connection has gone away

        @param timeout Block for up to timeout seconds. Pass -1 for the default.
        @return Boolean, True if disconnected
        """

        with self.connect_cv:
            ofutils.timed_wait(self.connect_cv,
                               lambda: True if not self.switch_socket else None,
                               timeout=timeout)
        return self.switch_socket is None

    def kill(self):
        """
        Force the controller thread to quit

        Clears the active flag, wakes the event loop and joins the thread.
        """
        self.active = False
        self.wakeup()
        self.join()

    def _release_socket(self, sock, label):
        """
        Shut down and close one socket, ignoring errors

        @param sock The socket to release; may be None
        @param label Short name used in the log message
        """
        if sock is None:
            return
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            self.logger.info("Ignoring %s soc shutdown error", label)
        # shutdown() alone does not release the descriptor
        try:
            sock.close()
        except Exception:
            self.logger.info("Ignoring %s soc close error", label)

    def shutdown(self):
        """
        Shutdown the controller closing all sockets

        Safe to call more than once.

        @todo Might want to synchronize shutdown with self.sync...
        """

        self.active = False

        sock, self.switch_socket = self.switch_socket, None
        self._release_socket(sock, "switch")

        sock, self.listen_socket = self.listen_socket, None
        self._release_socket(sock, "listen")

        # Wakeup condition variables on which controller may be wait
        with self.xid_cv:
            self.xid_cv.notifyAll()

        with self.connect_cv:
            self.connect_cv.notifyAll()

        self.wakeup()
        self.dbg_state = "down"

    def register(self, msg_type, handler):
        """
        Register a callback to receive a specific message type.

        Only one handler may be registered for a given message type.
        Passing a false handler (e.g. None) removes any handler
        registered for the type.

        WARNING:  A lock is held during the handler call back, so
        the handler should not make any blocking calls

        @param msg_type The type of message to receive. The special
        type, the string "all", receives every message not already
        handled by a type specific handler.
        @param handler The function to call when a message of the given
        type is received.
        """
        # Should check type is valid
        if not handler:
            # Never store a non-callable; it would be called on receive
            self.handlers.pop(msg_type, None)
            return
        self.handlers[msg_type] = handler

    def poll(self, exp_msg=None, timeout=-1):
        """
        Wait for the next OF message received from the switch.

        @param exp_msg If set, return only when this type of message
        is received (unless timeout occurs).

        @param timeout Maximum number of seconds to wait for the message.
        Pass -1 for the default timeout.

        @retval A pair (msg, pkt) where msg is a message object and pkt
        the string representing the packet as received from the socket.
        This allows additional parsing by the receiver if necessary.

        The data members in the message are in host endian order.
        If no matching message is obtained, (None, None) is returned
        """

        if exp_msg is not None:
            self.logger.debug("Poll for %s" % self._type_name(exp_msg))
        else:
            self.logger.debug("Poll for any OF message")

        # Take the packet from the queue
        def grab():
            if self.packets:
                if exp_msg is None:
                    self.logger.debug("Looking for any packet")
                    return self.packets.pop(0)

                self.logger.debug("Looking for %s" % self._type_name(exp_msg))
                for i, (queued, _raw) in enumerate(self.packets):
                    self.logger.debug("Checking packets[%d] (%s)" %
                                      (i, self._type_name(queued.header.type)))
                    if queued.header.type == exp_msg:
                        return self.packets.pop(i)
            # Not found
            self.logger.debug("Packet not in queue")
            return None

        with self.packets_cv:
            ret = ofutils.timed_wait(self.packets_cv, grab, timeout=timeout)

        if ret is not None:
            (msg, pkt) = ret
            self.logger.debug("Got message %s" % str(msg))
            return (msg, pkt)
        else:
            return (None, None)

    def transact(self, msg, timeout=-1, zero_xid=False):
        """
        Run a message transaction with the switch

        Send the message in msg and wait for a reply with a matching
        transaction id.  Transactions have the highest priority in
        received message handling.

        @param msg The message object to send; must not be a string
        @param timeout The timeout in seconds; if -1 use default.
        @param zero_xid Normally, if the XID is 0 an XID will be generated
        for the message.  Set zero_xid to override this behavior
        @return A pair (msg, pkt) holding the matching message object and
        its raw packet, or (None, None) if unsuccessful

        """

        if not zero_xid and msg.header.xid == 0:
            msg.header.xid = ofutils.gen_xid()
        xid = msg.header.xid

        self.logger.debug("Running transaction %d" % xid)

        with self.xid_cv:
            if self.xid:
                self.logger.error("Can only run one transaction at a time")
                return (None, None)

            # NOTE(review): an xid of 0 is false here and in _pkt_handle,
            # so a zero-xid transaction is never matched -- confirm intent.
            self.xid = xid
            self.xid_response = None
            try:
                self.message_send(msg.pack())

                self.logger.debug("Waiting for transaction %d" % xid)
                ofutils.timed_wait(self.xid_cv, lambda: self.xid_response,
                                   timeout=timeout)
                response = self.xid_response
            finally:
                # Always release the transaction slot, otherwise a timeout
                # or send failure blocks every later transaction
                self.xid = None
                self.xid_response = None

        if response:
            (resp, pkt) = response
        else:
            (resp, pkt) = (None, None)

        if resp is None:
            self.logger.warning("No response for xid " + str(xid))
        return (resp, pkt)

    def message_send(self, msg, zero_xid=False):
        """
        Send the message to the switch

        @param msg A string or OpenFlow message object to be forwarded to
        the switch.
        @param zero_xid If msg is an OpenFlow object (not a string) and if
        the XID in the header is 0, then an XID will be generated
        for the message.  Set zero_xid to override this behavior (and keep an
        existing 0 xid)
        @return 0 (for backwards compatibility); failures raise
        """

        if not self.switch_socket:
            raise Exception("no socket")
        # Sending a string indicates the message is ready to go;
        # anything else is packed first
        if not isinstance(msg, (bytes, str)):
            if msg.header.xid == 0 and not zero_xid:
                msg.header.xid = ofutils.gen_xid()
            outpkt = msg.pack()
        else:
            outpkt = msg

        msg_version, msg_type, msg_len, msg_xid = struct.unpack_from("!BBHL", outpkt)
        self.logger.debug("Msg out: buf len %d. hdr.type %s. hdr.len %d hdr.version %d",
                          len(outpkt),
                          of10.cstruct.ofp_type_map.get(msg_type, "unknown (%d)" % msg_type),
                          msg_len,
                          msg_version)
        if self.switch_socket.sendall(outpkt) is not None:
            raise AssertionError("failed to send message to switch")

        return 0 # for backwards compatibility

    def __str__(self):
        """
        Return a multi-line summary of the controller state and counters
        """
        parts = [
            "Controller:\n",
            " state " + self.dbg_state + "\n",
            " switch_addr " + str(self.switch_addr) + "\n",
            " pending pkts " + str(len(self.packets)) + "\n",
            " total pkts " + str(self.packets_total) + "\n",
            " expired pkts " + str(self.packets_expired) + "\n",
            " handled pkts " + str(self.packets_handled) + "\n",
            " poll discards " + str(self.poll_discards) + "\n",
            " parse errors " + str(self.parse_errors) + "\n",
            " sock errrors " + str(self.socket_errors) + "\n",
            " max pkts " + str(self.max_pkts) + "\n",
            " target switch " + str(self.switch) + "\n",
            " host " + str(self.host) + "\n",
            " port " + str(self.port) + "\n",
            " keep_alive " + str(self.keep_alive) + "\n",
            " pkt_in_run " + str(self.pkt_in_run) + "\n",
            " pkt_in_dropped " + str(self.pkt_in_dropped) + "\n",
        ]
        return "".join(parts)

    def show(self):
        """
        Print the controller summary to standard output
        """
        print(str(self))
686
def sample_handler(controller, msg, pkt):
    """
    Sample message handler

    Shows the signature expected of functions registered with the
    controller class for packet reception. This sample does nothing
    and returns None, i.e. it reports the packet as not handled.

    @param controller The controller calling the handler
    @param msg The parsed message object
    @param pkt The raw packet that was received on the socket.  This is
    in case the packet contains extra unparsed data.
    @returns Boolean value indicating if the packet was handled.  If
    not handled, the packet is placed in the queue for pollers to receive
    """
    return None