"""
OpenFlow Test Framework

Controller class

Provide the interface to the control channel to the switch under test.

Class inherits from thread so as to run in background allowing
asynchronous callbacks (if needed, not required). Also supports
polling.

The controller thread maintains a queue. Incoming messages that
are not handled by a callback function are placed in this queue for
poll calls.

Callbacks and polling support specifying the message type.

@todo Support transaction semantics via xid
@todo Support select and listen on an administrative socket (or
use a timeout to support clean shutdown).

Currently only one connection is accepted during the life of
the controller. There seems to be no clean way to interrupt an
accept call. Using select that also listens on an administrative
socket and can shut down the socket might work.

"""
28
import os
import socket
import sys
import time
from threading import Thread
from threading import Lock
from threading import Condition
from message import *
from parse import *
from ofutils import *
# For some reason, it seems select needs to be imported last (or later).
# Otherwise we get an attribute error when calling select.select
import select
import logging
42
##@todo Find a better home for these identifiers (controller)
# Default byte count passed to recv() when reading from the switch socket
# (copied into Controller.rcv_size at construction).
RCV_SIZE_DEFAULT = 32768
# Backlog value passed to listen() on the controller's listening socket.
LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080046
47class Controller(Thread):
48 """
49 Class abstracting the control interface to the switch.
50
51 For receiving messages, two mechanism will be implemented. First,
52 query the interface with poll. Second, register to have a
53 function called by message type. The callback is passed the
54 message type as well as the raw packet (or message object)
55
56 One of the main purposes of this object is to translate between network
57 and host byte order. 'Above' this object, things should be in host
58 byte order.
Dan Talayco21c75c72010-02-12 22:59:24 -080059
60 @todo Consider using SocketServer for listening socket
61 @todo Test transaction code
62
63 @var rcv_size The receive size to use for receive calls
64 @var max_pkts The max size of the receive queue
65 @var keep_alive If true, listen for echo requests and respond w/
Dan Talaycod12b6612010-03-07 22:00:46 -080066 @var keep_alive If true, listen for echo requests and respond w/
Dan Talayco21c75c72010-02-12 22:59:24 -080067 echo replies
Dan Talayco710438c2010-02-18 15:16:07 -080068 @var initial_hello If true, will send a hello message immediately
69 upon connecting to the switch
Dan Talaycod12b6612010-03-07 22:00:46 -080070 @var exit_on_reset If true, terminate controller on connection reset
Dan Talayco21c75c72010-02-12 22:59:24 -080071 @var host The host to use for connect
72 @var port The port to connect on
73 @var packets_total Total number of packets received
74 @var packets_expired Number of packets popped from queue as queue full
75 @var packets_handled Number of packets handled by something
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080076 @var dbg_state Debug indication of state
Dan Talayco34089522010-02-07 23:07:41 -080077 """
78
Dan Talayco48370102010-03-03 15:17:33 -080079 def __init__(self, host='127.0.0.1', port=6633, max_pkts=1024):
Dan Talayco21c75c72010-02-12 22:59:24 -080080 Thread.__init__(self)
Dan Talayco1b3f6902010-02-15 14:14:19 -080081 # Socket related
Dan Talayco21c75c72010-02-12 22:59:24 -080082 self.rcv_size = RCV_SIZE_DEFAULT
Dan Talayco1b3f6902010-02-15 14:14:19 -080083 self.listen_socket = None
84 self.switch_socket = None
85 self.switch_addr = None
Dan Talayco710438c2010-02-18 15:16:07 -080086 self.socs = []
87 self.connect_cv = Condition()
Dan Talaycoe226eb12010-02-18 23:06:30 -080088 self.message_cv = Condition()
Dan Talayco1b3f6902010-02-15 14:14:19 -080089
90 # Counters
Dan Talayco21c75c72010-02-12 22:59:24 -080091 self.socket_errors = 0
92 self.parse_errors = 0
Dan Talayco21c75c72010-02-12 22:59:24 -080093 self.packets_total = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -080094 self.packets_expired = 0
95 self.packets_handled = 0
Dan Talaycoe226eb12010-02-18 23:06:30 -080096 self.poll_discards = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -080097
98 # State
Dan Talayco21c75c72010-02-12 22:59:24 -080099 self.packets = []
100 self.sync = Lock()
101 self.handlers = {}
102 self.keep_alive = False
Dan Talayco710438c2010-02-18 15:16:07 -0800103 self.active = True
104 self.initial_hello = True
Dan Talaycod12b6612010-03-07 22:00:46 -0800105 self.exit_on_reset = True
Dan Talayco1b3f6902010-02-15 14:14:19 -0800106
107 # Settings
108 self.max_pkts = max_pkts
109 self.passive = True
Dan Talayco48370102010-03-03 15:17:33 -0800110 self.host = host
111 self.port = port
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800112 self.dbg_state = "init"
Dan Talayco48370102010-03-03 15:17:33 -0800113 self.logger = logging.getLogger("controller")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800114
Dan Talaycoe226eb12010-02-18 23:06:30 -0800115 # Transaction and message type waiting variables
116 # xid_cv: Condition variable (semaphore) for packet waiters
Dan Talayco21c75c72010-02-12 22:59:24 -0800117 # xid: Transaction ID being waited on
118 # xid_response: Transaction response message
Dan Talaycoe226eb12010-02-18 23:06:30 -0800119 # expect_msg: Is a message being waited on
120 # expect_msg_cv: Semaphore for waiters
121 # expect_msg_type: Type of message expected
122 # expect_msg_response: Result passed through here
123
Dan Talayco21c75c72010-02-12 22:59:24 -0800124 self.xid_cv = Condition()
125 self.xid = None
126 self.xid_response = None
127
Dan Talaycoe226eb12010-02-18 23:06:30 -0800128 self.expect_msg = False
129 self.expect_msg_cv = Condition()
130 self.expect_msg_type = None
131 self.expect_msg_response = None
132
Dan Talaycod12b6612010-03-07 22:00:46 -0800133 def _pkt_handle(self, pkt):
134 """
135 Check for all packet handling conditions
136
137 Parse and verify message
138 Check if XID matches something waiting
139 Check if message is being expected for a poll operation
140 Check if keep alive is on and message is an echo request
141 Check if any registered handler wants the packet
142 Enqueue if none of those conditions is met
143
144 an echo request in case keep_alive is true, followed by
145 registered message handlers.
Glen Gibb6d467062010-07-08 16:15:08 -0700146 @param pkt The raw packet (string) which may contain multiple OF msgs
Dan Talaycod12b6612010-03-07 22:00:46 -0800147 """
Glen Gibb6d467062010-07-08 16:15:08 -0700148 # Process each of the OF msgs inside the pkt
149 offset = 0
150 while offset < len(pkt):
151 # Parse the header to get type
152 hdr = of_header_parse(pkt[offset:])
153 if not hdr:
154 self.logger.info("Could not parse header, pkt len", len(pkt))
155 self.parse_errors += 1
Dan Talaycod12b6612010-03-07 22:00:46 -0800156 return
Glen Gibb6d467062010-07-08 16:15:08 -0700157 if hdr.length == 0:
158 self.logger.info("Header length is zero")
159 self.parse_errors += 1
Dan Talaycod12b6612010-03-07 22:00:46 -0800160 return
161
Glen Gibb6d467062010-07-08 16:15:08 -0700162 # Extract the raw message bytes
163 rawmsg = pkt[offset : offset + hdr.length]
Dan Talaycod12b6612010-03-07 22:00:46 -0800164
Glen Gibb6d467062010-07-08 16:15:08 -0700165 self.logger.debug("Msg in: len %d. offset %d. type %s. hdr.len %d" %
166 (len(pkt), offset, ofp_type_map[hdr.type], hdr.length))
167 if hdr.version != OFP_VERSION:
168 self.logger.error("Version %d does not match OFTest version %d"
169 % (hdr.version, OFP_VERSION))
170 print "Version %d does not match OFTest version %d" % \
171 (hdr.version, OFP_VERSION)
172 self.active = False
173 self.switch_socket = None
174 self.kill()
Dan Talaycod12b6612010-03-07 22:00:46 -0800175
Glen Gibb6d467062010-07-08 16:15:08 -0700176 msg = of_message_parse(rawmsg)
177 if not msg:
178 self.parse_errors += 1
179 self.logger.warn("Could not parse message")
180 continue
181
182 self.sync.acquire()
183
184 # Check if transaction is waiting
185 self.xid_cv.acquire()
186 if self.xid:
187 if hdr.xid == self.xid:
188 self.logger.debug("Matched expected XID " + str(hdr.xid))
189 self.xid_response = (msg, rawmsg)
190 self.xid = None
191 self.xid_cv.notify()
192 self.xid_cv.release()
193 self.sync.release()
194 continue
195 self.xid_cv.release()
196
197 # PREVENT QUEUE ACCESS AT THIS POINT?
198 # Check if anyone waiting on this type of message
199 self.expect_msg_cv.acquire()
200 if self.expect_msg:
201 if not self.expect_msg_type or (self.expect_msg_type == hdr.type):
202 self.logger.debug("Matched expected msg type "
203 + ofp_type_map[hdr.type])
204 self.expect_msg_response = (msg, rawmsg)
205 self.expect_msg = False
206 self.expect_msg_cv.notify()
207 self.expect_msg_cv.release()
208 self.sync.release()
209 continue
210 self.expect_msg_cv.release()
211
212 # Check if keep alive is set; if so, respond to echo requests
213 if self.keep_alive:
214 if hdr.type == OFPT_ECHO_REQUEST:
215 self.sync.release()
216 self.logger.debug("Responding to echo request")
217 rep = echo_reply()
218 rep.header.xid = hdr.xid
219 # Ignoring additional data
220 self.message_send(rep.pack(), zero_xid=True)
Dan Talayco475023c2010-10-26 22:27:11 -0700221 offset += hdr.length
Glen Gibb6d467062010-07-08 16:15:08 -0700222 continue
223
224 # Now check for message handlers; preference is given to
225 # handlers for a specific packet
226 handled = False
227 if hdr.type in self.handlers.keys():
228 handled = self.handlers[hdr.type](self, msg, rawmsg)
229 if not handled and ("all" in self.handlers.keys()):
230 handled = self.handlers["all"](self, msg, rawmsg)
231
232 if not handled: # Not handled, enqueue
233 self.logger.debug("Enqueuing pkt type " + ofp_type_map[hdr.type])
234 if len(self.packets) >= self.max_pkts:
235 self.packets.pop(0)
236 self.packets_expired += 1
237 self.packets.append((msg, rawmsg))
238 self.packets_total += 1
239 else:
240 self.packets_handled += 1
241 self.logger.debug("Message handled by callback")
242
243 self.sync.release()
244 offset += hdr.length
Dan Talaycod12b6612010-03-07 22:00:46 -0800245
Dan Talayco710438c2010-02-18 15:16:07 -0800246 def _socket_ready_handle(self, s):
247 """
248 Handle an input-ready socket
249 @param s The socket object that is ready
250 @retval True, reset the switch connection
251 """
252
253 if s == self.listen_socket:
254 if self.switch_socket:
Dan Talayco48370102010-03-03 15:17:33 -0800255 self.logger.error("Multiple switch cxns not supported")
Dan Talayco710438c2010-02-18 15:16:07 -0800256 sys.exit(1)
257
258 (self.switch_socket, self.switch_addr) = \
259 self.listen_socket.accept()
Dan Talayco48370102010-03-03 15:17:33 -0800260 self.logger.info("Got cxn to " + str(self.switch_addr))
Dan Talayco710438c2010-02-18 15:16:07 -0800261 # Notify anyone waiting
262 self.connect_cv.acquire()
263 self.connect_cv.notify()
264 self.connect_cv.release()
265 self.socs.append(self.switch_socket)
266 if self.initial_hello:
267 self.message_send(hello())
268 elif s == self.switch_socket:
269 try:
270 pkt = self.switch_socket.recv(self.rcv_size)
271 except:
Dan Talaycod12b6612010-03-07 22:00:46 -0800272 self.logger.warning("Error on switch read")
Dan Talayco710438c2010-02-18 15:16:07 -0800273 return True
274
275 if not self.active:
276 return False
277
278 if len(pkt) == 0:
Dan Talayco48370102010-03-03 15:17:33 -0800279 self.logger.info("zero-len pkt in")
Dan Talayco710438c2010-02-18 15:16:07 -0800280 return True
281
Dan Talaycod12b6612010-03-07 22:00:46 -0800282 self._pkt_handle(pkt)
Dan Talayco710438c2010-02-18 15:16:07 -0800283 else:
Dan Talayco48370102010-03-03 15:17:33 -0800284 self.logger.error("Unknown socket ready: " + str(s))
Dan Talayco710438c2010-02-18 15:16:07 -0800285 return True
286
287 return False
288
Dan Talayco1b3f6902010-02-15 14:14:19 -0800289 def run(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800290 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800291 Activity function for class
Dan Talayco21c75c72010-02-12 22:59:24 -0800292
Dan Talayco1b3f6902010-02-15 14:14:19 -0800293 Assumes connection to switch already exists. Listens on
294 switch_socket for messages until an error (or zero len pkt)
295 occurs.
Dan Talayco21c75c72010-02-12 22:59:24 -0800296
Dan Talayco1b3f6902010-02-15 14:14:19 -0800297 When there is a message on the socket, check for handlers; queue the
298 packet if no one handles the packet.
299
300 See note for controller describing the limitation of a single
301 connection for now.
302 """
303
Dan Talayco710438c2010-02-18 15:16:07 -0800304 self.dbg_state = "starting"
Dan Talayco1b3f6902010-02-15 14:14:19 -0800305
Dan Talayco710438c2010-02-18 15:16:07 -0800306 # Create listen socket
Dan Talayco48370102010-03-03 15:17:33 -0800307 self.logger.info("Create/listen at " + self.host + ":" +
Dan Talayco710438c2010-02-18 15:16:07 -0800308 str(self.port))
309 self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
310 self.listen_socket.setsockopt(socket.SOL_SOCKET,
311 socket.SO_REUSEADDR, 1)
312 self.listen_socket.bind((self.host, self.port))
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800313 self.dbg_state = "listening"
Dan Talayco21c75c72010-02-12 22:59:24 -0800314 self.listen_socket.listen(LISTEN_QUEUE_SIZE)
Dan Talayco21c75c72010-02-12 22:59:24 -0800315
Dan Talayco48370102010-03-03 15:17:33 -0800316 self.logger.info("Waiting for switch connection")
Dan Talayco710438c2010-02-18 15:16:07 -0800317 self.socs = [self.listen_socket]
318 self.dbg_state = "running"
319 while self.active:
320 reset_switch_cxn = False
321 try:
322 sel_in, sel_out, sel_err = \
323 select.select(self.socs, [], self.socs, 1)
324 except:
325 print sys.exc_info()
Dan Talayco48370102010-03-03 15:17:33 -0800326 self.logger.error("Select error, exiting")
Dan Talayco710438c2010-02-18 15:16:07 -0800327 sys.exit(1)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800328
Dan Talayco710438c2010-02-18 15:16:07 -0800329 if not self.active:
330 break
Dan Talayco1b3f6902010-02-15 14:14:19 -0800331
Dan Talayco710438c2010-02-18 15:16:07 -0800332 for s in sel_in:
333 reset_switch_cxn = self._socket_ready_handle(s)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800334
Dan Talayco710438c2010-02-18 15:16:07 -0800335 for s in sel_err:
Dan Talayco48370102010-03-03 15:17:33 -0800336 self.logger.error("Got socket error on: " + str(s))
Dan Talayco710438c2010-02-18 15:16:07 -0800337 if s == self.switch_socket:
338 reset_switch_cxn = True
339 else:
Dan Talayco48370102010-03-03 15:17:33 -0800340 self.logger.error("Socket error; exiting")
Dan Talayco710438c2010-02-18 15:16:07 -0800341 self.active = False
342 break
343
344 if self.active and reset_switch_cxn:
Dan Talaycod12b6612010-03-07 22:00:46 -0800345 if self.exit_on_reset:
346 self.kill()
347 else:
348 self.logger.warning("Closing switch cxn")
349 try:
350 self.switch_socket.close()
351 except:
352 pass
353 self.switch_socket = None
354 self.socs = self.socs[0:1]
Dan Talayco710438c2010-02-18 15:16:07 -0800355
356 # End of main loop
357 self.dbg_state = "closing"
Dan Talayco48370102010-03-03 15:17:33 -0800358 self.logger.info("Exiting controller thread")
Dan Talayco710438c2010-02-18 15:16:07 -0800359 self.shutdown()
360
361 def connect(self, timeout=None):
362 """
363 Connect to the switch
364
365 @param timeout If None, block until connected. If 0, return
366 immedidately. Otherwise, block for up to timeout seconds
367 @return Boolean, True if connected
368 """
369
370 if timeout == 0:
371 return self.switch_socket is not None
372 if self.switch_socket is not None:
373 return True
374 self.connect_cv.acquire()
375 self.connect_cv.wait(timeout)
376 self.connect_cv.release()
377
378 return self.switch_socket is not None
379
380 def kill(self):
381 """
382 Force the controller thread to quit
383
384 Just sets the active state variable to false and expects
385 the select timeout to kick in
386 """
387 self.active = False
Dan Talayco21c75c72010-02-12 22:59:24 -0800388
Dan Talayco1b3f6902010-02-15 14:14:19 -0800389 def shutdown(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800390 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800391 Shutdown the controller closing all sockets
Dan Talayco21c75c72010-02-12 22:59:24 -0800392
Dan Talayco1b3f6902010-02-15 14:14:19 -0800393 @todo Might want to synchronize shutdown with self.sync...
394 """
Dan Talayco710438c2010-02-18 15:16:07 -0800395 self.active = False
396 try:
397 self.switch_socket.shutdown(socket.SHUT_RDWR)
398 except:
Dan Talayco48370102010-03-03 15:17:33 -0800399 self.logger.info("Ignoring switch soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800400 self.switch_socket = None
Dan Talayco1b3f6902010-02-15 14:14:19 -0800401
Dan Talayco710438c2010-02-18 15:16:07 -0800402 try:
403 self.listen_socket.shutdown(socket.SHUT_RDWR)
404 except:
Dan Talayco48370102010-03-03 15:17:33 -0800405 self.logger.info("Ignoring listen soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800406 self.listen_socket = None
407 self.dbg_state = "down"
408
Dan Talayco34089522010-02-07 23:07:41 -0800409 def register(self, msg_type, handler):
410 """
411 Register a callback to receive a specific message type.
412
413 Only one handler may be registered for a given message type.
Dan Talaycod12b6612010-03-07 22:00:46 -0800414
415 WARNING: A lock is held during the handler call back, so
416 the handler should not make any blocking calls
417
Dan Talayco34089522010-02-07 23:07:41 -0800418 @param msg_type The type of message to receive. May be DEFAULT
Dan Talayco21c75c72010-02-12 22:59:24 -0800419 for all non-handled packets. The special type, the string "all"
420 will send all packets to the handler.
Dan Talayco34089522010-02-07 23:07:41 -0800421 @param handler The function to call when a message of the given
422 type is received.
423 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800424 # Should check type is valid
425 if not handler and msg_type in self.handlers.keys():
426 del self.handlers[msg_type]
427 return
428 self.handlers[msg_type] = handler
Dan Talayco34089522010-02-07 23:07:41 -0800429
Dan Talayco21c75c72010-02-12 22:59:24 -0800430 def poll(self, exp_msg=None, timeout=None):
Dan Talayco34089522010-02-07 23:07:41 -0800431 """
432 Wait for the next OF message received from the switch.
433
434 @param exp_msg If set, return only when this type of message
Dan Talayco48370102010-03-03 15:17:33 -0800435 is received (unless timeout occurs).
Dan Talaycoe226eb12010-02-18 23:06:30 -0800436 @param timeout If None, do not block. Otherwise, sleep in
Dan Talayco48370102010-03-03 15:17:33 -0800437 intervals of 1 second until message is received.
Dan Talayco34089522010-02-07 23:07:41 -0800438
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800439 @retval A pair (msg, pkt) where msg is a message object and pkt
440 the string representing the packet as received from the socket.
441 This allows additional parsing by the receiver if necessary.
442
Dan Talayco34089522010-02-07 23:07:41 -0800443 The data members in the message are in host endian order.
Dan Talayco48370102010-03-03 15:17:33 -0800444 If an error occurs, (None, None) is returned
445
446 The current queue is searched for a message of the desired type
447 before sleeping on message in events.
Dan Talayco34089522010-02-07 23:07:41 -0800448 """
Dan Talayco34089522010-02-07 23:07:41 -0800449
Dan Talaycoe226eb12010-02-18 23:06:30 -0800450 msg = pkt = None
Dan Talayco34089522010-02-07 23:07:41 -0800451
Dan Talayco48370102010-03-03 15:17:33 -0800452 self.logger.debug("Poll for " + ofp_type_map[exp_msg])
Dan Talaycoe226eb12010-02-18 23:06:30 -0800453 # First check the current queue
454 self.sync.acquire()
455 if len(self.packets) > 0:
456 if not exp_msg:
457 (msg, pkt) = self.packets.pop(0)
458 self.sync.release()
459 return (msg, pkt)
460 else:
461 for i in range(len(self.packets)):
462 msg = self.packets[i][0]
463 if msg.header.type == exp_msg:
464 (msg, pkt) = self.packets.pop(i)
465 self.sync.release()
466 return (msg, pkt)
467
468 # Okay, not currently in the queue
469 if timeout is None or timeout <= 0:
Dan Talayco21c75c72010-02-12 22:59:24 -0800470 self.sync.release()
Dan Talaycoe226eb12010-02-18 23:06:30 -0800471 return (None, None)
Dan Talayco21c75c72010-02-12 22:59:24 -0800472
Dan Talayco48370102010-03-03 15:17:33 -0800473 msg = pkt = None
474 self.logger.debug("Entering timeout")
Dan Talaycoe226eb12010-02-18 23:06:30 -0800475 # Careful of race condition releasing sync before message cv
Dan Talayco90576bd2010-02-19 10:59:02 -0800476 # Also, this style is ripe for a lockup.
Dan Talaycoe226eb12010-02-18 23:06:30 -0800477 self.expect_msg_cv.acquire()
478 self.sync.release()
Dan Talayco48370102010-03-03 15:17:33 -0800479 self.expect_msg_response = None
Dan Talaycoe226eb12010-02-18 23:06:30 -0800480 self.expect_msg = True
481 self.expect_msg_type = exp_msg
482 self.expect_msg_cv.wait(timeout)
483 if self.expect_msg_response is not None:
484 (msg, pkt) = self.expect_msg_response
Dan Talaycoe226eb12010-02-18 23:06:30 -0800485 self.expect_msg_cv.release()
486
487 if msg is None:
Dan Talayco48370102010-03-03 15:17:33 -0800488 self.logger.debug("Poll time out")
489 else:
490 self.logger.debug("Got msg " + str(msg))
Dan Talaycoe226eb12010-02-18 23:06:30 -0800491
492 return (msg, pkt)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800493
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800494 def transact(self, msg, timeout=None, zero_xid=False):
Dan Talayco34089522010-02-07 23:07:41 -0800495 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800496 Run a message transaction with the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800497
498 Send the message in msg and wait for a reply with a matching
Dan Talayco21c75c72010-02-12 22:59:24 -0800499 transaction id. Transactions have the highest priority in
500 received message handling.
Dan Talaycoe37999f2010-02-09 15:27:12 -0800501
Dan Talayco21c75c72010-02-12 22:59:24 -0800502 @param msg The message object to send; must not be a string
503 @param timeout The timeout in seconds (?)
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800504 @param zero_xid Normally, if the XID is 0 an XID will be generated
505 for the message. Set xero_xid to override this behavior
Dan Talayco21c75c72010-02-12 22:59:24 -0800506 @return The matching message object or None if unsuccessful
Dan Talaycoe37999f2010-02-09 15:27:12 -0800507
Dan Talayco34089522010-02-07 23:07:41 -0800508 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800509
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800510 if not zero_xid and msg.header.xid == 0:
511 msg.header.xid = gen_xid()
512
Dan Talayco21c75c72010-02-12 22:59:24 -0800513 self.xid_cv.acquire()
514 if self.xid:
515 self.xid_cv.release()
Dan Talayco48370102010-03-03 15:17:33 -0800516 self.logger.error("Can only run one transaction at a time")
Dan Talayco21c75c72010-02-12 22:59:24 -0800517 return None
518
519 self.xid = msg.header.xid
520 self.xid_response = None
521 self.message_send(msg.pack())
522 self.xid_cv.wait(timeout)
Dan Talaycod12b6612010-03-07 22:00:46 -0800523 if self.xid_response:
Dan Talayco09c2c592010-05-13 14:21:52 -0700524 (resp, pkt) = self.xid_response
Dan Talaycod12b6612010-03-07 22:00:46 -0800525 self.xid_response = None
526 else:
Dan Talayco09c2c592010-05-13 14:21:52 -0700527 (resp, pkt) = (None, None)
Dan Talayco21c75c72010-02-12 22:59:24 -0800528 self.xid_cv.release()
Dan Talayco09c2c592010-05-13 14:21:52 -0700529 if resp is None:
530 self.logger.warning("No response for xid " + str(self.xid))
531 return (resp, pkt)
Dan Talayco34089522010-02-07 23:07:41 -0800532
Dan Talayco710438c2010-02-18 15:16:07 -0800533 def message_send(self, msg, zero_xid=False):
Dan Talayco34089522010-02-07 23:07:41 -0800534 """
535 Send the message to the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800536
Dan Talayco11c26e72010-03-07 22:03:57 -0800537 @param msg A string or OpenFlow message object to be forwarded to
538 the switch.
539 @param zero_xid If msg is an OpenFlow object (not a string) and if
Dan Talayco710438c2010-02-18 15:16:07 -0800540 the XID in the header is 0, then an XID will be generated
541 for the message. Set xero_xid to override this behavior (and keep an
542 existing 0 xid)
Dan Talaycoe37999f2010-02-09 15:27:12 -0800543
Dan Talayco710438c2010-02-18 15:16:07 -0800544 @return -1 if error, 0 on success
Dan Talayco34089522010-02-07 23:07:41 -0800545
Dan Talayco21c75c72010-02-12 22:59:24 -0800546 """
547
Dan Talayco1b3f6902010-02-15 14:14:19 -0800548 if not self.switch_socket:
549 # Sending a string indicates the message is ready to go
Dan Talayco48370102010-03-03 15:17:33 -0800550 self.logger.info("message_send: no socket")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800551 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800552 #@todo If not string, try to pack
Dan Talayco21c75c72010-02-12 22:59:24 -0800553 if type(msg) != type(""):
Dan Talayco710438c2010-02-18 15:16:07 -0800554 try:
555 if msg.header.xid == 0 and not zero_xid:
556 msg.header.xid = gen_xid()
557 outpkt = msg.pack()
558 except:
Dan Talayco48370102010-03-03 15:17:33 -0800559 self.logger.error(
Dan Talayco710438c2010-02-18 15:16:07 -0800560 "message_send: not an OF message or string?")
561 return -1
562 else:
563 outpkt = msg
Dan Talayco21c75c72010-02-12 22:59:24 -0800564
Dan Talayco48370102010-03-03 15:17:33 -0800565 self.logger.debug("Sending pkt of len " + str(len(outpkt)))
Dan Talayco710438c2010-02-18 15:16:07 -0800566 if self.switch_socket.sendall(outpkt) is None:
567 return 0
568
Dan Talayco48370102010-03-03 15:17:33 -0800569 self.logger.error("Unknown error on sendall")
Dan Talayco710438c2010-02-18 15:16:07 -0800570 return -1
Dan Talayco21c75c72010-02-12 22:59:24 -0800571
572 def __str__(self):
573 string = "Controller:\n"
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800574 string += " state " + self.dbg_state + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800575 string += " switch_addr " + str(self.switch_addr) + "\n"
576 string += " pending pkts " + str(len(self.packets)) + "\n"
577 string += " total pkts " + str(self.packets_total) + "\n"
578 string += " expired pkts " + str(self.packets_expired) + "\n"
579 string += " handled pkts " + str(self.packets_handled) + "\n"
Dan Talaycoe226eb12010-02-18 23:06:30 -0800580 string += " poll discards " + str(self.poll_discards) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800581 string += " parse errors " + str(self.parse_errors) + "\n"
582 string += " sock errrors " + str(self.socket_errors) + "\n"
583 string += " max pkts " + str(self.max_pkts) + "\n"
584 string += " host " + str(self.host) + "\n"
585 string += " port " + str(self.port) + "\n"
586 string += " keep_alive " + str(self.keep_alive) + "\n"
587 return string
588
589 def show(self):
590 print str(self)
591
def sample_handler(controller, msg, pkt):
    """
    Prototype for message handlers registered with the controller.

    Functions registered with the controller class for packet reception
    take these three arguments. This sample does nothing and reports
    the message as not handled.

    @param controller The controller calling the handler
    @param msg The parsed message object
    @param pkt The raw packet that was received on the socket.  This is
    in case the packet contains extra unparsed data.
    @returns Boolean value indicating if the packet was handled.  If
    not handled, the packet is placed in the queue for pollers to receive
    """
    return None