blob: d63519db327487e00384c2edb3488178f39eda20 [file] [log] [blame]
"""
OpenFlow Test Framework

Controller class

Provide the interface to the control channel to the switch under test.

Class inherits from thread so as to run in background allowing
asynchronous callbacks (if needed, not required). Also supports
polling.

The controller thread maintains a queue. Incoming messages that
are not handled by a callback function are placed in this queue for
poll calls.

Callbacks and polling support specifying the message type.

@todo Support transaction semantics via xid
@todo Support select and listen on an administrative socket (or
use a timeout to support clean shutdown).

Currently only one connection is accepted during the life of
the controller. There seems to be no clean way to interrupt an
accept call. Using select that also listens on an administrative
socket and can shut down the socket might work.

"""
28
Dan Talayco34089522010-02-07 23:07:41 -080029import os
30import socket
31import time
Dan Talayco34089522010-02-07 23:07:41 -080032from threading import Thread
33from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080034from threading import Condition
Dan Talayco34089522010-02-07 23:07:41 -080035from message import *
Dan Talaycoe37999f2010-02-09 15:27:12 -080036from parse import *
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080037from ofutils import *
# For some reason, it seems select needs to be imported last (or at least
# later); otherwise an attribute error is raised when calling select.select
40import select
Dan Talayco48370102010-03-03 15:17:33 -080041import logging
42
Dan Talaycof8de5182012-04-12 22:38:41 -070043
# Lookup table indexed by character code: the character itself when its
# repr() is exactly three characters long (quote, char, quote), '.' otherwise.
FILTER = ''.join([chr(x) if len(repr(chr(x))) == 3 else '.'
                  for x in range(256)])


def hex_dump_buffer(src, length=16):
    """
    Convert src to a hex dump string and return the string

    Each output line holds the offset of its first byte (4 hex digits),
    the bytes in hex separated by spaces (padded to length*3 columns)
    and a printable rendering in which codes above 127, and codes that
    FILTER maps to '.', are shown as '.'.

    @param src The source buffer; a string, or a bytes-like object whose
    items are integers when iterated
    @param length The number of bytes shown in each line
    @returns A string showing the hex dump; it always starts with a newline
    """
    result = ["\n"]
    for i in range(0, len(src), length):
        # Normalise every item to its integer code so that both strings
        # (1-character items) and byte buffers (integer items) work.
        codes = [c if isinstance(c, int) else ord(c)
                 for c in src[i:i+length]]
        hex_str = ' '.join(["%02x" % c for c in codes])
        printable = ''.join([FILTER[c] if c <= 127 else '.' for c in codes])
        result.append("%04x %-*s %s\n" % (i, length*3, hex_str, printable))
    return ''.join(result)
62
##@todo Find a better home for these identifiers (controller)
# Default value for Controller.rcv_size, the size passed to recv() when
# reading from the switch socket
RCV_SIZE_DEFAULT = 32768
# Backlog value passed to listen() on the controller's listening socket
LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080066
class Controller(Thread):
    """
    Class abstracting the control interface to the switch.

    For receiving messages, two mechanisms are implemented. First,
    query the interface with poll. Second, register to have a
    function called by message type. The callback is passed the
    controller, the parsed message object and the raw packet.

    One of the main purposes of this object is to translate between network
    and host byte order. 'Above' this object, things should be in host
    byte order.

    @todo Consider using SocketServer for listening socket
    @todo Test transaction code

    @var rcv_size The receive size to use for receive calls
    @var max_pkts The max size of the receive queue
    @var keep_alive If true, listen for echo requests and respond w/
    echo replies
    @var initial_hello If true, will send a hello message immediately
    upon connecting to the switch
    @var switch If not None, do an active connection to the switch
    @var host The host to use for connect
    @var port The port to connect on
    @var packets_total Total number of packets received
    @var packets_expired Number of packets popped from queue as queue full
    @var packets_handled Number of packets handled by something
    @var dbg_state Debug indication of state
    """

    # Size in bytes of the fixed OpenFlow message header (version, type,
    # length, xid). Fewer buffered bytes than this cannot hold a header.
    _OFP_HEADER_BYTES = 8

    def __init__(self, switch=None, host='127.0.0.1', port=6633, max_pkts=1024):
        """
        Set up controller state; no socket is opened here

        @param switch If not None, the address of a switch to connect to
        actively; otherwise the controller listens for a connection
        @param host The local address to listen on (passive mode)
        @param port The port to listen on or connect to
        @param max_pkts The maximum number of queued, unhandled messages
        """
        Thread.__init__(self)
        # Socket related
        self.rcv_size = RCV_SIZE_DEFAULT
        self.listen_socket = None
        self.switch_socket = None
        self.switch_addr = None
        self.connect_cv = Condition()
        self.message_cv = Condition()

        # Used to wake up the event loop from another thread
        self.waker = EventDescriptor()

        # Counters
        self.socket_errors = 0
        self.parse_errors = 0
        self.packets_total = 0
        self.packets_expired = 0
        self.packets_handled = 0
        self.poll_discards = 0

        # State
        self.sync = Lock()
        self.handlers = {}
        self.keep_alive = False
        self.active = True
        self.initial_hello = True

        # OpenFlow message/packet queue
        # Protected by the packets_cv lock / condition variable
        self.packets = []
        self.packets_cv = Condition()

        # Settings
        self.max_pkts = max_pkts
        self.switch = switch
        self.passive = not self.switch
        self.host = host
        self.port = port
        self.dbg_state = "init"
        self.logger = logging.getLogger("controller")
        self.filter_packet_in = False  # Drop "excessive" packet ins
        self.pkt_in_run = 0  # Count on run of packet ins
        self.pkt_in_filter_limit = 50  # Count on run of packet ins
        self.pkt_in_dropped = 0  # Total dropped packet ins
        self.transact_to = 15  # Transact timeout default value; add to config

        # Transaction and message type waiting variables
        #   xid_cv: Condition variable (semaphore) for packet waiters
        #   xid: Transaction ID being waited on
        #   xid_response: Transaction response message
        self.xid_cv = Condition()
        self.xid = None
        self.xid_response = None

        # Bytes received from the switch that do not yet form a full message
        self.buffered_input = ""

    def filter_packet(self, rawmsg, hdr):
        """
        Check if packet should be filtered

        As written, no packet is ever dropped: when filter_packet_in is
        set, the size of the last run of packet ins is reported (if it
        exceeded pkt_in_filter_limit) and the run counter is reset.

        @param rawmsg The raw message bytes (unused)
        @param hdr The parsed message header (unused)
        @return Boolean, True if packet should be dropped; always False
        """
        # Add check for packet in and rate limit
        if self.filter_packet_in:
            # If we were dropping packets, report number dropped
            # TODO dont drop expected packet ins
            if self.pkt_in_run > self.pkt_in_filter_limit:
                self.logger.debug("Dropped %d packet ins (%d total)"
                                  % ((self.pkt_in_run -
                                      self.pkt_in_filter_limit),
                                     self.pkt_in_dropped))
            self.pkt_in_run = 0

        return False

    def _pkt_handle(self, pkt):
        """
        Check for all packet handling conditions

        Parse and verify message
        Check if XID matches something waiting
        Check if keep alive is on and message is an echo request
        Check if any registered handler wants the packet
        Enqueue if none of those conditions is met

        Data that does not yet form a complete message is kept in
        buffered_input and prepended to the next call's data.

        @param pkt The raw packet (string) which may contain multiple OF msgs
        """

        # snag any left over data from last read()
        pkt = self.buffered_input + pkt
        self.buffered_input = ""

        # Process each of the OF msgs inside the pkt
        offset = 0
        while offset < len(pkt):
            # A header split across reads cannot be parsed yet; keep the
            # fragment buffered until the rest arrives.
            if len(pkt) - offset < self._OFP_HEADER_BYTES:
                break

            # Parse the header to get type
            hdr = of_header_parse(pkt[offset:])
            if not hdr or hdr.length == 0:
                # A zero length would never advance offset, so give up
                self.logger.error("Could not parse header")
                self.logger.error("pkt len %d." % len(pkt))
                if hdr:
                    self.logger.error("hdr len %d." % hdr.length)
                self.logger.error("%s" % hex_dump_buffer(pkt[:200]))
                self.kill()
                return

            # Extract the raw message bytes; wait for more data if the
            # message body is not complete yet
            if (offset + hdr.length) > len(pkt):
                break
            rawmsg = pkt[offset:offset + hdr.length]
            offset += hdr.length

            if self.filter_packet(rawmsg, hdr):
                continue

            self.logger.debug("Msg in: buf len %d. hdr.type %s. hdr.len %d" %
                              (len(pkt), ofp_type_map[hdr.type], hdr.length))
            if hdr.version != OFP_VERSION:
                self.logger.error("Version %d does not match OFTest version %d"
                                  % (hdr.version, OFP_VERSION))
                print("Version %d does not match OFTest version %d" %
                      (hdr.version, OFP_VERSION))
                self.disconnect()
                return

            msg = of_message_parse(rawmsg)
            if not msg:
                self.parse_errors += 1
                self.logger.warning("Could not parse message")
                continue

            with self.sync:
                # Check if transaction is waiting
                with self.xid_cv:
                    if self.xid and hdr.xid == self.xid:
                        self.logger.debug("Matched expected XID " + str(hdr.xid))
                        self.xid_response = (msg, rawmsg)
                        self.xid = None
                        self.xid_cv.notify()
                        continue

                # Check if keep alive is set; if so, respond to echo requests
                if self.keep_alive:
                    if hdr.type == OFPT_ECHO_REQUEST:
                        self.logger.debug("Responding to echo request")
                        rep = echo_reply()
                        rep.header.xid = hdr.xid
                        # Ignoring additional data
                        if self.message_send(rep.pack(), zero_xid=True) < 0:
                            self.logger.error("Error sending echo reply")
                        continue

                # Now check for message handlers; preference is given to
                # handlers for a specific packet
                handled = False
                if hdr.type in self.handlers:
                    handled = self.handlers[hdr.type](self, msg, rawmsg)
                if not handled and ("all" in self.handlers):
                    handled = self.handlers["all"](self, msg, rawmsg)

                if not handled:  # Not handled, enqueue
                    self.logger.debug("Enqueuing pkt type " + ofp_type_map[hdr.type])
                    with self.packets_cv:
                        # Queue full: discard the oldest message
                        if len(self.packets) >= self.max_pkts:
                            self.packets.pop(0)
                            self.packets_expired += 1
                        self.packets.append((msg, rawmsg))
                        self.packets_cv.notify_all()
                    self.packets_total += 1
                else:
                    self.packets_handled += 1
                    self.logger.debug("Message handled by callback")

        # end of 'while offset < len(pkt)'
        #   note that if offset = len(pkt), this
        #   appends a harmless empty string
        self.buffered_input += pkt[offset:]

    def _socket_ready_handle(self, s):
        """
        Handle an input-ready socket

        A ready listen socket means an incoming connection, a ready
        switch socket means data to read and process, and a ready waker
        is simply drained.

        @param s The socket object that is ready
        @returns 0 on success, -1 on error
        """

        if self.passive and s and s == self.listen_socket:
            if self.switch_socket:
                self.logger.warning("Ignoring incoming connection; already connected to switch")
                try:
                    (sock, addr) = self.listen_socket.accept()
                    sock.close()
                except Exception:
                    # Returning -1 would drop the existing switch
                    # connection, so only report the failure
                    self.logger.warning("Error on listen socket accept")
                return 0

            try:
                (sock, addr) = self.listen_socket.accept()
            except Exception:
                self.logger.warning("Error on listen socket accept")
                return -1
            self.logger.info(self.host+":"+str(self.port)+": Incoming connection from "+str(addr))

            with self.connect_cv:
                (self.switch_socket, self.switch_addr) = (sock, addr)
                self.switch_socket.setsockopt(socket.IPPROTO_TCP,
                                              socket.TCP_NODELAY, True)
                if self.initial_hello:
                    self.message_send(hello())
                self.connect_cv.notify()  # Notify anyone waiting
        elif s and s == self.switch_socket:
            for idx in range(3):  # debug: try a couple of times
                try:
                    pkt = self.switch_socket.recv(self.rcv_size)
                except Exception:
                    self.logger.warning("Error on switch read")
                    return -1

                if not self.active:
                    return 0

                if len(pkt) == 0:
                    self.logger.warning("Zero-length switch read, %d" % idx)
                else:
                    break

            if len(pkt) == 0:  # Still no packet
                self.logger.warning("Zero-length switch read; closing cxn")
                self.logger.info(str(self))
                return -1

            self._pkt_handle(pkt)
        elif s and s == self.waker:
            self.waker.wait()
        else:
            self.logger.error("Unknown socket ready: " + str(s))
            return -1

        return 0

    def active_connect(self):
        """
        Actively connect to a switch IP addr

        On success switch_addr is set to (switch, port). On failure the
        error is logged and the partially set up socket is closed.

        @return The connected socket, or None if the connection failed
        """
        soc = None
        try:
            self.logger.info("Trying active connection to %s" % self.switch)
            soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            soc.connect((self.switch, self.port))
            self.logger.info("Connected to " + self.switch + " on " +
                             str(self.port))
            soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
            self.switch_addr = (self.switch, self.port)
            return soc
        except Exception as e:
            self.logger.error("Could not connect to %s at %d:: %s" %
                              (self.switch, self.port, str(e)))
            # Do not leak the descriptor of a socket that never connected
            if soc is not None:
                soc.close()
            return None

    def wakeup(self):
        """
        Wake up the event loop, presumably from another thread.
        """
        self.waker.notify()

    def sockets(self):
        """
        Return list of sockets to select on.

        Entries that are currently unset (None) are left out.
        """
        socs = [self.listen_socket, self.switch_socket, self.waker]
        return [x for x in socs if x]

    def run(self):
        """
        Activity function for class

        In passive mode, create the listen socket first. Then loop,
        while the controller is active, selecting on the sockets
        returned by sockets() and dispatching every ready one to
        _socket_ready_handle. A select failure, a socket reported in
        error or a handler error disconnects the switch.

        When there is a message on the socket, check for handlers; queue the
        packet if no one handles the packet.

        See note for controller describing the limitation of a single
        connection for now.
        """

        self.dbg_state = "starting"

        # Create listen socket
        if self.passive:
            self.logger.info("Create/listen at " + self.host + ":" +
                             str(self.port))
            self.listen_socket = socket.socket(socket.AF_INET,
                                               socket.SOCK_STREAM)
            self.listen_socket.setsockopt(socket.SOL_SOCKET,
                                          socket.SO_REUSEADDR, 1)
            self.listen_socket.bind((self.host, self.port))
            self.dbg_state = "listening"
            self.listen_socket.listen(LISTEN_QUEUE_SIZE)

            self.logger.info("Listening for switch connection")
            self.dbg_state = "running"

        while self.active:
            # Take one snapshot so the read and error sets are identical
            watched = self.sockets()
            try:
                sel_in, _sel_out, sel_err = select.select(watched, [],
                                                          watched, 1)
            except Exception:
                self.logger.exception("Select error, disconnecting")
                self.disconnect()
                # No valid select result to process on this iteration
                continue

            for s in sel_err:
                self.logger.error("Got socket error on: " + str(s) + ", disconnecting")
                self.disconnect()

            for s in sel_in:
                if self._socket_ready_handle(s) == -1:
                    self.disconnect()

        # End of main loop
        self.dbg_state = "closing"
        self.logger.info("Exiting controller thread")
        self.shutdown()

    def connect(self, timeout=-1):
        """
        Connect to the switch

        In active mode, connect to the switch now; if that fails the
        controller is marked inactive. In passive mode, wait for the
        switch to connect to the listen socket.

        @param timeout Block for up to timeout seconds. Pass -1 for the default.
        @return Boolean, True if connected
        """

        if not self.passive:  # Do active connection now
            self.logger.info("Attempting to connect to %s on port %s" %
                             (self.switch, str(self.port)))
            soc = self.active_connect()
            if soc:
                self.logger.info("Connected to %s", self.switch)
                self.dbg_state = "running"
                self.switch_socket = soc
                # Let the event loop pick up the new socket
                self.wakeup()
                with self.connect_cv:
                    if self.initial_hello:
                        self.message_send(hello())
                    self.connect_cv.notify()  # Notify anyone waiting
            else:
                self.logger.error("Could not actively connect to switch %s",
                                  self.switch)
                self.active = False
        else:
            with self.connect_cv:
                timed_wait(self.connect_cv, lambda: self.switch_socket,
                           timeout=timeout)

        return self.switch_socket is not None

    def disconnect(self, timeout=-1):
        """
        If connected to a switch, disconnect.

        Closes the switch socket, discards queued messages and any
        partially received message, and notifies connect_cv waiters.

        @param timeout Unused; kept for interface compatibility
        """
        if self.switch_socket:
            self.switch_socket.close()
            self.switch_socket = None
            self.switch_addr = None
            # Left-over bytes belong to the closed connection and must not
            # be prepended to data from a later one
            self.buffered_input = ""
            with self.packets_cv:
                self.packets = []
            with self.connect_cv:
                self.connect_cv.notify_all()

    def wait_disconnected(self, timeout=-1):
        """
        Wait until there is no switch connection

        @param timeout Block for up to timeout seconds. Pass -1 for the default.
        @return Boolean, True if disconnected
        """

        with self.connect_cv:
            timed_wait(self.connect_cv,
                       lambda: True if not self.switch_socket else None,
                       timeout=timeout)
        return self.switch_socket is None

    def kill(self):
        """
        Force the controller thread to quit

        Sets the active state variable to false and wakes up the
        event loop so that it notices the change.
        """
        self.active = False
        self.wakeup()

    def _close_socket(self, sock, label):
        """
        Shut down and close sock on a best-effort basis

        @param sock The socket to close; None is ignored
        @param label Name used in log messages ("switch" or "listen")
        """
        if sock is None:
            return
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            self.logger.info("Ignoring %s soc shutdown error" % label)
        try:
            # shutdown() alone does not release the descriptor
            sock.close()
        except Exception:
            self.logger.info("Ignoring %s soc close error" % label)

    def shutdown(self):
        """
        Shutdown the controller closing all sockets

        @todo Might want to synchronize shutdown with self.sync...
        """

        self.active = False
        self._close_socket(self.switch_socket, "switch")
        self.switch_socket = None

        self._close_socket(self.listen_socket, "listen")
        self.listen_socket = None

        # Wakeup condition variables on which controller may be wait
        with self.xid_cv:
            self.xid_cv.notify_all()

        with self.connect_cv:
            self.connect_cv.notify_all()

        self.wakeup()
        self.dbg_state = "down"

    def register(self, msg_type, handler):
        """
        Register a callback to receive a specific message type.

        Only one handler may be registered for a given message type.
        Passing a false handler (e.g. None) removes any handler
        registered for msg_type.

        WARNING: A lock is held during the handler call back, so
        the handler should not make any blocking calls

        @param msg_type The type of message to receive. The special type,
        the string "all", receives every message that was not handled by
        a handler registered for its specific type.
        @param handler The function to call when a message of the given
        type is received.
        """
        # Should check type is valid
        if not handler:
            # Never store a non-callable: it would be called on receive
            self.handlers.pop(msg_type, None)
            return
        self.handlers[msg_type] = handler

    def poll(self, exp_msg=None, timeout=-1):
        """
        Wait for the next OF message received from the switch.

        @param exp_msg If set, return only when this type of message
        is received (unless timeout occurs).

        @param timeout Maximum number of seconds to wait for the message.
        Pass -1 for the default timeout.

        @retval A pair (msg, pkt) where msg is a message object and pkt
        the string representing the packet as received from the socket.
        This allows additional parsing by the receiver if necessary.

        The data members in the message are in host endian order.
        If an error occurs, (None, None) is returned
        """

        if exp_msg is not None:
            self.logger.debug("Poll for %s" % ofp_type_map[exp_msg])
        else:
            self.logger.debug("Poll for any OF message")

        # Take the packet from the queue
        def grab():
            if len(self.packets) > 0:
                if exp_msg is None:
                    self.logger.debug("Looking for any packet")
                    return self.packets.pop(0)
                self.logger.debug("Looking for %s" % ofp_type_map[exp_msg])
                for i, entry in enumerate(self.packets):
                    msg = entry[0]
                    self.logger.debug("Checking packets[%d] (%s)" % (i, ofp_type_map[msg.header.type]))
                    if msg.header.type == exp_msg:
                        return self.packets.pop(i)
            # Not found
            self.logger.debug("Packet not in queue")
            return None

        with self.packets_cv:
            ret = timed_wait(self.packets_cv, grab, timeout=timeout)

        if ret is not None:
            (msg, pkt) = ret
            self.logger.debug("Got message %s" % str(msg))
            return (msg, pkt)
        return (None, None)

    def transact(self, msg, timeout=-1, zero_xid=False):
        """
        Run a message transaction with the switch

        Send the message in msg and wait for a reply with a matching
        transaction id. Transactions have the highest priority in
        received message handling. Only one transaction may be pending
        at a time.

        @param msg The message object to send; must not be a string
        @param timeout The timeout in seconds; if -1 use default.
        @param zero_xid Normally, if the XID is 0 an XID will be generated
        for the message. Set zero_xid to override this behavior
        @return A pair (msg, pkt) holding the matching message object and
        its raw packet, or (None, None) if unsuccessful
        """

        if not zero_xid and msg.header.xid == 0:
            msg.header.xid = gen_xid()

        self.logger.debug("Running transaction %d" % msg.header.xid)

        with self.xid_cv:
            if self.xid:
                self.logger.error("Can only run one transaction at a time")
                return (None, None)

            self.xid = msg.header.xid
            self.xid_response = None
            try:
                if self.message_send(msg.pack()) < 0:
                    self.logger.error("Error sending pkt for transaction %d" %
                                      msg.header.xid)
                    return (None, None)

                self.logger.debug("Waiting for transaction %d" % msg.header.xid)
                timed_wait(self.xid_cv, lambda: self.xid_response,
                           timeout=timeout)
                response = self.xid_response
            finally:
                # Always release the transaction slot; otherwise a timeout
                # or a send error would block every later transaction
                self.xid = None
                self.xid_response = None

        if response:
            (resp, pkt) = response
        else:
            (resp, pkt) = (None, None)

        if resp is None:
            self.logger.warning("No response for xid " + str(msg.header.xid))
        return (resp, pkt)

    def message_send(self, msg, zero_xid=False):
        """
        Send the message to the switch

        @param msg A string or OpenFlow message object to be forwarded to
        the switch.
        @param zero_xid If msg is an OpenFlow object (not a string) and if
        the XID in the header is 0, then an XID will be generated
        for the message. Set zero_xid to override this behavior (and keep an
        existing 0 xid)

        @return 0 on success; an Exception is raised if there is no
        switch connection or the send reports an error
        """

        if not self.switch_socket:
            raise Exception("no socket")

        if isinstance(msg, (str, bytes)):
            # Sending a string indicates the message is ready to go
            outpkt = msg
        else:
            if msg.header.xid == 0 and not zero_xid:
                msg.header.xid = gen_xid()
            outpkt = msg.pack()

        self.logger.debug("Sending pkt of len " + str(len(outpkt)))
        if self.switch_socket.sendall(outpkt) is not None:
            raise Exception("unknown error on sendall")

        return 0

    def __str__(self):
        """
        Return a multi-line summary of the controller state and counters
        """
        fields = [
            (" state ", self.dbg_state),
            (" switch_addr ", self.switch_addr),
            (" pending pkts ", len(self.packets)),
            (" total pkts ", self.packets_total),
            (" expired pkts ", self.packets_expired),
            (" handled pkts ", self.packets_handled),
            (" poll discards ", self.poll_discards),
            (" parse errors ", self.parse_errors),
            (" sock errrors ", self.socket_errors),
            (" max pkts ", self.max_pkts),
            (" target switch ", self.switch),
            (" host ", self.host),
            (" port ", self.port),
            (" keep_alive ", self.keep_alive),
            (" pkt_in_run ", self.pkt_in_run),
            (" pkt_in_dropped ", self.pkt_in_dropped),
        ]
        lines = ["Controller:\n"]
        for label, value in fields:
            lines.append(label + str(value) + "\n")
        return ''.join(lines)

    def show(self):
        """
        Print the summary produced by __str__
        """
        print(str(self))
694
def sample_handler(controller, msg, pkt):
    """
    Sample message handler

    Serves as the prototype for callbacks registered with the
    controller class for packet reception. This sample does nothing.

    @param controller The controller calling the handler
    @param msg The parsed message object
    @param pkt The raw packet that was received on the socket. This is
    in case the packet contains extra unparsed data.
    @returns Boolean value indicating if the packet was handled. If
    not handled, the packet is placed in the queue for pollers to receive.
    This sample returns None.
    """
    return None