blob: 70b3aade724672e8f6b683f3a52bd5d2b30d6f44 [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
4Controller class
5
6Provide the interface to the control channel to the switch under test.
7
8Class inherits from thread so as to run in background allowing
9asynchronous callbacks (if needed, not required). Also supports
10polling.
11
12The controller thread maintains a queue. Incoming messages that
13are not handled by a callback function are placed in this queue for
14poll calls.
15
16Callbacks and polling support specifying the message type
17
18@todo Support transaction semantics via xid
Dan Talayco1b3f6902010-02-15 14:14:19 -080019@todo Support select and listen on an administrative socket (or
20use a timeout to support clean shutdown).
21
Currently only one connection is accepted during the life of
the controller.  There seems to be no clean way to interrupt an
accept call.  Using select to also listen on an administrative
socket, which could then be used to shut the listen socket down, might work.
26
Dan Talayco34089522010-02-07 23:07:41 -080027"""
28
Dan Talayco34089522010-02-07 23:07:41 -080029import os
30import socket
31import time
Dan Talayco34089522010-02-07 23:07:41 -080032from threading import Thread
33from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080034from threading import Condition
Dan Talayco34089522010-02-07 23:07:41 -080035from message import *
Dan Talaycoe37999f2010-02-09 15:27:12 -080036from parse import *
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080037from ofutils import *
Dan Talayco710438c2010-02-18 15:16:07 -080038# For some reason, it seems select to be last (or later).
39# Otherwise get an attribute error when calling select.select
40import select
Dan Talayco48370102010-03-03 15:17:33 -080041import logging
42
##@todo Find a better home for these identifiers (controller)
# Default value for Controller.rcv_size, the byte count passed to recv()
# on the switch socket.
RCV_SIZE_DEFAULT = 32768
# Backlog value passed to listen() on the controller's listen socket.
LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080046
class Controller(Thread):
    """
    Class abstracting the control interface to the switch.

    For receiving messages, two mechanisms are available.  First,
    query the interface with poll.  Second, register to have a
    function called by message type.  The callback is passed the
    controller, the parsed message object and the raw packet.

    One of the main purposes of this object is to translate between network
    and host byte order.  'Above' this object, things should be in host
    byte order.

    @todo Consider using SocketServer for listening socket
    @todo Test transaction code

    @var rcv_size The receive size to use for receive calls
    @var max_pkts The max size of the receive queue
    @var keep_alive If true, listen for echo requests and respond w/
    echo replies
    @var initial_hello If true, will send a hello message immediately
    upon connecting to the switch
    @var exit_on_reset If true, terminate controller on connection reset
    @var host The address the listen socket is bound to
    @var port The port the listen socket is bound to
    @var packets_total Total number of packets placed in the queue
    @var packets_expired Number of packets popped from queue as queue full
    @var packets_handled Number of packets handled by a registered callback
    @var dbg_state Debug indication of state
    """

    def __init__(self, host='127.0.0.1', port=6633, max_pkts=1024):
        """
        Set up controller state; no socket is created until run()

        @param host Address to bind the listen socket to
        @param port Port to bind the listen socket to
        @param max_pkts Maximum number of messages kept in the poll queue
        """
        Thread.__init__(self)
        # Socket related
        self.rcv_size = RCV_SIZE_DEFAULT
        self.listen_socket = None
        self.switch_socket = None
        self.switch_addr = None
        self.socs = []
        self.connect_cv = Condition()
        self.message_cv = Condition()

        # Counters
        self.socket_errors = 0
        self.parse_errors = 0
        self.packets_total = 0
        self.packets_expired = 0
        self.packets_handled = 0
        self.poll_discards = 0

        # State
        self.packets = []
        self.sync = Lock()
        self.handlers = {}
        self.keep_alive = False
        self.active = True
        self.initial_hello = True
        self.exit_on_reset = True

        # Settings
        self.max_pkts = max_pkts
        self.passive = True
        self.host = host
        self.port = port
        self.dbg_state = "init"
        self.logger = logging.getLogger("controller")
        self.barrier_to = 15  # Barrier timeout default value; add to config

        # Transaction and message type waiting variables
        #   xid_cv: Condition variable (semaphore) for packet waiters
        #   xid: Transaction ID being waited on (None when idle)
        #   xid_response: Transaction response message
        #   expect_msg: Is a message being waited on
        #   expect_msg_cv: Semaphore for waiters
        #   expect_msg_type: Type of message expected (None for any)
        #   expect_msg_response: Result passed through here

        self.xid_cv = Condition()
        self.xid = None
        self.xid_response = None

        self.expect_msg = False
        self.expect_msg_cv = Condition()
        self.expect_msg_type = None
        self.expect_msg_response = None
        # Bytes of a partially received message, kept until the next read
        self.buffered_input = ""

    def _pkt_handle(self, pkt):
        """
        Split received data into OF messages and dispatch each of them

        For every complete message found in the data:
          Parse and verify message
          Check if XID matches something waiting
          Check if message is being expected for a poll operation
          Check if keep alive is on and message is an echo request
          Check if any registered handler wants the packet
          Enqueue if none of those conditions is met

        A trailing incomplete message is saved in buffered_input and
        prepended to the data of the next call.

        @param pkt The raw packet (string) which may contain multiple OF msgs
        """

        # snag any left over data from last read()
        pkt = self.buffered_input + pkt
        self.buffered_input = ""

        # Process each of the OF msgs inside the pkt
        offset = 0
        while offset < len(pkt):
            # Parse the header to get type
            hdr = of_header_parse(pkt[offset:])
            if not hdr:
                self.logger.info("Could not parse header, pkt len %d",
                                 len(pkt))
                self.parse_errors += 1
                return
            if hdr.length == 0:
                self.logger.error("Header length is zero; out of sync")
                self.parse_errors += 1
                return

            # Message not completely received yet; buffer the tail below
            if (offset + hdr.length) > len(pkt):
                break
            rawmsg = pkt[offset:offset + hdr.length]

            self.logger.debug(
                "Msg in: len %d. offset %d. type %s. hdr.len %d" %
                (len(pkt), offset, ofp_type_map[hdr.type], hdr.length))
            offset += hdr.length
            if hdr.version != OFP_VERSION:
                self.logger.error("Version %d does not match OFTest version %d"
                                  % (hdr.version, OFP_VERSION))
                print("Version %d does not match OFTest version %d" %
                      (hdr.version, OFP_VERSION))
                self.active = False
                self.switch_socket = None
                self.kill()
                # NOTE(review): the message is still parsed and dispatched
                # below even though the controller is being stopped.

            msg = of_message_parse(rawmsg)
            if not msg:
                self.parse_errors += 1
                self.logger.warning("Could not parse message")
                continue

            # Dispatch under the sync lock; the finally clause guarantees
            # the lock is released even if a registered handler raises.
            self.sync.acquire()
            try:
                echo_wanted = self._msg_dispatch(hdr, msg, rawmsg)
            finally:
                self.sync.release()

            if echo_wanted:
                # The reply is sent after the sync lock has been released
                self.logger.debug("Responding to echo request")
                rep = echo_reply()
                rep.header.xid = hdr.xid
                # Ignoring additional data
                self.message_send(rep.pack(), zero_xid=True)

        # end of 'while offset < len(pkt)'
        #   note that if offset = len(pkt), this
        #   appends a harmless empty string
        self.buffered_input += pkt[offset:]

    def _msg_dispatch(self, hdr, msg, rawmsg):
        """
        Route one parsed message; the caller must hold self.sync

        @param hdr The parsed message header
        @param msg The parsed message object
        @param rawmsg The raw bytes of this message
        @retval True if the caller should answer with an echo reply
        """
        # Check if transaction is waiting
        self.xid_cv.acquire()
        try:
            if self.xid is not None and hdr.xid == self.xid:
                self.logger.debug("Matched expected XID " + str(hdr.xid))
                self.xid_response = (msg, rawmsg)
                self.xid = None
                self.xid_cv.notify()
                return False
        finally:
            self.xid_cv.release()

        # Check if anyone waiting on this type of message.  A type of
        # None means "any message"; note 0 is a real type value.
        self.expect_msg_cv.acquire()
        try:
            if self.expect_msg and (self.expect_msg_type is None or
                                    self.expect_msg_type == hdr.type):
                self.logger.debug("Matched msg; type %s. expected %s " %
                                  (ofp_type_map[hdr.type],
                                   str(self.expect_msg_type)))
                self.expect_msg_response = (msg, rawmsg)
                self.expect_msg = False
                self.expect_msg_cv.notify()
                return False
        finally:
            self.expect_msg_cv.release()

        # Check if keep alive is set; if so, respond to echo requests
        if self.keep_alive and hdr.type == OFPT_ECHO_REQUEST:
            return True

        # Now check for message handlers; preference is given to
        # handlers for a specific packet
        handled = False
        type_handler = self.handlers.get(hdr.type)
        if type_handler:
            handled = type_handler(self, msg, rawmsg)
        if not handled:
            catch_all = self.handlers.get("all")
            if catch_all:
                handled = catch_all(self, msg, rawmsg)

        if not handled:  # Not handled, enqueue
            self.logger.debug("Enqueuing pkt type " + ofp_type_map[hdr.type])
            if len(self.packets) >= self.max_pkts:
                # Queue full: drop the oldest entry
                self.packets.pop(0)
                self.packets_expired += 1
            self.packets.append((msg, rawmsg))
            self.packets_total += 1
        else:
            self.packets_handled += 1
            self.logger.debug("Message handled by callback")
        return False

    def _socket_ready_handle(self, s):
        """
        Handle an input-ready socket
        @param s The socket object that is ready
        @retval True, reset the switch connection
        """

        if s == self.listen_socket:
            if self.switch_socket:
                # Already connected to a switch; do not accept another
                return False

            (self.switch_socket, self.switch_addr) = \
                self.listen_socket.accept()
            self.logger.info("Got cxn to " + str(self.switch_addr))
            # Notify anyone waiting
            self.connect_cv.acquire()
            try:
                self.connect_cv.notify()
            finally:
                self.connect_cv.release()
            self.socs.append(self.switch_socket)
            if self.initial_hello:
                self.message_send(hello())
        elif s == self.switch_socket:
            try:
                pkt = self.switch_socket.recv(self.rcv_size)
            except socket.error:
                self.socket_errors += 1
                self.logger.warning("Error on switch read")
                return True

            if not self.active:
                return False

            if len(pkt) == 0:
                self.logger.warning("Zero-length switch read; closing cxn")
                return True

            self._pkt_handle(pkt)
        else:
            self.logger.error("Unknown socket ready: " + str(s))
            return True

        return False

    def run(self):
        """
        Activity function for class

        Creates the listen socket, then loops on select (with a one
        second timeout) over the listen socket and, once accepted, the
        switch socket until the controller is no longer active.

        When there is a message on the socket, check for handlers; queue the
        packet if no one handles the packet.

        See note for controller describing the limitation of a single
        connection for now.
        """

        self.dbg_state = "starting"

        # Create listen socket
        self.logger.info("Create/listen at " + self.host + ":" +
                         str(self.port))
        self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.listen_socket.setsockopt(socket.SOL_SOCKET,
                                      socket.SO_REUSEADDR, 1)
        self.listen_socket.bind((self.host, self.port))
        self.dbg_state = "listening"
        self.listen_socket.listen(LISTEN_QUEUE_SIZE)

        self.logger.info("Waiting for switch connection")
        self.socs = [self.listen_socket]
        self.dbg_state = "running"
        while self.active:
            reset_switch_cxn = False
            try:
                sel_in, _sel_out, sel_err = \
                    select.select(self.socs, [], self.socs, 1)
            except Exception:
                # Leave the loop (instead of exiting the thread directly)
                # so that shutdown() below still closes the sockets.
                self.logger.exception("Select error, exiting")
                self.active = False
                break

            if not self.active:
                break

            for s in sel_in:
                # Accumulate: a later socket must not clear a reset
                # requested by an earlier one.
                if self._socket_ready_handle(s):
                    reset_switch_cxn = True

            for s in sel_err:
                self.logger.error("Got socket error on: " + str(s))
                if s == self.switch_socket:
                    reset_switch_cxn = True
                else:
                    self.logger.error("Socket error; exiting")
                    self.active = False
                    break

            if self.active and reset_switch_cxn:
                if self.exit_on_reset:
                    self.kill()
                else:
                    self.logger.warning("Closing switch cxn")
                    if self.switch_socket is not None:
                        try:
                            self.switch_socket.close()
                        except socket.error:
                            # Best effort; the connection is going away
                            pass
                    self.switch_socket = None
                    # Keep only the listen socket for select
                    self.socs = self.socs[0:1]
                    # Partial data belongs to the dropped connection
                    self.buffered_input = ""

        # End of main loop
        self.dbg_state = "closing"
        self.logger.info("Exiting controller thread")
        self.shutdown()

    def connect(self, timeout=None):
        """
        Wait for the switch to connect

        @param timeout If None, block until connected.  If 0, return
        immediately.  Otherwise, block for up to timeout seconds
        @return Boolean, True if connected
        """

        if timeout == 0:
            return self.switch_socket is not None
        # Test under the condition lock so a notify issued between the
        # test and the wait cannot be missed.
        self.connect_cv.acquire()
        try:
            if self.switch_socket is None:
                self.connect_cv.wait(timeout)
        finally:
            self.connect_cv.release()

        return self.switch_socket is not None

    def kill(self):
        """
        Force the controller thread to quit

        Just sets the active state variable to false and expects
        the select timeout to kick in
        """
        self.active = False

    def _socket_close(self, sock, label):
        """
        Shut down and close one socket, ignoring socket errors

        @param sock The socket to close; may be None
        @param label Name used in the log message ("switch" or "listen")
        """
        if sock is None:
            return
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except socket.error:
            self.logger.info("Ignoring " + label + " soc shutdown error")
        try:
            # shutdown() alone does not release the descriptor
            sock.close()
        except socket.error:
            self.logger.info("Ignoring " + label + " soc close error")

    def shutdown(self):
        """
        Shutdown the controller closing all sockets

        @todo Might want to synchronize shutdown with self.sync...
        """
        self.active = False
        self._socket_close(self.switch_socket, "switch")
        self.switch_socket = None
        self._socket_close(self.listen_socket, "listen")
        self.listen_socket = None
        self.dbg_state = "down"

    def register(self, msg_type, handler):
        """
        Register a callback to receive a specific message type.

        Only one handler may be registered for a given message type.

        WARNING:  A lock is held during the handler call back, so
        the handler should not make any blocking calls

        @param msg_type The type of message to receive.  The special
        type, the string "all", receives every message not already
        handled by a type specific handler.
        @param handler The function to call when a message of the given
        type is received.  A false value (e.g. None) removes any handler
        registered for msg_type.
        """
        # Should check type is valid
        if not handler:
            # Never store a non-callable; the receive path would call it
            self.handlers.pop(msg_type, None)
            return
        self.handlers[msg_type] = handler

    def poll(self, exp_msg=None, timeout=None):
        """
        Wait for the next OF message received from the switch.

        @param exp_msg If set, return only when this type of message
        is received (unless timeout occurs).  None matches any message.
        @param timeout If None or not positive, do not block.  Otherwise,
        wait up to timeout seconds for a matching message.

        @retval A pair (msg, pkt) where msg is a message object and pkt
        the string representing the packet as received from the socket.
        This allows additional parsing by the receiver if necessary.

        The data members in the message are in host endian order.
        If no message is available, (None, None) is returned

        The current queue is searched for a message of the desired type
        before sleeping on message in events.
        """

        if exp_msg is not None:
            self.logger.debug("Poll for " + ofp_type_map[exp_msg])
        else:
            self.logger.debug("Poll for any OF message")

        # First check the current queue
        self.sync.acquire()
        try:
            for idx in range(len(self.packets)):
                queued = self.packets[idx][0]
                if exp_msg is None or queued.header.type == exp_msg:
                    (msg, pkt) = self.packets.pop(idx)
                    return (msg, pkt)

            # Okay, not currently in the queue
            if timeout is None or timeout <= 0:
                return (None, None)

            self.logger.debug("Entering timeout")
            # Take the message cv before releasing sync so no message
            # can slip into the queue between the search and the wait.
            self.expect_msg_cv.acquire()
            self.expect_msg_response = None
            self.expect_msg = True
            self.expect_msg_type = exp_msg
        finally:
            self.sync.release()

        # expect_msg_cv is held from here on
        msg = pkt = None
        try:
            self.expect_msg_cv.wait(timeout)
            if self.expect_msg_response is not None:
                (msg, pkt) = self.expect_msg_response
            self.expect_msg_response = None
            # On timeout nobody is waiting any more; without this the
            # next matching message would be consumed and lost.
            self.expect_msg = False
        finally:
            self.expect_msg_cv.release()

        if msg is None:
            self.logger.debug("Poll time out")
        else:
            self.logger.debug("Got msg " + str(msg))

        return (msg, pkt)

    def transact(self, msg, timeout=-1, zero_xid=False):
        """
        Run a message transaction with the switch

        Send the message in msg and wait for a reply with a matching
        transaction id.  Transactions have the highest priority in
        received message handling.

        @param msg The message object to send; must not be a string
        @param timeout The timeout in seconds; -1 selects self.barrier_to
        @param zero_xid Normally, if the XID is 0 an XID will be generated
        for the message.  Set zero_xid to override this behavior
        @return A pair (msg, pkt) with the matching message object and
        its raw packet, or (None, None) if unsuccessful
        """

        if not zero_xid and msg.header.xid == 0:
            msg.header.xid = gen_xid()

        if timeout == -1:
            timeout = self.barrier_to
        xid = msg.header.xid
        self.logger.debug("Running transaction %d" % xid)
        (resp, pkt) = (None, None)
        self.xid_cv.acquire()
        try:
            if self.xid is not None:
                self.logger.error("Can only run one transaction at a time")
                return (None, None)

            self.xid = xid
            self.xid_response = None
            if self.message_send(msg.pack()) == 0:
                self.logger.debug("Waiting for transaction %d" % xid)
                self.xid_cv.wait(timeout)
                if self.xid_response:
                    (resp, pkt) = self.xid_response
                    self.xid_response = None
            # Always leave the controller idle again; a stale xid would
            # make every later transaction fail.
            self.xid = None
        finally:
            self.xid_cv.release()
        if resp is None:
            self.logger.warning("No response for xid " + str(xid))
        return (resp, pkt)

    def message_send(self, msg, zero_xid=False):
        """
        Send the message to the switch

        @param msg A string or OpenFlow message object to be forwarded to
        the switch.  A string is sent as is.
        @param zero_xid If msg is an OpenFlow object (not a string) and if
        the XID in the header is 0, then an XID will be generated
        for the message.  Set zero_xid to override this behavior (and keep an
        existing 0 xid)

        @return -1 if error, 0 on success
        """

        if not self.switch_socket:
            self.logger.info("message_send: no socket")
            return -1

        if isinstance(msg, str):
            # Sending a string indicates the message is ready to go
            outpkt = msg
        else:
            try:
                if msg.header.xid == 0 and not zero_xid:
                    msg.header.xid = gen_xid()
                outpkt = msg.pack()
            except Exception:
                self.logger.error(
                    "message_send: not an OF message or string?")
                return -1

        self.logger.debug("Sending pkt of len " + str(len(outpkt)))
        try:
            # sendall returns None on success and raises on failure
            self.switch_socket.sendall(outpkt)
        except socket.error:
            self.socket_errors += 1
            self.logger.error("Unknown error on sendall")
            return -1
        return 0

    def __str__(self):
        """
        Return a multi-line summary of the controller state and counters
        """
        rows = (
            (" state ", self.dbg_state),
            (" switch_addr ", self.switch_addr),
            (" pending pkts ", len(self.packets)),
            (" total pkts ", self.packets_total),
            (" expired pkts ", self.packets_expired),
            (" handled pkts ", self.packets_handled),
            (" poll discards ", self.poll_discards),
            (" parse errors ", self.parse_errors),
            (" sock errrors ", self.socket_errors),
            (" max pkts ", self.max_pkts),
            (" host ", self.host),
            (" port ", self.port),
            (" keep_alive ", self.keep_alive),
        )
        lines = ["Controller:\n"]
        for (label, value) in rows:
            lines.append(label + str(value) + "\n")
        return "".join(lines)

    def show(self):
        """
        Print the controller summary to standard output
        """
        print(str(self))
607
def sample_handler(controller, msg, pkt):
    """
    Prototype of a message handler

    Functions registered with the controller class for packet
    reception take these arguments.  This sample does nothing.

    @param controller The controller calling the handler
    @param msg The parsed message object
    @param pkt The raw packet that was received on the socket.  This is
    in case the packet contains extra unparsed data.
    @returns Boolean value indicating if the packet was handled.  If
    not handled, the packet is placed in the queue for pollers to receive.
    This sample returns None, i.e. a false value.
    """
    return None