blob: 764662f455d4ab7d1ec779b182590ccda7190733 [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
4Controller class
5
6Provide the interface to the control channel to the switch under test.
7
8Class inherits from thread so as to run in background allowing
9asynchronous callbacks (if needed, not required). Also supports
10polling.
11
12The controller thread maintains a queue. Incoming messages that
13are not handled by a callback function are placed in this queue for
14poll calls.
15
16Callbacks and polling support specifying the message type
17
18@todo Support transaction semantics via xid
Dan Talayco1b3f6902010-02-15 14:14:19 -080019@todo Support select and listen on an administrative socket (or
20use a timeout to support clean shutdown).
21
22Currently only one connection is accepted during the life of
23the controller. There seems
24to be no clean way to interrupt an accept call. Using select that also listens
25on an administrative socket and can shut down the socket might work.
26
Dan Talayco34089522010-02-07 23:07:41 -080027"""
28
Dan Talayco34089522010-02-07 23:07:41 -080029import os
30import socket
31import time
Dan Talayco34089522010-02-07 23:07:41 -080032from threading import Thread
33from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080034from threading import Condition
Dan Talayco34089522010-02-07 23:07:41 -080035from message import *
Dan Talaycoe37999f2010-02-09 15:27:12 -080036from parse import *
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080037from ofutils import *
Dan Talayco710438c2010-02-18 15:16:07 -080038# For some reason, it seems select to be last (or later).
39# Otherwise get an attribute error when calling select.select
40import select
Dan Talayco48370102010-03-03 15:17:33 -080041import logging
42
Dan Talaycof8de5182012-04-12 22:38:41 -070043
44FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.'
45 for x in range(256)])
46
47def hex_dump_buffer(src, length=16):
48 """
49 Convert src to a hex dump string and return the string
50 @param src The source buffer
51 @param length The number of bytes shown in each line
52 @returns A string showing the hex dump
53 """
54 result = ["\n"]
55 for i in xrange(0, len(src), length):
56 chars = src[i:i+length]
57 hex = ' '.join(["%02x" % ord(x) for x in chars])
58 printable = ''.join(["%s" % ((ord(x) <= 127 and
59 FILTER[ord(x)]) or '.') for x in chars])
60 result.append("%04x %-*s %s\n" % (i, length*3, hex, printable))
61 return ''.join(result)
62
Dan Talayco48370102010-03-03 15:17:33 -080063##@todo Find a better home for these identifiers (controller)
Glen Gibb741b1182010-07-08 16:43:58 -070064RCV_SIZE_DEFAULT = 32768
Dan Talayco48370102010-03-03 15:17:33 -080065LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080066
67class Controller(Thread):
68 """
69 Class abstracting the control interface to the switch.
70
71 For receiving messages, two mechanism will be implemented. First,
72 query the interface with poll. Second, register to have a
73 function called by message type. The callback is passed the
74 message type as well as the raw packet (or message object)
75
76 One of the main purposes of this object is to translate between network
77 and host byte order. 'Above' this object, things should be in host
78 byte order.
Dan Talayco21c75c72010-02-12 22:59:24 -080079
80 @todo Consider using SocketServer for listening socket
81 @todo Test transaction code
82
83 @var rcv_size The receive size to use for receive calls
84 @var max_pkts The max size of the receive queue
85 @var keep_alive If true, listen for echo requests and respond w/
86 echo replies
Dan Talayco710438c2010-02-18 15:16:07 -080087 @var initial_hello If true, will send a hello message immediately
88 upon connecting to the switch
Dan Talayco21c75c72010-02-12 22:59:24 -080089 @var host The host to use for connect
90 @var port The port to connect on
91 @var packets_total Total number of packets received
92 @var packets_expired Number of packets popped from queue as queue full
93 @var packets_handled Number of packets handled by something
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080094 @var dbg_state Debug indication of state
Dan Talayco34089522010-02-07 23:07:41 -080095 """
96
Dan Talayco48370102010-03-03 15:17:33 -080097 def __init__(self, host='127.0.0.1', port=6633, max_pkts=1024):
Dan Talayco21c75c72010-02-12 22:59:24 -080098 Thread.__init__(self)
Dan Talayco1b3f6902010-02-15 14:14:19 -080099 # Socket related
Dan Talayco21c75c72010-02-12 22:59:24 -0800100 self.rcv_size = RCV_SIZE_DEFAULT
Dan Talayco1b3f6902010-02-15 14:14:19 -0800101 self.listen_socket = None
102 self.switch_socket = None
103 self.switch_addr = None
Dan Talayco710438c2010-02-18 15:16:07 -0800104 self.socs = []
105 self.connect_cv = Condition()
Dan Talaycoe226eb12010-02-18 23:06:30 -0800106 self.message_cv = Condition()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800107
108 # Counters
Dan Talayco21c75c72010-02-12 22:59:24 -0800109 self.socket_errors = 0
110 self.parse_errors = 0
Dan Talayco21c75c72010-02-12 22:59:24 -0800111 self.packets_total = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -0800112 self.packets_expired = 0
113 self.packets_handled = 0
Dan Talaycoe226eb12010-02-18 23:06:30 -0800114 self.poll_discards = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -0800115
116 # State
Dan Talayco21c75c72010-02-12 22:59:24 -0800117 self.sync = Lock()
118 self.handlers = {}
119 self.keep_alive = False
Dan Talayco710438c2010-02-18 15:16:07 -0800120 self.active = True
121 self.initial_hello = True
Dan Talayco1b3f6902010-02-15 14:14:19 -0800122
Rich Lanec4f071b2012-07-11 17:25:57 -0700123 # OpenFlow message/packet queue
124 # Protected by the packets_cv lock / condition variable
125 self.packets = []
126 self.packets_cv = Condition()
127
Dan Talayco1b3f6902010-02-15 14:14:19 -0800128 # Settings
129 self.max_pkts = max_pkts
130 self.passive = True
Dan Talayco48370102010-03-03 15:17:33 -0800131 self.host = host
132 self.port = port
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800133 self.dbg_state = "init"
Dan Talayco48370102010-03-03 15:17:33 -0800134 self.logger = logging.getLogger("controller")
Dan Talaycof8de5182012-04-12 22:38:41 -0700135 self.filter_packet_in = False # Drop "excessive" packet ins
136 self.pkt_in_run = 0 # Count on run of packet ins
137 self.pkt_in_filter_limit = 50 # Count on run of packet ins
138 self.pkt_in_dropped = 0 # Total dropped packet ins
139 self.transact_to = 15 # Transact timeout default value; add to config
Dan Talayco1b3f6902010-02-15 14:14:19 -0800140
Dan Talaycoe226eb12010-02-18 23:06:30 -0800141 # Transaction and message type waiting variables
142 # xid_cv: Condition variable (semaphore) for packet waiters
Dan Talayco21c75c72010-02-12 22:59:24 -0800143 # xid: Transaction ID being waited on
144 # xid_response: Transaction response message
145 self.xid_cv = Condition()
146 self.xid = None
147 self.xid_response = None
148
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800149 self.buffered_input = ""
Dan Talaycoe226eb12010-02-18 23:06:30 -0800150
Dan Talaycof8de5182012-04-12 22:38:41 -0700151 def filter_packet(self, rawmsg, hdr):
152 """
153 Check if packet should be filtered
154
155 Currently filters packet in messages
156 @return Boolean, True if packet should be dropped
157 """
158 # Add check for packet in and rate limit
159 if self.filter_packet_in:
Rich Lanec4f071b2012-07-11 17:25:57 -0700160 # If we were dropping packets, report number dropped
161 # TODO dont drop expected packet ins
162 if self.pkt_in_run > self.pkt_in_filter_limit:
163 self.logger.debug("Dropped %d packet ins (%d total)"
164 % ((self.pkt_in_run -
165 self.pkt_in_filter_limit),
166 self.pkt_in_dropped))
167 self.pkt_in_run = 0
Dan Talaycof8de5182012-04-12 22:38:41 -0700168
169 return False
170
Dan Talaycod12b6612010-03-07 22:00:46 -0800171 def _pkt_handle(self, pkt):
172 """
173 Check for all packet handling conditions
174
175 Parse and verify message
176 Check if XID matches something waiting
177 Check if message is being expected for a poll operation
178 Check if keep alive is on and message is an echo request
179 Check if any registered handler wants the packet
180 Enqueue if none of those conditions is met
181
182 an echo request in case keep_alive is true, followed by
183 registered message handlers.
Glen Gibb6d467062010-07-08 16:15:08 -0700184 @param pkt The raw packet (string) which may contain multiple OF msgs
Dan Talaycod12b6612010-03-07 22:00:46 -0800185 """
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800186
187 # snag any left over data from last read()
188 pkt = self.buffered_input + pkt
189 self.buffered_input = ""
190
Glen Gibb6d467062010-07-08 16:15:08 -0700191 # Process each of the OF msgs inside the pkt
192 offset = 0
193 while offset < len(pkt):
194 # Parse the header to get type
195 hdr = of_header_parse(pkt[offset:])
Dan Talaycof8de5182012-04-12 22:38:41 -0700196 if not hdr or hdr.length == 0:
197 self.logger.error("Could not parse header")
198 self.logger.error("pkt len %d." % len(pkt))
199 if hdr:
200 self.logger.error("hdr len %d." % hdr.length)
201 self.logger.error("%s" % hex_dump_buffer(pkt[:200]))
202 self.kill()
Dan Talaycod12b6612010-03-07 22:00:46 -0800203 return
204
Glen Gibb6d467062010-07-08 16:15:08 -0700205 # Extract the raw message bytes
Ed Swierk836e5bd2012-03-20 11:08:53 -0700206 if (offset + hdr.length) > len(pkt):
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800207 break
Glen Gibb6d467062010-07-08 16:15:08 -0700208 rawmsg = pkt[offset : offset + hdr.length]
Dan Talayco4306d3e2011-09-07 09:42:26 -0700209 offset += hdr.length
Dan Talaycof8de5182012-04-12 22:38:41 -0700210
211 if self.filter_packet(rawmsg, hdr):
212 continue
213
214 self.logger.debug("Msg in: buf len %d. hdr.type %s. hdr.len %d" %
215 (len(pkt), ofp_type_map[hdr.type], hdr.length))
Glen Gibb6d467062010-07-08 16:15:08 -0700216 if hdr.version != OFP_VERSION:
217 self.logger.error("Version %d does not match OFTest version %d"
218 % (hdr.version, OFP_VERSION))
219 print "Version %d does not match OFTest version %d" % \
220 (hdr.version, OFP_VERSION)
221 self.active = False
222 self.switch_socket = None
Dan Talaycof8de5182012-04-12 22:38:41 -0700223 return
Dan Talaycod12b6612010-03-07 22:00:46 -0800224
Glen Gibb6d467062010-07-08 16:15:08 -0700225 msg = of_message_parse(rawmsg)
226 if not msg:
227 self.parse_errors += 1
228 self.logger.warn("Could not parse message")
229 continue
230
Rich Lanec4f071b2012-07-11 17:25:57 -0700231 with self.sync:
232 # Check if transaction is waiting
233 with self.xid_cv:
234 if self.xid and hdr.xid == self.xid:
235 self.logger.debug("Matched expected XID " + str(hdr.xid))
236 self.xid_response = (msg, rawmsg)
237 self.xid = None
238 self.xid_cv.notify()
239 continue
Glen Gibb6d467062010-07-08 16:15:08 -0700240
Rich Lanec4f071b2012-07-11 17:25:57 -0700241 # Check if keep alive is set; if so, respond to echo requests
242 if self.keep_alive:
243 if hdr.type == OFPT_ECHO_REQUEST:
244 self.logger.debug("Responding to echo request")
245 rep = echo_reply()
246 rep.header.xid = hdr.xid
247 # Ignoring additional data
248 if self.message_send(rep.pack(), zero_xid=True) < 0:
249 self.logger.error("Error sending echo reply")
250 continue
Glen Gibb6d467062010-07-08 16:15:08 -0700251
Rich Lanec4f071b2012-07-11 17:25:57 -0700252 # Now check for message handlers; preference is given to
253 # handlers for a specific packet
254 handled = False
255 if hdr.type in self.handlers.keys():
256 handled = self.handlers[hdr.type](self, msg, rawmsg)
257 if not handled and ("all" in self.handlers.keys()):
258 handled = self.handlers["all"](self, msg, rawmsg)
Glen Gibb6d467062010-07-08 16:15:08 -0700259
Rich Lanec4f071b2012-07-11 17:25:57 -0700260 if not handled: # Not handled, enqueue
261 self.logger.debug("Enqueuing pkt type " + ofp_type_map[hdr.type])
262 with self.packets_cv:
263 if len(self.packets) >= self.max_pkts:
264 self.packets.pop(0)
265 self.packets_expired += 1
266 self.packets.append((msg, rawmsg))
267 self.packets_cv.notify_all()
268 self.packets_total += 1
269 else:
270 self.packets_handled += 1
271 self.logger.debug("Message handled by callback")
Glen Gibb6d467062010-07-08 16:15:08 -0700272
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800273 # end of 'while offset < len(pkt)'
274 # note that if offset = len(pkt), this is
275 # appends a harmless empty string
276 self.buffered_input += pkt[offset:]
Dan Talaycod12b6612010-03-07 22:00:46 -0800277
Dan Talayco710438c2010-02-18 15:16:07 -0800278 def _socket_ready_handle(self, s):
279 """
280 Handle an input-ready socket
Dan Talaycof8de5182012-04-12 22:38:41 -0700281
Dan Talayco710438c2010-02-18 15:16:07 -0800282 @param s The socket object that is ready
Dan Talaycof8de5182012-04-12 22:38:41 -0700283 @returns 0 on success, -1 on error
Dan Talayco710438c2010-02-18 15:16:07 -0800284 """
285
Dan Talaycof8de5182012-04-12 22:38:41 -0700286 if s and s == self.listen_socket:
Dan Talayco710438c2010-02-18 15:16:07 -0800287 if self.switch_socket:
Dan Talaycof8de5182012-04-12 22:38:41 -0700288 return 0 # Ignore listen socket while connected to switch
Dan Talayco710438c2010-02-18 15:16:07 -0800289
290 (self.switch_socket, self.switch_addr) = \
291 self.listen_socket.accept()
Dan Talayco48370102010-03-03 15:17:33 -0800292 self.logger.info("Got cxn to " + str(self.switch_addr))
Dan Talaycof8de5182012-04-12 22:38:41 -0700293 self.socs.append(self.switch_socket)
Dan Talayco710438c2010-02-18 15:16:07 -0800294 # Notify anyone waiting
Rich Laneee3586c2012-07-11 17:26:02 -0700295 with self.connect_cv:
296 self.connect_cv.notify()
Dan Talayco710438c2010-02-18 15:16:07 -0800297 if self.initial_hello:
298 self.message_send(hello())
Dan Talaycof8de5182012-04-12 22:38:41 -0700299 ## @fixme Check return code
300 elif s and s == self.switch_socket:
301 for idx in range(3): # debug: try a couple of times
302 try:
303 pkt = self.switch_socket.recv(self.rcv_size)
304 except:
305 self.logger.warning("Error on switch read")
306 return -1
307
308 if not self.active:
309 return 0
310
311 if len(pkt) == 0:
312 self.logger.warning("Zero-length switch read, %d" % idx)
313 else:
314 break
Dan Talayco710438c2010-02-18 15:16:07 -0800315
Dan Talaycof8de5182012-04-12 22:38:41 -0700316 if len(pkt) == 0: # Still no packet
Dan Talayco0fc08bd2012-04-09 16:56:18 -0700317 self.logger.warning("Zero-length switch read; closing cxn")
Dan Talaycof8de5182012-04-12 22:38:41 -0700318 self.logger.info(str(self))
319 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800320
Dan Talaycod12b6612010-03-07 22:00:46 -0800321 self._pkt_handle(pkt)
Dan Talayco710438c2010-02-18 15:16:07 -0800322 else:
Dan Talayco48370102010-03-03 15:17:33 -0800323 self.logger.error("Unknown socket ready: " + str(s))
Dan Talaycof8de5182012-04-12 22:38:41 -0700324 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800325
Dan Talaycof8de5182012-04-12 22:38:41 -0700326 return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800327
Dan Talayco1b3f6902010-02-15 14:14:19 -0800328 def run(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800329 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800330 Activity function for class
Dan Talayco21c75c72010-02-12 22:59:24 -0800331
Dan Talayco1b3f6902010-02-15 14:14:19 -0800332 Assumes connection to switch already exists. Listens on
333 switch_socket for messages until an error (or zero len pkt)
334 occurs.
Dan Talayco21c75c72010-02-12 22:59:24 -0800335
Dan Talayco1b3f6902010-02-15 14:14:19 -0800336 When there is a message on the socket, check for handlers; queue the
337 packet if no one handles the packet.
338
339 See note for controller describing the limitation of a single
340 connection for now.
341 """
342
Dan Talayco710438c2010-02-18 15:16:07 -0800343 self.dbg_state = "starting"
Dan Talayco1b3f6902010-02-15 14:14:19 -0800344
Dan Talayco710438c2010-02-18 15:16:07 -0800345 # Create listen socket
Dan Talayco48370102010-03-03 15:17:33 -0800346 self.logger.info("Create/listen at " + self.host + ":" +
Dan Talayco710438c2010-02-18 15:16:07 -0800347 str(self.port))
348 self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
349 self.listen_socket.setsockopt(socket.SOL_SOCKET,
350 socket.SO_REUSEADDR, 1)
351 self.listen_socket.bind((self.host, self.port))
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800352 self.dbg_state = "listening"
Dan Talayco21c75c72010-02-12 22:59:24 -0800353 self.listen_socket.listen(LISTEN_QUEUE_SIZE)
Dan Talayco21c75c72010-02-12 22:59:24 -0800354
Dan Talayco48370102010-03-03 15:17:33 -0800355 self.logger.info("Waiting for switch connection")
Dan Talayco710438c2010-02-18 15:16:07 -0800356 self.socs = [self.listen_socket]
357 self.dbg_state = "running"
358 while self.active:
Dan Talayco710438c2010-02-18 15:16:07 -0800359 try:
360 sel_in, sel_out, sel_err = \
361 select.select(self.socs, [], self.socs, 1)
362 except:
363 print sys.exc_info()
Dan Talayco48370102010-03-03 15:17:33 -0800364 self.logger.error("Select error, exiting")
Dan Talaycof8de5182012-04-12 22:38:41 -0700365 self.active = False
Dan Talayco710438c2010-02-18 15:16:07 -0800366 break
Dan Talayco1b3f6902010-02-15 14:14:19 -0800367
Dan Talayco710438c2010-02-18 15:16:07 -0800368 for s in sel_err:
Dan Talayco48370102010-03-03 15:17:33 -0800369 self.logger.error("Got socket error on: " + str(s))
Dan Talaycof8de5182012-04-12 22:38:41 -0700370 self.active = False
371 break
372
373 for s in sel_in:
374 if self._socket_ready_handle(s) == -1:
Dan Talayco710438c2010-02-18 15:16:07 -0800375 self.active = False
376 break
377
Dan Talayco710438c2010-02-18 15:16:07 -0800378 # End of main loop
379 self.dbg_state = "closing"
Dan Talayco48370102010-03-03 15:17:33 -0800380 self.logger.info("Exiting controller thread")
Dan Talayco710438c2010-02-18 15:16:07 -0800381 self.shutdown()
382
383 def connect(self, timeout=None):
384 """
385 Connect to the switch
386
387 @param timeout If None, block until connected. If 0, return
388 immedidately. Otherwise, block for up to timeout seconds
389 @return Boolean, True if connected
390 """
391
392 if timeout == 0:
393 return self.switch_socket is not None
394 if self.switch_socket is not None:
395 return True
Rich Laneee3586c2012-07-11 17:26:02 -0700396 with self.connect_cv:
397 self.connect_cv.wait(timeout)
Dan Talayco710438c2010-02-18 15:16:07 -0800398
399 return self.switch_socket is not None
400
401 def kill(self):
402 """
403 Force the controller thread to quit
404
405 Just sets the active state variable to false and expects
406 the select timeout to kick in
407 """
408 self.active = False
Dan Talayco21c75c72010-02-12 22:59:24 -0800409
Dan Talayco1b3f6902010-02-15 14:14:19 -0800410 def shutdown(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800411 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800412 Shutdown the controller closing all sockets
Dan Talayco21c75c72010-02-12 22:59:24 -0800413
Dan Talayco1b3f6902010-02-15 14:14:19 -0800414 @todo Might want to synchronize shutdown with self.sync...
415 """
Dan Talaycof8de5182012-04-12 22:38:41 -0700416
Dan Talayco710438c2010-02-18 15:16:07 -0800417 self.active = False
418 try:
419 self.switch_socket.shutdown(socket.SHUT_RDWR)
420 except:
Dan Talayco48370102010-03-03 15:17:33 -0800421 self.logger.info("Ignoring switch soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800422 self.switch_socket = None
Dan Talayco1b3f6902010-02-15 14:14:19 -0800423
Dan Talayco710438c2010-02-18 15:16:07 -0800424 try:
425 self.listen_socket.shutdown(socket.SHUT_RDWR)
426 except:
Dan Talayco48370102010-03-03 15:17:33 -0800427 self.logger.info("Ignoring listen soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800428 self.listen_socket = None
Dan Talaycof8de5182012-04-12 22:38:41 -0700429
Rich Laneee3586c2012-07-11 17:26:02 -0700430 # Wakeup condition variables on which controller may be wait
431 with self.xid_cv:
432 self.xid_cv.notifyAll()
Dan Talaycof8de5182012-04-12 22:38:41 -0700433
Rich Laneee3586c2012-07-11 17:26:02 -0700434 with self.connect_cv:
435 self.connect_cv.notifyAll()
Dan Talaycof8de5182012-04-12 22:38:41 -0700436
Dan Talayco710438c2010-02-18 15:16:07 -0800437 self.dbg_state = "down"
438
Dan Talayco34089522010-02-07 23:07:41 -0800439 def register(self, msg_type, handler):
440 """
441 Register a callback to receive a specific message type.
442
443 Only one handler may be registered for a given message type.
Dan Talaycod12b6612010-03-07 22:00:46 -0800444
445 WARNING: A lock is held during the handler call back, so
446 the handler should not make any blocking calls
447
Dan Talayco34089522010-02-07 23:07:41 -0800448 @param msg_type The type of message to receive. May be DEFAULT
Dan Talayco21c75c72010-02-12 22:59:24 -0800449 for all non-handled packets. The special type, the string "all"
450 will send all packets to the handler.
Dan Talayco34089522010-02-07 23:07:41 -0800451 @param handler The function to call when a message of the given
452 type is received.
453 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800454 # Should check type is valid
455 if not handler and msg_type in self.handlers.keys():
456 del self.handlers[msg_type]
457 return
458 self.handlers[msg_type] = handler
Dan Talayco34089522010-02-07 23:07:41 -0800459
Dan Talayco21c75c72010-02-12 22:59:24 -0800460 def poll(self, exp_msg=None, timeout=None):
Dan Talayco34089522010-02-07 23:07:41 -0800461 """
462 Wait for the next OF message received from the switch.
463
464 @param exp_msg If set, return only when this type of message
Dan Talayco48370102010-03-03 15:17:33 -0800465 is received (unless timeout occurs).
Dan Talaycoe226eb12010-02-18 23:06:30 -0800466 @param timeout If None, do not block. Otherwise, sleep in
Dan Talayco48370102010-03-03 15:17:33 -0800467 intervals of 1 second until message is received.
Dan Talayco34089522010-02-07 23:07:41 -0800468
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800469 @retval A pair (msg, pkt) where msg is a message object and pkt
470 the string representing the packet as received from the socket.
471 This allows additional parsing by the receiver if necessary.
472
Dan Talayco34089522010-02-07 23:07:41 -0800473 The data members in the message are in host endian order.
Dan Talayco48370102010-03-03 15:17:33 -0800474 If an error occurs, (None, None) is returned
475
476 The current queue is searched for a message of the desired type
477 before sleeping on message in events.
Dan Talayco34089522010-02-07 23:07:41 -0800478 """
Dan Talayco34089522010-02-07 23:07:41 -0800479
Dan Talayco48370102010-03-03 15:17:33 -0800480 self.logger.debug("Poll for " + ofp_type_map[exp_msg])
Dan Talaycoe226eb12010-02-18 23:06:30 -0800481
Rich Lanec4f071b2012-07-11 17:25:57 -0700482 # Looks for the packet in the queue
483 def grab():
484 if len(self.packets) > 0:
485 if not exp_msg:
486 self.logger.debug("Looking for any packet")
487 (msg, pkt) = self.packets.pop(0)
488 return (msg, pkt)
489 else:
490 self.logger.debug("Looking for %s" % ofp_type_map[exp_msg])
491 for i in range(len(self.packets)):
492 msg = self.packets[i][0]
493 self.logger.debug("Checking packets[%d] (%s)" % (i, ofp_type_map[msg.header.type]))
494 if msg.header.type == exp_msg:
495 (msg, pkt) = self.packets.pop(i)
496 return (msg, pkt)
497 # Not found
498 self.logger.debug("Packet not in queue")
Dan Talaycoe226eb12010-02-18 23:06:30 -0800499 return (None, None)
Dan Talayco21c75c72010-02-12 22:59:24 -0800500
Rich Lanec4f071b2012-07-11 17:25:57 -0700501 # Non-blocking case
502 if timeout is None or timeout <= 0:
503 return grab()
Dan Talaycoe226eb12010-02-18 23:06:30 -0800504
Rich Lanec4f071b2012-07-11 17:25:57 -0700505 msg = pkt = None
506 self.logger.debug("Entering timeout (%fs)" % timeout)
507 end_time = time.time() + timeout
508 with self.packets_cv:
509 while True:
510 if time.time() > end_time:
511 self.logger.debug("Poll time out")
512 return (None, None)
513
514 (msg, pkt) = grab()
515 if msg != None:
516 self.logger.debug("Got msg " + str(msg))
517 return (msg, pkt)
518
519 # Go to sleep
520 remaining_time = end_time - time.time()
521 self.packets_cv.wait(remaining_time)
Dan Talaycoe226eb12010-02-18 23:06:30 -0800522
523 return (msg, pkt)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800524
Dan Talayco0fc08bd2012-04-09 16:56:18 -0700525 def transact(self, msg, timeout=-1, zero_xid=False):
Dan Talayco34089522010-02-07 23:07:41 -0800526 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800527 Run a message transaction with the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800528
529 Send the message in msg and wait for a reply with a matching
Dan Talayco21c75c72010-02-12 22:59:24 -0800530 transaction id. Transactions have the highest priority in
531 received message handling.
Dan Talaycoe37999f2010-02-09 15:27:12 -0800532
Dan Talayco21c75c72010-02-12 22:59:24 -0800533 @param msg The message object to send; must not be a string
Dan Talaycof8de5182012-04-12 22:38:41 -0700534 @param timeout The timeout in seconds; if -1 use default. if None
535 blocks without time out
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800536 @param zero_xid Normally, if the XID is 0 an XID will be generated
537 for the message. Set xero_xid to override this behavior
Dan Talayco21c75c72010-02-12 22:59:24 -0800538 @return The matching message object or None if unsuccessful
Dan Talaycoe37999f2010-02-09 15:27:12 -0800539
Dan Talayco34089522010-02-07 23:07:41 -0800540 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800541
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800542 if not zero_xid and msg.header.xid == 0:
543 msg.header.xid = gen_xid()
544
Dan Talayco0fc08bd2012-04-09 16:56:18 -0700545 if timeout == -1:
Dan Talaycof8de5182012-04-12 22:38:41 -0700546 timeout = self.transact_to
Rich Lane9aca1992012-07-11 17:26:31 -0700547 if timeout == None:
548 timeout = 60
Dan Talayco0fc08bd2012-04-09 16:56:18 -0700549 self.logger.debug("Running transaction %d" % msg.header.xid)
Dan Talayco21c75c72010-02-12 22:59:24 -0800550
Rich Lane9aca1992012-07-11 17:26:31 -0700551 with self.xid_cv:
552 if self.xid:
553 self.logger.error("Can only run one transaction at a time")
554 return (None, None)
Dan Talaycof8de5182012-04-12 22:38:41 -0700555
Rich Lane9aca1992012-07-11 17:26:31 -0700556 self.xid = msg.header.xid
Dan Talaycod12b6612010-03-07 22:00:46 -0800557 self.xid_response = None
Rich Lane9aca1992012-07-11 17:26:31 -0700558 if self.message_send(msg.pack()) < 0:
559 self.logger.error("Error sending pkt for transaction %d" %
560 msg.header.xid)
561 return (None, None)
562
563 self.logger.debug("Waiting %fs for transaction %d" % (timeout, msg.header.xid))
564 end_time = time.time() + timeout
565 while (not self.xid_response) and (time.time() < end_time):
566 self.xid_cv.wait(timeout)
567
568 if self.xid_response:
569 (resp, pkt) = self.xid_response
570 self.xid_response = None
571 else:
572 (resp, pkt) = (None, None)
573
Dan Talayco09c2c592010-05-13 14:21:52 -0700574 if resp is None:
575 self.logger.warning("No response for xid " + str(self.xid))
576 return (resp, pkt)
Dan Talayco34089522010-02-07 23:07:41 -0800577
Dan Talayco710438c2010-02-18 15:16:07 -0800578 def message_send(self, msg, zero_xid=False):
Dan Talayco34089522010-02-07 23:07:41 -0800579 """
580 Send the message to the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800581
Dan Talayco11c26e72010-03-07 22:03:57 -0800582 @param msg A string or OpenFlow message object to be forwarded to
583 the switch.
584 @param zero_xid If msg is an OpenFlow object (not a string) and if
Dan Talayco710438c2010-02-18 15:16:07 -0800585 the XID in the header is 0, then an XID will be generated
586 for the message. Set xero_xid to override this behavior (and keep an
587 existing 0 xid)
Dan Talaycoe37999f2010-02-09 15:27:12 -0800588
Dan Talayco710438c2010-02-18 15:16:07 -0800589 @return -1 if error, 0 on success
Dan Talayco34089522010-02-07 23:07:41 -0800590
Dan Talayco21c75c72010-02-12 22:59:24 -0800591 """
592
Dan Talayco1b3f6902010-02-15 14:14:19 -0800593 if not self.switch_socket:
594 # Sending a string indicates the message is ready to go
Dan Talayco48370102010-03-03 15:17:33 -0800595 self.logger.info("message_send: no socket")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800596 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800597 #@todo If not string, try to pack
Dan Talayco21c75c72010-02-12 22:59:24 -0800598 if type(msg) != type(""):
Dan Talayco710438c2010-02-18 15:16:07 -0800599 try:
600 if msg.header.xid == 0 and not zero_xid:
601 msg.header.xid = gen_xid()
602 outpkt = msg.pack()
603 except:
Dan Talayco48370102010-03-03 15:17:33 -0800604 self.logger.error(
Dan Talayco710438c2010-02-18 15:16:07 -0800605 "message_send: not an OF message or string?")
606 return -1
607 else:
608 outpkt = msg
Dan Talayco21c75c72010-02-12 22:59:24 -0800609
Dan Talayco48370102010-03-03 15:17:33 -0800610 self.logger.debug("Sending pkt of len " + str(len(outpkt)))
Dan Talayco710438c2010-02-18 15:16:07 -0800611 if self.switch_socket.sendall(outpkt) is None:
612 return 0
613
Dan Talayco48370102010-03-03 15:17:33 -0800614 self.logger.error("Unknown error on sendall")
Dan Talayco710438c2010-02-18 15:16:07 -0800615 return -1
Dan Talayco21c75c72010-02-12 22:59:24 -0800616
617 def __str__(self):
618 string = "Controller:\n"
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800619 string += " state " + self.dbg_state + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800620 string += " switch_addr " + str(self.switch_addr) + "\n"
621 string += " pending pkts " + str(len(self.packets)) + "\n"
622 string += " total pkts " + str(self.packets_total) + "\n"
623 string += " expired pkts " + str(self.packets_expired) + "\n"
624 string += " handled pkts " + str(self.packets_handled) + "\n"
Dan Talaycoe226eb12010-02-18 23:06:30 -0800625 string += " poll discards " + str(self.poll_discards) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800626 string += " parse errors " + str(self.parse_errors) + "\n"
627 string += " sock errrors " + str(self.socket_errors) + "\n"
628 string += " max pkts " + str(self.max_pkts) + "\n"
629 string += " host " + str(self.host) + "\n"
630 string += " port " + str(self.port) + "\n"
631 string += " keep_alive " + str(self.keep_alive) + "\n"
Dan Talaycof8de5182012-04-12 22:38:41 -0700632 string += " pkt_in_run " + str(self.pkt_in_run) + "\n"
633 string += " pkt_in_dropped " + str(self.pkt_in_dropped) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800634 return string
635
636 def show(self):
637 print str(self)
638
639def sample_handler(controller, msg, pkt):
640 """
641 Sample message handler
642
643 This is the prototype for functions registered with the controller
644 class for packet reception
645
646 @param controller The controller calling the handler
647 @param msg The parsed message object
648 @param pkt The raw packet that was received on the socket. This is
649 in case the packet contains extra unparsed data.
650 @returns Boolean value indicating if the packet was handled. If
651 not handled, the packet is placed in the queue for pollers to received
652 """
653 pass