blob: e7051ee5350977e2a15321995a248f629056efb5 [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
4Controller class
5
6Provide the interface to the control channel to the switch under test.
7
8Class inherits from thread so as to run in background allowing
9asynchronous callbacks (if needed, not required). Also supports
10polling.
11
12The controller thread maintains a queue. Incoming messages that
13are not handled by a callback function are placed in this queue for
14poll calls.
15
16Callbacks and polling support specifying the message type
17
18@todo Support transaction semantics via xid
Dan Talayco1b3f6902010-02-15 14:14:19 -080019@todo Support select and listen on an administrative socket (or
20use a timeout to support clean shutdown).
21
22Currently only one connection is accepted during the life of
23the controller. There seems
24to be no clean way to interrupt an accept call. Using select that also listens
25on an administrative socket and can shut down the socket might work.
26
Dan Talayco34089522010-02-07 23:07:41 -080027"""
28
Dan Talayco34089522010-02-07 23:07:41 -080029import os
30import socket
31import time
Dan Talayco34089522010-02-07 23:07:41 -080032from threading import Thread
33from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080034from threading import Condition
Dan Talayco34089522010-02-07 23:07:41 -080035from message import *
Dan Talaycoe37999f2010-02-09 15:27:12 -080036from parse import *
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080037from ofutils import *
Dan Talayco710438c2010-02-18 15:16:07 -080038# For some reason, it seems select to be last (or later).
39# Otherwise get an attribute error when calling select.select
40import select
Dan Talayco48370102010-03-03 15:17:33 -080041import logging
42
43##@todo Find a better home for these identifiers (controller)
Glen Gibb741b1182010-07-08 16:43:58 -070044RCV_SIZE_DEFAULT = 32768
Dan Talayco48370102010-03-03 15:17:33 -080045LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080046
47class Controller(Thread):
48 """
49 Class abstracting the control interface to the switch.
50
51 For receiving messages, two mechanism will be implemented. First,
52 query the interface with poll. Second, register to have a
53 function called by message type. The callback is passed the
54 message type as well as the raw packet (or message object)
55
56 One of the main purposes of this object is to translate between network
57 and host byte order. 'Above' this object, things should be in host
58 byte order.
Dan Talayco21c75c72010-02-12 22:59:24 -080059
60 @todo Consider using SocketServer for listening socket
61 @todo Test transaction code
62
63 @var rcv_size The receive size to use for receive calls
64 @var max_pkts The max size of the receive queue
65 @var keep_alive If true, listen for echo requests and respond w/
Dan Talaycod12b6612010-03-07 22:00:46 -080066 @var keep_alive If true, listen for echo requests and respond w/
Dan Talayco21c75c72010-02-12 22:59:24 -080067 echo replies
Dan Talayco710438c2010-02-18 15:16:07 -080068 @var initial_hello If true, will send a hello message immediately
69 upon connecting to the switch
Dan Talaycod12b6612010-03-07 22:00:46 -080070 @var exit_on_reset If true, terminate controller on connection reset
Dan Talayco21c75c72010-02-12 22:59:24 -080071 @var host The host to use for connect
72 @var port The port to connect on
73 @var packets_total Total number of packets received
74 @var packets_expired Number of packets popped from queue as queue full
75 @var packets_handled Number of packets handled by something
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080076 @var dbg_state Debug indication of state
Dan Talayco34089522010-02-07 23:07:41 -080077 """
78
Dan Talayco48370102010-03-03 15:17:33 -080079 def __init__(self, host='127.0.0.1', port=6633, max_pkts=1024):
Dan Talayco21c75c72010-02-12 22:59:24 -080080 Thread.__init__(self)
Dan Talayco1b3f6902010-02-15 14:14:19 -080081 # Socket related
Dan Talayco21c75c72010-02-12 22:59:24 -080082 self.rcv_size = RCV_SIZE_DEFAULT
Dan Talayco1b3f6902010-02-15 14:14:19 -080083 self.listen_socket = None
84 self.switch_socket = None
85 self.switch_addr = None
Dan Talayco710438c2010-02-18 15:16:07 -080086 self.socs = []
87 self.connect_cv = Condition()
Dan Talaycoe226eb12010-02-18 23:06:30 -080088 self.message_cv = Condition()
Dan Talayco1b3f6902010-02-15 14:14:19 -080089
90 # Counters
Dan Talayco21c75c72010-02-12 22:59:24 -080091 self.socket_errors = 0
92 self.parse_errors = 0
Dan Talayco21c75c72010-02-12 22:59:24 -080093 self.packets_total = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -080094 self.packets_expired = 0
95 self.packets_handled = 0
Dan Talaycoe226eb12010-02-18 23:06:30 -080096 self.poll_discards = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -080097
98 # State
Dan Talayco21c75c72010-02-12 22:59:24 -080099 self.packets = []
100 self.sync = Lock()
101 self.handlers = {}
102 self.keep_alive = False
Dan Talayco710438c2010-02-18 15:16:07 -0800103 self.active = True
104 self.initial_hello = True
Dan Talaycod12b6612010-03-07 22:00:46 -0800105 self.exit_on_reset = True
Dan Talayco1b3f6902010-02-15 14:14:19 -0800106
107 # Settings
108 self.max_pkts = max_pkts
109 self.passive = True
Dan Talayco48370102010-03-03 15:17:33 -0800110 self.host = host
111 self.port = port
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800112 self.dbg_state = "init"
Dan Talayco48370102010-03-03 15:17:33 -0800113 self.logger = logging.getLogger("controller")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800114
Dan Talaycoe226eb12010-02-18 23:06:30 -0800115 # Transaction and message type waiting variables
116 # xid_cv: Condition variable (semaphore) for packet waiters
Dan Talayco21c75c72010-02-12 22:59:24 -0800117 # xid: Transaction ID being waited on
118 # xid_response: Transaction response message
Dan Talaycoe226eb12010-02-18 23:06:30 -0800119 # expect_msg: Is a message being waited on
120 # expect_msg_cv: Semaphore for waiters
121 # expect_msg_type: Type of message expected
122 # expect_msg_response: Result passed through here
123
Dan Talayco21c75c72010-02-12 22:59:24 -0800124 self.xid_cv = Condition()
125 self.xid = None
126 self.xid_response = None
127
Dan Talaycoe226eb12010-02-18 23:06:30 -0800128 self.expect_msg = False
129 self.expect_msg_cv = Condition()
130 self.expect_msg_type = None
131 self.expect_msg_response = None
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800132 self.buffered_input = ""
Dan Talaycoe226eb12010-02-18 23:06:30 -0800133
Dan Talaycod12b6612010-03-07 22:00:46 -0800134 def _pkt_handle(self, pkt):
135 """
136 Check for all packet handling conditions
137
138 Parse and verify message
139 Check if XID matches something waiting
140 Check if message is being expected for a poll operation
141 Check if keep alive is on and message is an echo request
142 Check if any registered handler wants the packet
143 Enqueue if none of those conditions is met
144
145 an echo request in case keep_alive is true, followed by
146 registered message handlers.
Glen Gibb6d467062010-07-08 16:15:08 -0700147 @param pkt The raw packet (string) which may contain multiple OF msgs
Dan Talaycod12b6612010-03-07 22:00:46 -0800148 """
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800149
150 # snag any left over data from last read()
151 pkt = self.buffered_input + pkt
152 self.buffered_input = ""
153
Glen Gibb6d467062010-07-08 16:15:08 -0700154 # Process each of the OF msgs inside the pkt
155 offset = 0
156 while offset < len(pkt):
157 # Parse the header to get type
158 hdr = of_header_parse(pkt[offset:])
159 if not hdr:
160 self.logger.info("Could not parse header, pkt len", len(pkt))
161 self.parse_errors += 1
Dan Talaycod12b6612010-03-07 22:00:46 -0800162 return
Glen Gibb6d467062010-07-08 16:15:08 -0700163 if hdr.length == 0:
164 self.logger.info("Header length is zero")
165 self.parse_errors += 1
Dan Talaycod12b6612010-03-07 22:00:46 -0800166 return
167
Glen Gibb6d467062010-07-08 16:15:08 -0700168 # Extract the raw message bytes
Ed Swierk836e5bd2012-03-20 11:08:53 -0700169 if (offset + hdr.length) > len(pkt):
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800170 break
Glen Gibb6d467062010-07-08 16:15:08 -0700171 rawmsg = pkt[offset : offset + hdr.length]
Dan Talaycod12b6612010-03-07 22:00:46 -0800172
Glen Gibb6d467062010-07-08 16:15:08 -0700173 self.logger.debug("Msg in: len %d. offset %d. type %s. hdr.len %d" %
174 (len(pkt), offset, ofp_type_map[hdr.type], hdr.length))
Dan Talayco4306d3e2011-09-07 09:42:26 -0700175 offset += hdr.length
Glen Gibb6d467062010-07-08 16:15:08 -0700176 if hdr.version != OFP_VERSION:
177 self.logger.error("Version %d does not match OFTest version %d"
178 % (hdr.version, OFP_VERSION))
179 print "Version %d does not match OFTest version %d" % \
180 (hdr.version, OFP_VERSION)
181 self.active = False
182 self.switch_socket = None
183 self.kill()
Dan Talaycod12b6612010-03-07 22:00:46 -0800184
Glen Gibb6d467062010-07-08 16:15:08 -0700185 msg = of_message_parse(rawmsg)
186 if not msg:
187 self.parse_errors += 1
188 self.logger.warn("Could not parse message")
189 continue
190
191 self.sync.acquire()
192
193 # Check if transaction is waiting
194 self.xid_cv.acquire()
195 if self.xid:
196 if hdr.xid == self.xid:
197 self.logger.debug("Matched expected XID " + str(hdr.xid))
198 self.xid_response = (msg, rawmsg)
199 self.xid = None
200 self.xid_cv.notify()
201 self.xid_cv.release()
202 self.sync.release()
203 continue
204 self.xid_cv.release()
205
206 # PREVENT QUEUE ACCESS AT THIS POINT?
207 # Check if anyone waiting on this type of message
208 self.expect_msg_cv.acquire()
209 if self.expect_msg:
210 if not self.expect_msg_type or (self.expect_msg_type == hdr.type):
root2843d2b2012-04-06 10:27:46 -0700211 self.logger.debug("Matched msg; type %s. expected %s " %
212 (ofp_type_map[hdr.type],
213 ofp_type_map[self.expect_msg_type]))
Glen Gibb6d467062010-07-08 16:15:08 -0700214 self.expect_msg_response = (msg, rawmsg)
215 self.expect_msg = False
216 self.expect_msg_cv.notify()
217 self.expect_msg_cv.release()
218 self.sync.release()
219 continue
220 self.expect_msg_cv.release()
221
222 # Check if keep alive is set; if so, respond to echo requests
223 if self.keep_alive:
224 if hdr.type == OFPT_ECHO_REQUEST:
225 self.sync.release()
226 self.logger.debug("Responding to echo request")
227 rep = echo_reply()
228 rep.header.xid = hdr.xid
229 # Ignoring additional data
230 self.message_send(rep.pack(), zero_xid=True)
231 continue
232
233 # Now check for message handlers; preference is given to
234 # handlers for a specific packet
235 handled = False
236 if hdr.type in self.handlers.keys():
237 handled = self.handlers[hdr.type](self, msg, rawmsg)
238 if not handled and ("all" in self.handlers.keys()):
239 handled = self.handlers["all"](self, msg, rawmsg)
240
241 if not handled: # Not handled, enqueue
242 self.logger.debug("Enqueuing pkt type " + ofp_type_map[hdr.type])
243 if len(self.packets) >= self.max_pkts:
244 self.packets.pop(0)
245 self.packets_expired += 1
246 self.packets.append((msg, rawmsg))
247 self.packets_total += 1
248 else:
249 self.packets_handled += 1
250 self.logger.debug("Message handled by callback")
251
252 self.sync.release()
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800253 # end of 'while offset < len(pkt)'
254 # note that if offset = len(pkt), this is
255 # appends a harmless empty string
256 self.buffered_input += pkt[offset:]
Dan Talaycod12b6612010-03-07 22:00:46 -0800257
Dan Talayco710438c2010-02-18 15:16:07 -0800258 def _socket_ready_handle(self, s):
259 """
260 Handle an input-ready socket
261 @param s The socket object that is ready
262 @retval True, reset the switch connection
263 """
264
265 if s == self.listen_socket:
266 if self.switch_socket:
Ed Swierk0214da22012-03-19 14:58:02 -0700267 return False
Dan Talayco710438c2010-02-18 15:16:07 -0800268
269 (self.switch_socket, self.switch_addr) = \
270 self.listen_socket.accept()
Dan Talayco48370102010-03-03 15:17:33 -0800271 self.logger.info("Got cxn to " + str(self.switch_addr))
Dan Talayco710438c2010-02-18 15:16:07 -0800272 # Notify anyone waiting
273 self.connect_cv.acquire()
274 self.connect_cv.notify()
275 self.connect_cv.release()
276 self.socs.append(self.switch_socket)
277 if self.initial_hello:
278 self.message_send(hello())
279 elif s == self.switch_socket:
280 try:
281 pkt = self.switch_socket.recv(self.rcv_size)
282 except:
Dan Talaycod12b6612010-03-07 22:00:46 -0800283 self.logger.warning("Error on switch read")
Dan Talayco710438c2010-02-18 15:16:07 -0800284 return True
285
286 if not self.active:
287 return False
288
289 if len(pkt) == 0:
Dan Talayco48370102010-03-03 15:17:33 -0800290 self.logger.info("zero-len pkt in")
Dan Talayco710438c2010-02-18 15:16:07 -0800291 return True
292
Dan Talaycod12b6612010-03-07 22:00:46 -0800293 self._pkt_handle(pkt)
Dan Talayco710438c2010-02-18 15:16:07 -0800294 else:
Dan Talayco48370102010-03-03 15:17:33 -0800295 self.logger.error("Unknown socket ready: " + str(s))
Dan Talayco710438c2010-02-18 15:16:07 -0800296 return True
297
298 return False
299
Dan Talayco1b3f6902010-02-15 14:14:19 -0800300 def run(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800301 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800302 Activity function for class
Dan Talayco21c75c72010-02-12 22:59:24 -0800303
Dan Talayco1b3f6902010-02-15 14:14:19 -0800304 Assumes connection to switch already exists. Listens on
305 switch_socket for messages until an error (or zero len pkt)
306 occurs.
Dan Talayco21c75c72010-02-12 22:59:24 -0800307
Dan Talayco1b3f6902010-02-15 14:14:19 -0800308 When there is a message on the socket, check for handlers; queue the
309 packet if no one handles the packet.
310
311 See note for controller describing the limitation of a single
312 connection for now.
313 """
314
Dan Talayco710438c2010-02-18 15:16:07 -0800315 self.dbg_state = "starting"
Dan Talayco1b3f6902010-02-15 14:14:19 -0800316
Dan Talayco710438c2010-02-18 15:16:07 -0800317 # Create listen socket
Dan Talayco48370102010-03-03 15:17:33 -0800318 self.logger.info("Create/listen at " + self.host + ":" +
Dan Talayco710438c2010-02-18 15:16:07 -0800319 str(self.port))
320 self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
321 self.listen_socket.setsockopt(socket.SOL_SOCKET,
322 socket.SO_REUSEADDR, 1)
323 self.listen_socket.bind((self.host, self.port))
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800324 self.dbg_state = "listening"
Dan Talayco21c75c72010-02-12 22:59:24 -0800325 self.listen_socket.listen(LISTEN_QUEUE_SIZE)
Dan Talayco21c75c72010-02-12 22:59:24 -0800326
Dan Talayco48370102010-03-03 15:17:33 -0800327 self.logger.info("Waiting for switch connection")
Dan Talayco710438c2010-02-18 15:16:07 -0800328 self.socs = [self.listen_socket]
329 self.dbg_state = "running"
330 while self.active:
331 reset_switch_cxn = False
332 try:
333 sel_in, sel_out, sel_err = \
334 select.select(self.socs, [], self.socs, 1)
335 except:
336 print sys.exc_info()
Dan Talayco48370102010-03-03 15:17:33 -0800337 self.logger.error("Select error, exiting")
Dan Talayco710438c2010-02-18 15:16:07 -0800338 sys.exit(1)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800339
Dan Talayco710438c2010-02-18 15:16:07 -0800340 if not self.active:
341 break
Dan Talayco1b3f6902010-02-15 14:14:19 -0800342
Dan Talayco710438c2010-02-18 15:16:07 -0800343 for s in sel_in:
344 reset_switch_cxn = self._socket_ready_handle(s)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800345
Dan Talayco710438c2010-02-18 15:16:07 -0800346 for s in sel_err:
Dan Talayco48370102010-03-03 15:17:33 -0800347 self.logger.error("Got socket error on: " + str(s))
Dan Talayco710438c2010-02-18 15:16:07 -0800348 if s == self.switch_socket:
349 reset_switch_cxn = True
350 else:
Dan Talayco48370102010-03-03 15:17:33 -0800351 self.logger.error("Socket error; exiting")
Dan Talayco710438c2010-02-18 15:16:07 -0800352 self.active = False
353 break
354
355 if self.active and reset_switch_cxn:
Dan Talaycod12b6612010-03-07 22:00:46 -0800356 if self.exit_on_reset:
357 self.kill()
358 else:
359 self.logger.warning("Closing switch cxn")
360 try:
361 self.switch_socket.close()
362 except:
363 pass
364 self.switch_socket = None
365 self.socs = self.socs[0:1]
Dan Talayco710438c2010-02-18 15:16:07 -0800366
367 # End of main loop
368 self.dbg_state = "closing"
Dan Talayco48370102010-03-03 15:17:33 -0800369 self.logger.info("Exiting controller thread")
Dan Talayco710438c2010-02-18 15:16:07 -0800370 self.shutdown()
371
372 def connect(self, timeout=None):
373 """
374 Connect to the switch
375
376 @param timeout If None, block until connected. If 0, return
377 immedidately. Otherwise, block for up to timeout seconds
378 @return Boolean, True if connected
379 """
380
381 if timeout == 0:
382 return self.switch_socket is not None
383 if self.switch_socket is not None:
384 return True
385 self.connect_cv.acquire()
386 self.connect_cv.wait(timeout)
387 self.connect_cv.release()
388
389 return self.switch_socket is not None
390
391 def kill(self):
392 """
393 Force the controller thread to quit
394
395 Just sets the active state variable to false and expects
396 the select timeout to kick in
397 """
398 self.active = False
Dan Talayco21c75c72010-02-12 22:59:24 -0800399
Dan Talayco1b3f6902010-02-15 14:14:19 -0800400 def shutdown(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800401 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800402 Shutdown the controller closing all sockets
Dan Talayco21c75c72010-02-12 22:59:24 -0800403
Dan Talayco1b3f6902010-02-15 14:14:19 -0800404 @todo Might want to synchronize shutdown with self.sync...
405 """
Dan Talayco710438c2010-02-18 15:16:07 -0800406 self.active = False
407 try:
408 self.switch_socket.shutdown(socket.SHUT_RDWR)
409 except:
Dan Talayco48370102010-03-03 15:17:33 -0800410 self.logger.info("Ignoring switch soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800411 self.switch_socket = None
Dan Talayco1b3f6902010-02-15 14:14:19 -0800412
Dan Talayco710438c2010-02-18 15:16:07 -0800413 try:
414 self.listen_socket.shutdown(socket.SHUT_RDWR)
415 except:
Dan Talayco48370102010-03-03 15:17:33 -0800416 self.logger.info("Ignoring listen soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800417 self.listen_socket = None
418 self.dbg_state = "down"
419
Dan Talayco34089522010-02-07 23:07:41 -0800420 def register(self, msg_type, handler):
421 """
422 Register a callback to receive a specific message type.
423
424 Only one handler may be registered for a given message type.
Dan Talaycod12b6612010-03-07 22:00:46 -0800425
426 WARNING: A lock is held during the handler call back, so
427 the handler should not make any blocking calls
428
Dan Talayco34089522010-02-07 23:07:41 -0800429 @param msg_type The type of message to receive. May be DEFAULT
Dan Talayco21c75c72010-02-12 22:59:24 -0800430 for all non-handled packets. The special type, the string "all"
431 will send all packets to the handler.
Dan Talayco34089522010-02-07 23:07:41 -0800432 @param handler The function to call when a message of the given
433 type is received.
434 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800435 # Should check type is valid
436 if not handler and msg_type in self.handlers.keys():
437 del self.handlers[msg_type]
438 return
439 self.handlers[msg_type] = handler
Dan Talayco34089522010-02-07 23:07:41 -0800440
Dan Talayco21c75c72010-02-12 22:59:24 -0800441 def poll(self, exp_msg=None, timeout=None):
Dan Talayco34089522010-02-07 23:07:41 -0800442 """
443 Wait for the next OF message received from the switch.
444
445 @param exp_msg If set, return only when this type of message
Dan Talayco48370102010-03-03 15:17:33 -0800446 is received (unless timeout occurs).
Dan Talaycoe226eb12010-02-18 23:06:30 -0800447 @param timeout If None, do not block. Otherwise, sleep in
Dan Talayco48370102010-03-03 15:17:33 -0800448 intervals of 1 second until message is received.
Dan Talayco34089522010-02-07 23:07:41 -0800449
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800450 @retval A pair (msg, pkt) where msg is a message object and pkt
451 the string representing the packet as received from the socket.
452 This allows additional parsing by the receiver if necessary.
453
Dan Talayco34089522010-02-07 23:07:41 -0800454 The data members in the message are in host endian order.
Dan Talayco48370102010-03-03 15:17:33 -0800455 If an error occurs, (None, None) is returned
456
457 The current queue is searched for a message of the desired type
458 before sleeping on message in events.
Dan Talayco34089522010-02-07 23:07:41 -0800459 """
Dan Talayco34089522010-02-07 23:07:41 -0800460
Dan Talaycoe226eb12010-02-18 23:06:30 -0800461 msg = pkt = None
Dan Talayco34089522010-02-07 23:07:41 -0800462
Dan Talayco48370102010-03-03 15:17:33 -0800463 self.logger.debug("Poll for " + ofp_type_map[exp_msg])
Dan Talaycoe226eb12010-02-18 23:06:30 -0800464 # First check the current queue
465 self.sync.acquire()
466 if len(self.packets) > 0:
467 if not exp_msg:
468 (msg, pkt) = self.packets.pop(0)
469 self.sync.release()
470 return (msg, pkt)
471 else:
472 for i in range(len(self.packets)):
473 msg = self.packets[i][0]
474 if msg.header.type == exp_msg:
475 (msg, pkt) = self.packets.pop(i)
476 self.sync.release()
477 return (msg, pkt)
478
479 # Okay, not currently in the queue
480 if timeout is None or timeout <= 0:
Dan Talayco21c75c72010-02-12 22:59:24 -0800481 self.sync.release()
Dan Talaycoe226eb12010-02-18 23:06:30 -0800482 return (None, None)
Dan Talayco21c75c72010-02-12 22:59:24 -0800483
Dan Talayco48370102010-03-03 15:17:33 -0800484 msg = pkt = None
485 self.logger.debug("Entering timeout")
Dan Talaycoe226eb12010-02-18 23:06:30 -0800486 # Careful of race condition releasing sync before message cv
Dan Talayco90576bd2010-02-19 10:59:02 -0800487 # Also, this style is ripe for a lockup.
Dan Talaycoe226eb12010-02-18 23:06:30 -0800488 self.expect_msg_cv.acquire()
Dan Talayco48370102010-03-03 15:17:33 -0800489 self.expect_msg_response = None
Dan Talaycoe226eb12010-02-18 23:06:30 -0800490 self.expect_msg = True
491 self.expect_msg_type = exp_msg
root2843d2b2012-04-06 10:27:46 -0700492 self.sync.release()
Dan Talaycoe226eb12010-02-18 23:06:30 -0800493 self.expect_msg_cv.wait(timeout)
494 if self.expect_msg_response is not None:
495 (msg, pkt) = self.expect_msg_response
root2843d2b2012-04-06 10:27:46 -0700496 self.expect_msg = False # Check this -- only 1 thread?
Dan Talaycoe226eb12010-02-18 23:06:30 -0800497 self.expect_msg_cv.release()
498
499 if msg is None:
Dan Talayco48370102010-03-03 15:17:33 -0800500 self.logger.debug("Poll time out")
501 else:
502 self.logger.debug("Got msg " + str(msg))
Dan Talaycoe226eb12010-02-18 23:06:30 -0800503
504 return (msg, pkt)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800505
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800506 def transact(self, msg, timeout=None, zero_xid=False):
Dan Talayco34089522010-02-07 23:07:41 -0800507 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800508 Run a message transaction with the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800509
510 Send the message in msg and wait for a reply with a matching
Dan Talayco21c75c72010-02-12 22:59:24 -0800511 transaction id. Transactions have the highest priority in
512 received message handling.
Dan Talaycoe37999f2010-02-09 15:27:12 -0800513
Dan Talayco21c75c72010-02-12 22:59:24 -0800514 @param msg The message object to send; must not be a string
515 @param timeout The timeout in seconds (?)
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800516 @param zero_xid Normally, if the XID is 0 an XID will be generated
517 for the message. Set xero_xid to override this behavior
Dan Talayco21c75c72010-02-12 22:59:24 -0800518 @return The matching message object or None if unsuccessful
Dan Talaycoe37999f2010-02-09 15:27:12 -0800519
Dan Talayco34089522010-02-07 23:07:41 -0800520 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800521
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800522 if not zero_xid and msg.header.xid == 0:
523 msg.header.xid = gen_xid()
524
Dan Talayco21c75c72010-02-12 22:59:24 -0800525 self.xid_cv.acquire()
526 if self.xid:
527 self.xid_cv.release()
Dan Talayco48370102010-03-03 15:17:33 -0800528 self.logger.error("Can only run one transaction at a time")
Dan Talayco21c75c72010-02-12 22:59:24 -0800529 return None
530
531 self.xid = msg.header.xid
532 self.xid_response = None
533 self.message_send(msg.pack())
534 self.xid_cv.wait(timeout)
Dan Talaycod12b6612010-03-07 22:00:46 -0800535 if self.xid_response:
Dan Talayco09c2c592010-05-13 14:21:52 -0700536 (resp, pkt) = self.xid_response
Dan Talaycod12b6612010-03-07 22:00:46 -0800537 self.xid_response = None
538 else:
Dan Talayco09c2c592010-05-13 14:21:52 -0700539 (resp, pkt) = (None, None)
Dan Talayco21c75c72010-02-12 22:59:24 -0800540 self.xid_cv.release()
Dan Talayco09c2c592010-05-13 14:21:52 -0700541 if resp is None:
542 self.logger.warning("No response for xid " + str(self.xid))
543 return (resp, pkt)
Dan Talayco34089522010-02-07 23:07:41 -0800544
Dan Talayco710438c2010-02-18 15:16:07 -0800545 def message_send(self, msg, zero_xid=False):
Dan Talayco34089522010-02-07 23:07:41 -0800546 """
547 Send the message to the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800548
Dan Talayco11c26e72010-03-07 22:03:57 -0800549 @param msg A string or OpenFlow message object to be forwarded to
550 the switch.
551 @param zero_xid If msg is an OpenFlow object (not a string) and if
Dan Talayco710438c2010-02-18 15:16:07 -0800552 the XID in the header is 0, then an XID will be generated
553 for the message. Set xero_xid to override this behavior (and keep an
554 existing 0 xid)
Dan Talaycoe37999f2010-02-09 15:27:12 -0800555
Dan Talayco710438c2010-02-18 15:16:07 -0800556 @return -1 if error, 0 on success
Dan Talayco34089522010-02-07 23:07:41 -0800557
Dan Talayco21c75c72010-02-12 22:59:24 -0800558 """
559
Dan Talayco1b3f6902010-02-15 14:14:19 -0800560 if not self.switch_socket:
561 # Sending a string indicates the message is ready to go
Dan Talayco48370102010-03-03 15:17:33 -0800562 self.logger.info("message_send: no socket")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800563 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800564 #@todo If not string, try to pack
Dan Talayco21c75c72010-02-12 22:59:24 -0800565 if type(msg) != type(""):
Dan Talayco710438c2010-02-18 15:16:07 -0800566 try:
567 if msg.header.xid == 0 and not zero_xid:
568 msg.header.xid = gen_xid()
569 outpkt = msg.pack()
570 except:
Dan Talayco48370102010-03-03 15:17:33 -0800571 self.logger.error(
Dan Talayco710438c2010-02-18 15:16:07 -0800572 "message_send: not an OF message or string?")
573 return -1
574 else:
575 outpkt = msg
Dan Talayco21c75c72010-02-12 22:59:24 -0800576
Dan Talayco48370102010-03-03 15:17:33 -0800577 self.logger.debug("Sending pkt of len " + str(len(outpkt)))
Dan Talayco710438c2010-02-18 15:16:07 -0800578 if self.switch_socket.sendall(outpkt) is None:
579 return 0
580
Dan Talayco48370102010-03-03 15:17:33 -0800581 self.logger.error("Unknown error on sendall")
Dan Talayco710438c2010-02-18 15:16:07 -0800582 return -1
Dan Talayco21c75c72010-02-12 22:59:24 -0800583
584 def __str__(self):
585 string = "Controller:\n"
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800586 string += " state " + self.dbg_state + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800587 string += " switch_addr " + str(self.switch_addr) + "\n"
588 string += " pending pkts " + str(len(self.packets)) + "\n"
589 string += " total pkts " + str(self.packets_total) + "\n"
590 string += " expired pkts " + str(self.packets_expired) + "\n"
591 string += " handled pkts " + str(self.packets_handled) + "\n"
Dan Talaycoe226eb12010-02-18 23:06:30 -0800592 string += " poll discards " + str(self.poll_discards) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800593 string += " parse errors " + str(self.parse_errors) + "\n"
594 string += " sock errrors " + str(self.socket_errors) + "\n"
595 string += " max pkts " + str(self.max_pkts) + "\n"
596 string += " host " + str(self.host) + "\n"
597 string += " port " + str(self.port) + "\n"
598 string += " keep_alive " + str(self.keep_alive) + "\n"
599 return string
600
601 def show(self):
602 print str(self)
603
604def sample_handler(controller, msg, pkt):
605 """
606 Sample message handler
607
608 This is the prototype for functions registered with the controller
609 class for packet reception
610
611 @param controller The controller calling the handler
612 @param msg The parsed message object
613 @param pkt The raw packet that was received on the socket. This is
614 in case the packet contains extra unparsed data.
615 @returns Boolean value indicating if the packet was handled. If
616 not handled, the packet is placed in the queue for pollers to received
617 """
618 pass