blob: 81d26dc1bea36c2bfb2a76d824309988f1fb2721 [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
4Controller class
5
6Provide the interface to the control channel to the switch under test.
7
8Class inherits from thread so as to run in background allowing
9asynchronous callbacks (if needed, not required). Also supports
10polling.
11
12The controller thread maintains a queue. Incoming messages that
13are not handled by a callback function are placed in this queue for
14poll calls.
15
16Callbacks and polling support specifying the message type
17
18@todo Support transaction semantics via xid
Dan Talayco1b3f6902010-02-15 14:14:19 -080019@todo Support select and listen on an administrative socket (or
20use a timeout to support clean shutdown).
21
22Currently only one connection is accepted during the life of
23the controller. There seems
24to be no clean way to interrupt an accept call. Using select that also listens
25on an administrative socket and can shut down the socket might work.
26
Dan Talayco34089522010-02-07 23:07:41 -080027"""
28
Rich Lane720eaf22013-08-09 18:00:45 -070029import sys
Dan Talayco34089522010-02-07 23:07:41 -080030import os
31import socket
32import time
Rich Lanecd97d3d2013-01-07 18:50:06 -080033import struct
34import select
35import logging
Dan Talayco34089522010-02-07 23:07:41 -080036from threading import Thread
37from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080038from threading import Condition
Dan Talayco48370102010-03-03 15:17:33 -080039
Wilson Ngc11a9182013-10-28 16:02:03 -070040import ofutils
41import loxi
42
43# Configured openflow version
44import ofp as cfg_ofp
Dan Talaycof8de5182012-04-12 22:38:41 -070045
46FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.'
47 for x in range(256)])
48
49def hex_dump_buffer(src, length=16):
50 """
51 Convert src to a hex dump string and return the string
52 @param src The source buffer
53 @param length The number of bytes shown in each line
54 @returns A string showing the hex dump
55 """
56 result = ["\n"]
57 for i in xrange(0, len(src), length):
58 chars = src[i:i+length]
59 hex = ' '.join(["%02x" % ord(x) for x in chars])
60 printable = ''.join(["%s" % ((ord(x) <= 127 and
61 FILTER[ord(x)]) or '.') for x in chars])
62 result.append("%04x %-*s %s\n" % (i, length*3, hex, printable))
63 return ''.join(result)
64
Dan Talayco48370102010-03-03 15:17:33 -080065##@todo Find a better home for these identifiers (controller)
Glen Gibb741b1182010-07-08 16:43:58 -070066RCV_SIZE_DEFAULT = 32768
Dan Talayco48370102010-03-03 15:17:33 -080067LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080068
69class Controller(Thread):
70 """
71 Class abstracting the control interface to the switch.
72
73 For receiving messages, two mechanism will be implemented. First,
74 query the interface with poll. Second, register to have a
75 function called by message type. The callback is passed the
76 message type as well as the raw packet (or message object)
77
78 One of the main purposes of this object is to translate between network
79 and host byte order. 'Above' this object, things should be in host
80 byte order.
Dan Talayco21c75c72010-02-12 22:59:24 -080081
82 @todo Consider using SocketServer for listening socket
83 @todo Test transaction code
84
85 @var rcv_size The receive size to use for receive calls
86 @var max_pkts The max size of the receive queue
87 @var keep_alive If true, listen for echo requests and respond w/
88 echo replies
Dan Talayco710438c2010-02-18 15:16:07 -080089 @var initial_hello If true, will send a hello message immediately
90 upon connecting to the switch
Dan Talayco69ca4d62012-11-15 11:50:22 -080091 @var switch If not None, do an active connection to the switch
Dan Talayco21c75c72010-02-12 22:59:24 -080092 @var host The host to use for connect
93 @var port The port to connect on
94 @var packets_total Total number of packets received
95 @var packets_expired Number of packets popped from queue as queue full
96 @var packets_handled Number of packets handled by something
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080097 @var dbg_state Debug indication of state
Dan Talayco34089522010-02-07 23:07:41 -080098 """
99
Rich Lane4d1f3eb2013-10-03 13:45:57 -0700100 def __init__(self, switch=None, host='127.0.0.1', port=6653, max_pkts=1024):
Dan Talayco21c75c72010-02-12 22:59:24 -0800101 Thread.__init__(self)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800102 # Socket related
Dan Talayco21c75c72010-02-12 22:59:24 -0800103 self.rcv_size = RCV_SIZE_DEFAULT
Dan Talayco1b3f6902010-02-15 14:14:19 -0800104 self.listen_socket = None
105 self.switch_socket = None
106 self.switch_addr = None
Dan Talayco710438c2010-02-18 15:16:07 -0800107 self.connect_cv = Condition()
Dan Talaycoe226eb12010-02-18 23:06:30 -0800108 self.message_cv = Condition()
Rich Lanec9d3edd2013-10-09 00:21:01 -0700109 self.tx_lock = Lock()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800110
Rich Lane4dfd5e12012-12-22 19:48:01 -0800111 # Used to wake up the event loop from another thread
Rich Lanecd97d3d2013-01-07 18:50:06 -0800112 self.waker = ofutils.EventDescriptor()
Rich Lane32797542012-12-22 17:46:05 -0800113
Dan Talayco1b3f6902010-02-15 14:14:19 -0800114 # Counters
Dan Talayco21c75c72010-02-12 22:59:24 -0800115 self.socket_errors = 0
116 self.parse_errors = 0
Dan Talayco21c75c72010-02-12 22:59:24 -0800117 self.packets_total = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -0800118 self.packets_expired = 0
119 self.packets_handled = 0
Dan Talaycoe226eb12010-02-18 23:06:30 -0800120 self.poll_discards = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -0800121
122 # State
Dan Talayco21c75c72010-02-12 22:59:24 -0800123 self.sync = Lock()
124 self.handlers = {}
125 self.keep_alive = False
Dan Talayco710438c2010-02-18 15:16:07 -0800126 self.active = True
127 self.initial_hello = True
Dan Talayco1b3f6902010-02-15 14:14:19 -0800128
Rich Lanec4f071b2012-07-11 17:25:57 -0700129 # OpenFlow message/packet queue
130 # Protected by the packets_cv lock / condition variable
131 self.packets = []
132 self.packets_cv = Condition()
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700133 self.packet_in_count = 0
Rich Lanec4f071b2012-07-11 17:25:57 -0700134
Dan Talayco1b3f6902010-02-15 14:14:19 -0800135 # Settings
136 self.max_pkts = max_pkts
Dan Talayco69ca4d62012-11-15 11:50:22 -0800137 self.switch = switch
138 self.passive = not self.switch
Dan Talayco48370102010-03-03 15:17:33 -0800139 self.host = host
140 self.port = port
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800141 self.dbg_state = "init"
Dan Talayco48370102010-03-03 15:17:33 -0800142 self.logger = logging.getLogger("controller")
Dan Talaycof8de5182012-04-12 22:38:41 -0700143 self.filter_packet_in = False # Drop "excessive" packet ins
144 self.pkt_in_run = 0 # Count on run of packet ins
145 self.pkt_in_filter_limit = 50 # Count on run of packet ins
146 self.pkt_in_dropped = 0 # Total dropped packet ins
147 self.transact_to = 15 # Transact timeout default value; add to config
Dan Talayco1b3f6902010-02-15 14:14:19 -0800148
Dan Talaycoe226eb12010-02-18 23:06:30 -0800149 # Transaction and message type waiting variables
150 # xid_cv: Condition variable (semaphore) for packet waiters
Dan Talayco21c75c72010-02-12 22:59:24 -0800151 # xid: Transaction ID being waited on
152 # xid_response: Transaction response message
153 self.xid_cv = Condition()
154 self.xid = None
155 self.xid_response = None
156
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800157 self.buffered_input = ""
Dan Talaycoe226eb12010-02-18 23:06:30 -0800158
Rich Lane207502e2012-12-31 14:29:12 -0800159 # Create listen socket
160 if self.passive:
161 self.logger.info("Create/listen at " + self.host + ":" +
162 str(self.port))
Ken Chiang8f3f7052015-01-27 11:25:34 -0800163 try:
164 ai = socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC,
165 socket.SOCK_STREAM, 0,
166 socket.AI_PASSIVE)
167 except:
168 raise Exception("Could not get address info for " +
169 self.host + ":" + str(self.port))
170
171 try:
172 # Use first returned addrinfo
173 (family, socktype, proto, name, sockaddr) = ai[0]
174 self.listen_socket = socket.socket(family, socktype)
175 self.listen_socket.setsockopt(socket.SOL_SOCKET,
176 socket.SO_REUSEADDR, 1)
177 self.listen_socket.bind(sockaddr)
178 self.listen_socket.listen(LISTEN_QUEUE_SIZE)
179 except Exception as e:
180 raise Exception("Could not set up socket for " +
181 self.host + ":" + str(self.port) + ': ' +
182 str(e))
Rich Lane207502e2012-12-31 14:29:12 -0800183
Dan Talaycof8de5182012-04-12 22:38:41 -0700184 def filter_packet(self, rawmsg, hdr):
185 """
186 Check if packet should be filtered
187
188 Currently filters packet in messages
189 @return Boolean, True if packet should be dropped
190 """
Rich Lane1622bbb2013-03-11 17:11:53 -0700191 # XXX didn't actually check for packet-in...
192 return False
Dan Talaycof8de5182012-04-12 22:38:41 -0700193 # Add check for packet in and rate limit
194 if self.filter_packet_in:
Rich Lanec4f071b2012-07-11 17:25:57 -0700195 # If we were dropping packets, report number dropped
196 # TODO dont drop expected packet ins
197 if self.pkt_in_run > self.pkt_in_filter_limit:
198 self.logger.debug("Dropped %d packet ins (%d total)"
199 % ((self.pkt_in_run -
200 self.pkt_in_filter_limit),
201 self.pkt_in_dropped))
202 self.pkt_in_run = 0
Dan Talaycof8de5182012-04-12 22:38:41 -0700203
204 return False
205
Dan Talaycod12b6612010-03-07 22:00:46 -0800206 def _pkt_handle(self, pkt):
207 """
208 Check for all packet handling conditions
209
210 Parse and verify message
211 Check if XID matches something waiting
212 Check if message is being expected for a poll operation
213 Check if keep alive is on and message is an echo request
214 Check if any registered handler wants the packet
215 Enqueue if none of those conditions is met
216
217 an echo request in case keep_alive is true, followed by
218 registered message handlers.
Glen Gibb6d467062010-07-08 16:15:08 -0700219 @param pkt The raw packet (string) which may contain multiple OF msgs
Dan Talaycod12b6612010-03-07 22:00:46 -0800220 """
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800221
222 # snag any left over data from last read()
223 pkt = self.buffered_input + pkt
224 self.buffered_input = ""
225
Glen Gibb6d467062010-07-08 16:15:08 -0700226 # Process each of the OF msgs inside the pkt
227 offset = 0
228 while offset < len(pkt):
Rich Lane1622bbb2013-03-11 17:11:53 -0700229 if offset + 8 > len(pkt):
230 break
231
Glen Gibb6d467062010-07-08 16:15:08 -0700232 # Parse the header to get type
Wilson Ngc11a9182013-10-28 16:02:03 -0700233 hdr_version, hdr_type, hdr_length, hdr_xid = cfg_ofp.message.parse_header(pkt[offset:])
234
235 # Use loxi to resolve to ofp of matching version
236 ofp = loxi.protocol(hdr_version)
Dan Talaycod12b6612010-03-07 22:00:46 -0800237
Glen Gibb6d467062010-07-08 16:15:08 -0700238 # Extract the raw message bytes
Rich Lane1622bbb2013-03-11 17:11:53 -0700239 if (offset + hdr_length) > len(pkt):
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800240 break
Rich Lane1622bbb2013-03-11 17:11:53 -0700241 rawmsg = pkt[offset : offset + hdr_length]
242 offset += hdr_length
Dan Talaycof8de5182012-04-12 22:38:41 -0700243
Rich Lane1622bbb2013-03-11 17:11:53 -0700244 #if self.filter_packet(rawmsg, hdr):
245 # continue
Dan Talaycof8de5182012-04-12 22:38:41 -0700246
Rich Lanef6883512013-03-11 17:00:09 -0700247 msg = ofp.message.parse_message(rawmsg)
Glen Gibb6d467062010-07-08 16:15:08 -0700248 if not msg:
249 self.parse_errors += 1
250 self.logger.warn("Could not parse message")
251 continue
252
Rich Lane1fd43e32014-01-06 15:22:50 -0800253 self.logger.debug("Msg in: version %d class %s len %d xid %d",
254 hdr_version, type(msg).__name__, hdr_length, hdr_xid)
255
Rich Lanec4f071b2012-07-11 17:25:57 -0700256 with self.sync:
257 # Check if transaction is waiting
258 with self.xid_cv:
Rich Lane1622bbb2013-03-11 17:11:53 -0700259 if self.xid and hdr_xid == self.xid:
260 self.logger.debug("Matched expected XID " + str(hdr_xid))
Rich Lanec4f071b2012-07-11 17:25:57 -0700261 self.xid_response = (msg, rawmsg)
262 self.xid = None
263 self.xid_cv.notify()
264 continue
Glen Gibb6d467062010-07-08 16:15:08 -0700265
Rich Lanec4f071b2012-07-11 17:25:57 -0700266 # Check if keep alive is set; if so, respond to echo requests
267 if self.keep_alive:
Rich Lane1622bbb2013-03-11 17:11:53 -0700268 if hdr_type == ofp.OFPT_ECHO_REQUEST:
Rich Lanec4f071b2012-07-11 17:25:57 -0700269 self.logger.debug("Responding to echo request")
Rich Lane78ef8b92013-01-10 12:19:23 -0800270 rep = ofp.message.echo_reply()
Rich Lane1622bbb2013-03-11 17:11:53 -0700271 rep.xid = hdr_xid
Rich Lanec4f071b2012-07-11 17:25:57 -0700272 # Ignoring additional data
Rich Lane1fd43e32014-01-06 15:22:50 -0800273 self.message_send(rep)
Rich Lanec4f071b2012-07-11 17:25:57 -0700274 continue
Glen Gibb6d467062010-07-08 16:15:08 -0700275
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700276 # Generalize to counters for all packet types?
277 if msg.type == ofp.OFPT_PACKET_IN:
278 self.packet_in_count += 1
279
Rich Lane5d63b9c2013-01-11 14:12:37 -0800280 # Log error messages
Rich Lanefa2a4de2014-01-29 16:03:04 -0800281 if isinstance(msg, ofp.message.error_msg):
282 #pylint: disable=E1103
Rich Laneb73808c2013-03-11 15:22:23 -0700283 if msg.err_type in ofp.ofp_error_type_map:
284 type_str = ofp.ofp_error_type_map[msg.err_type]
285 if msg.err_type == ofp.OFPET_HELLO_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800286 code_map = ofp.ofp_hello_failed_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700287 elif msg.err_type == ofp.OFPET_BAD_REQUEST:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800288 code_map = ofp.ofp_bad_request_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700289 elif msg.err_type == ofp.OFPET_BAD_ACTION:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800290 code_map = ofp.ofp_bad_action_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700291 elif msg.err_type == ofp.OFPET_FLOW_MOD_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800292 code_map = ofp.ofp_flow_mod_failed_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700293 elif msg.err_type == ofp.OFPET_PORT_MOD_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800294 code_map = ofp.ofp_port_mod_failed_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700295 elif msg.err_type == ofp.OFPET_QUEUE_OP_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800296 code_map = ofp.ofp_queue_op_failed_code_map
297 else:
298 code_map = None
299
300 if code_map and msg.code in code_map:
301 code_str = code_map[msg.code]
302 else:
303 code_str = "unknown"
304 else:
305 type_str = "unknown"
Rich Lane1879dc72013-03-11 22:08:51 -0700306 code_str = "unknown"
Rich Lane5d63b9c2013-01-11 14:12:37 -0800307 self.logger.warn("Received error message: xid=%d type=%s (%d) code=%s (%d)",
Rich Lane1622bbb2013-03-11 17:11:53 -0700308 hdr_xid, type_str, msg.err_type, code_str, msg.code)
Rich Lane5d63b9c2013-01-11 14:12:37 -0800309
Rich Lanec4f071b2012-07-11 17:25:57 -0700310 # Now check for message handlers; preference is given to
311 # handlers for a specific packet
312 handled = False
Rich Lane1622bbb2013-03-11 17:11:53 -0700313 if hdr_type in self.handlers.keys():
314 handled = self.handlers[hdr_type](self, msg, rawmsg)
Rich Lanec4f071b2012-07-11 17:25:57 -0700315 if not handled and ("all" in self.handlers.keys()):
316 handled = self.handlers["all"](self, msg, rawmsg)
Glen Gibb6d467062010-07-08 16:15:08 -0700317
Rich Lanec4f071b2012-07-11 17:25:57 -0700318 if not handled: # Not handled, enqueue
Rich Lanec4f071b2012-07-11 17:25:57 -0700319 with self.packets_cv:
320 if len(self.packets) >= self.max_pkts:
321 self.packets.pop(0)
322 self.packets_expired += 1
323 self.packets.append((msg, rawmsg))
324 self.packets_cv.notify_all()
325 self.packets_total += 1
326 else:
327 self.packets_handled += 1
328 self.logger.debug("Message handled by callback")
Glen Gibb6d467062010-07-08 16:15:08 -0700329
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800330 # end of 'while offset < len(pkt)'
331 # note that if offset = len(pkt), this is
332 # appends a harmless empty string
333 self.buffered_input += pkt[offset:]
Dan Talaycod12b6612010-03-07 22:00:46 -0800334
Dan Talayco710438c2010-02-18 15:16:07 -0800335 def _socket_ready_handle(self, s):
336 """
337 Handle an input-ready socket
Dan Talaycof8de5182012-04-12 22:38:41 -0700338
Dan Talayco710438c2010-02-18 15:16:07 -0800339 @param s The socket object that is ready
Dan Talaycof8de5182012-04-12 22:38:41 -0700340 @returns 0 on success, -1 on error
Dan Talayco710438c2010-02-18 15:16:07 -0800341 """
342
Dan Talayco69ca4d62012-11-15 11:50:22 -0800343 if self.passive and s and s == self.listen_socket:
Dan Talayco710438c2010-02-18 15:16:07 -0800344 if self.switch_socket:
Rich Lanee1da7ea2012-07-26 15:58:45 -0700345 self.logger.warning("Ignoring incoming connection; already connected to switch")
Rich Laneb4f8ecb2012-09-25 09:36:26 -0700346 (sock, addr) = self.listen_socket.accept()
347 sock.close()
Rich Lanee1da7ea2012-07-26 15:58:45 -0700348 return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800349
Ken Chiange875baf2012-10-09 15:24:40 -0700350 try:
351 (sock, addr) = self.listen_socket.accept()
352 except:
353 self.logger.warning("Error on listen socket accept")
354 return -1
Ken Chiang77173992012-10-30 15:44:39 -0700355 self.logger.info(self.host+":"+str(self.port)+": Incoming connection from "+str(addr))
Rich Lanee1da7ea2012-07-26 15:58:45 -0700356
Rich Laneee3586c2012-07-11 17:26:02 -0700357 with self.connect_cv:
Rich Lanee1da7ea2012-07-26 15:58:45 -0700358 (self.switch_socket, self.switch_addr) = (sock, addr)
Rich Lane82ef1832012-12-22 17:04:35 -0800359 self.switch_socket.setsockopt(socket.IPPROTO_TCP,
360 socket.TCP_NODELAY, True)
Rich Lane1a8d5aa2012-10-08 15:40:03 -0700361 if self.initial_hello:
Wilson Ngc11a9182013-10-28 16:02:03 -0700362 self.message_send(cfg_ofp.message.hello())
Rich Lanee1da7ea2012-07-26 15:58:45 -0700363 self.connect_cv.notify() # Notify anyone waiting
Rich Laned929b8d2013-04-15 15:59:14 -0700364
365 # Prevent further connections
366 self.listen_socket.close()
367 self.listen_socket = None
Dan Talaycof8de5182012-04-12 22:38:41 -0700368 elif s and s == self.switch_socket:
369 for idx in range(3): # debug: try a couple of times
370 try:
371 pkt = self.switch_socket.recv(self.rcv_size)
372 except:
373 self.logger.warning("Error on switch read")
374 return -1
375
376 if not self.active:
377 return 0
378
379 if len(pkt) == 0:
380 self.logger.warning("Zero-length switch read, %d" % idx)
381 else:
382 break
Dan Talayco710438c2010-02-18 15:16:07 -0800383
Dan Talaycof8de5182012-04-12 22:38:41 -0700384 if len(pkt) == 0: # Still no packet
Dan Talayco0fc08bd2012-04-09 16:56:18 -0700385 self.logger.warning("Zero-length switch read; closing cxn")
Dan Talaycof8de5182012-04-12 22:38:41 -0700386 self.logger.info(str(self))
387 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800388
Dan Talaycod12b6612010-03-07 22:00:46 -0800389 self._pkt_handle(pkt)
Rich Lane4dfd5e12012-12-22 19:48:01 -0800390 elif s and s == self.waker:
391 self.waker.wait()
Dan Talayco710438c2010-02-18 15:16:07 -0800392 else:
Dan Talayco48370102010-03-03 15:17:33 -0800393 self.logger.error("Unknown socket ready: " + str(s))
Dan Talaycof8de5182012-04-12 22:38:41 -0700394 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800395
Dan Talaycof8de5182012-04-12 22:38:41 -0700396 return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800397
Dan Talayco69ca4d62012-11-15 11:50:22 -0800398 def active_connect(self):
399 """
400 Actively connect to a switch IP addr
401 """
402 try:
403 self.logger.info("Trying active connection to %s" % self.switch)
404 soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
405 soc.connect((self.switch, self.port))
406 self.logger.info("Connected to " + self.switch + " on " +
407 str(self.port))
Rich Lane82ef1832012-12-22 17:04:35 -0800408 soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800409 self.switch_addr = (self.switch, self.port)
410 return soc
411 except (StandardError, socket.error), e:
412 self.logger.error("Could not connect to %s at %d:: %s" %
413 (self.switch, self.port, str(e)))
414 return None
415
Rich Lane32797542012-12-22 17:46:05 -0800416 def wakeup(self):
417 """
418 Wake up the event loop, presumably from another thread.
419 """
Rich Lane4dfd5e12012-12-22 19:48:01 -0800420 self.waker.notify()
Rich Lane32797542012-12-22 17:46:05 -0800421
422 def sockets(self):
423 """
424 Return list of sockets to select on.
425 """
Rich Lane4dfd5e12012-12-22 19:48:01 -0800426 socs = [self.listen_socket, self.switch_socket, self.waker]
Rich Lane32797542012-12-22 17:46:05 -0800427 return [x for x in socs if x]
428
Dan Talayco1b3f6902010-02-15 14:14:19 -0800429 def run(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800430 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800431 Activity function for class
Dan Talayco21c75c72010-02-12 22:59:24 -0800432
Dan Talayco1b3f6902010-02-15 14:14:19 -0800433 Assumes connection to switch already exists. Listens on
434 switch_socket for messages until an error (or zero len pkt)
435 occurs.
Dan Talayco21c75c72010-02-12 22:59:24 -0800436
Dan Talayco1b3f6902010-02-15 14:14:19 -0800437 When there is a message on the socket, check for handlers; queue the
438 packet if no one handles the packet.
439
440 See note for controller describing the limitation of a single
441 connection for now.
442 """
443
Rich Lane207502e2012-12-31 14:29:12 -0800444 self.dbg_state = "running"
Ken Chiangadc950f2012-10-05 13:50:03 -0700445
Dan Talayco710438c2010-02-18 15:16:07 -0800446 while self.active:
Dan Talayco710438c2010-02-18 15:16:07 -0800447 try:
448 sel_in, sel_out, sel_err = \
Rich Lane32797542012-12-22 17:46:05 -0800449 select.select(self.sockets(), [], self.sockets(), 1)
Dan Talayco710438c2010-02-18 15:16:07 -0800450 except:
451 print sys.exc_info()
Ken Chiangadc950f2012-10-05 13:50:03 -0700452 self.logger.error("Select error, disconnecting")
453 self.disconnect()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800454
Dan Talayco710438c2010-02-18 15:16:07 -0800455 for s in sel_err:
Ken Chiangadc950f2012-10-05 13:50:03 -0700456 self.logger.error("Got socket error on: " + str(s) + ", disconnecting")
457 self.disconnect()
Dan Talaycof8de5182012-04-12 22:38:41 -0700458
459 for s in sel_in:
460 if self._socket_ready_handle(s) == -1:
Ken Chiangadc950f2012-10-05 13:50:03 -0700461 self.disconnect()
Dan Talayco710438c2010-02-18 15:16:07 -0800462
Dan Talayco710438c2010-02-18 15:16:07 -0800463 # End of main loop
464 self.dbg_state = "closing"
Dan Talayco48370102010-03-03 15:17:33 -0800465 self.logger.info("Exiting controller thread")
Dan Talayco710438c2010-02-18 15:16:07 -0800466 self.shutdown()
467
Rich Lane8806bc42012-07-26 19:18:37 -0700468 def connect(self, timeout=-1):
Dan Talayco710438c2010-02-18 15:16:07 -0800469 """
470 Connect to the switch
471
Rich Lane8806bc42012-07-26 19:18:37 -0700472 @param timeout Block for up to timeout seconds. Pass -1 for the default.
Dan Talayco710438c2010-02-18 15:16:07 -0800473 @return Boolean, True if connected
474 """
475
Dan Talayco69ca4d62012-11-15 11:50:22 -0800476 if not self.passive: # Do active connection now
477 self.logger.info("Attempting to connect to %s on port %s" %
478 (self.switch, str(self.port)))
479 soc = self.active_connect()
480 if soc:
481 self.logger.info("Connected to %s", self.switch)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800482 self.dbg_state = "running"
483 self.switch_socket = soc
Rich Lane32797542012-12-22 17:46:05 -0800484 self.wakeup()
Dan Talayco69ca4d62012-11-15 11:50:22 -0800485 with self.connect_cv:
486 if self.initial_hello:
Wilson Ngc11a9182013-10-28 16:02:03 -0700487 self.message_send(cfg_ofp.message.hello())
Dan Talayco69ca4d62012-11-15 11:50:22 -0800488 self.connect_cv.notify() # Notify anyone waiting
489 else:
490 self.logger.error("Could not actively connect to switch %s",
491 self.switch)
492 self.active = False
493 else:
494 with self.connect_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800495 ofutils.timed_wait(self.connect_cv, lambda: self.switch_socket,
496 timeout=timeout)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800497
Dan Talayco710438c2010-02-18 15:16:07 -0800498 return self.switch_socket is not None
499
Ken Chiangadc950f2012-10-05 13:50:03 -0700500 def disconnect(self, timeout=-1):
501 """
502 If connected to a switch, disconnect.
503 """
504 if self.switch_socket:
Ken Chiangadc950f2012-10-05 13:50:03 -0700505 self.switch_socket.close()
506 self.switch_socket = None
507 self.switch_addr = None
Ken Chiang74be4722012-12-21 13:07:03 -0800508 with self.packets_cv:
509 self.packets = []
Ken Chiange875baf2012-10-09 15:24:40 -0700510 with self.connect_cv:
511 self.connect_cv.notifyAll()
Ken Chiangadc950f2012-10-05 13:50:03 -0700512
513 def wait_disconnected(self, timeout=-1):
514 """
515 @param timeout Block for up to timeout seconds. Pass -1 for the default.
516 @return Boolean, True if disconnected
517 """
518
Ken Chiange875baf2012-10-09 15:24:40 -0700519 with self.connect_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800520 ofutils.timed_wait(self.connect_cv,
521 lambda: True if not self.switch_socket else None,
522 timeout=timeout)
Ken Chiangadc950f2012-10-05 13:50:03 -0700523 return self.switch_socket is None
524
Dan Talayco710438c2010-02-18 15:16:07 -0800525 def kill(self):
526 """
527 Force the controller thread to quit
Dan Talayco710438c2010-02-18 15:16:07 -0800528 """
529 self.active = False
Rich Lane32797542012-12-22 17:46:05 -0800530 self.wakeup()
Rich Lane376bb402012-12-31 15:20:16 -0800531 self.join()
Dan Talayco21c75c72010-02-12 22:59:24 -0800532
Dan Talayco1b3f6902010-02-15 14:14:19 -0800533 def shutdown(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800534 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800535 Shutdown the controller closing all sockets
Dan Talayco21c75c72010-02-12 22:59:24 -0800536
Dan Talayco1b3f6902010-02-15 14:14:19 -0800537 @todo Might want to synchronize shutdown with self.sync...
538 """
Dan Talaycof8de5182012-04-12 22:38:41 -0700539
Dan Talayco710438c2010-02-18 15:16:07 -0800540 self.active = False
541 try:
542 self.switch_socket.shutdown(socket.SHUT_RDWR)
543 except:
Dan Talayco48370102010-03-03 15:17:33 -0800544 self.logger.info("Ignoring switch soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800545 self.switch_socket = None
Dan Talayco1b3f6902010-02-15 14:14:19 -0800546
Dan Talayco710438c2010-02-18 15:16:07 -0800547 try:
548 self.listen_socket.shutdown(socket.SHUT_RDWR)
549 except:
Dan Talayco48370102010-03-03 15:17:33 -0800550 self.logger.info("Ignoring listen soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800551 self.listen_socket = None
Dan Talaycof8de5182012-04-12 22:38:41 -0700552
Rich Laneee3586c2012-07-11 17:26:02 -0700553 # Wakeup condition variables on which controller may be wait
554 with self.xid_cv:
555 self.xid_cv.notifyAll()
Dan Talaycof8de5182012-04-12 22:38:41 -0700556
Rich Laneee3586c2012-07-11 17:26:02 -0700557 with self.connect_cv:
558 self.connect_cv.notifyAll()
Dan Talaycof8de5182012-04-12 22:38:41 -0700559
Rich Lane32797542012-12-22 17:46:05 -0800560 self.wakeup()
Dan Talayco710438c2010-02-18 15:16:07 -0800561 self.dbg_state = "down"
562
Dan Talayco34089522010-02-07 23:07:41 -0800563 def register(self, msg_type, handler):
564 """
565 Register a callback to receive a specific message type.
566
567 Only one handler may be registered for a given message type.
Dan Talaycod12b6612010-03-07 22:00:46 -0800568
569 WARNING: A lock is held during the handler call back, so
570 the handler should not make any blocking calls
571
Dan Talayco34089522010-02-07 23:07:41 -0800572 @param msg_type The type of message to receive. May be DEFAULT
Dan Talayco21c75c72010-02-12 22:59:24 -0800573 for all non-handled packets. The special type, the string "all"
574 will send all packets to the handler.
Dan Talayco34089522010-02-07 23:07:41 -0800575 @param handler The function to call when a message of the given
576 type is received.
577 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800578 # Should check type is valid
579 if not handler and msg_type in self.handlers.keys():
580 del self.handlers[msg_type]
581 return
582 self.handlers[msg_type] = handler
Dan Talayco34089522010-02-07 23:07:41 -0800583
sumithdev095542cf52013-07-12 14:56:28 -0400584 def poll(self, exp_msg=None, timeout=-1):
Dan Talayco34089522010-02-07 23:07:41 -0800585 """
586 Wait for the next OF message received from the switch.
587
588 @param exp_msg If set, return only when this type of message
Dan Talayco48370102010-03-03 15:17:33 -0800589 is received (unless timeout occurs).
Rich Laneb64ce3d2012-07-26 15:37:57 -0700590
591 @param timeout Maximum number of seconds to wait for the message.
592 Pass -1 for the default timeout.
Dan Talayco34089522010-02-07 23:07:41 -0800593
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800594 @retval A pair (msg, pkt) where msg is a message object and pkt
595 the string representing the packet as received from the socket.
596 This allows additional parsing by the receiver if necessary.
597
Dan Talayco34089522010-02-07 23:07:41 -0800598 The data members in the message are in host endian order.
Dan Talayco48370102010-03-03 15:17:33 -0800599 If an error occurs, (None, None) is returned
Dan Talayco34089522010-02-07 23:07:41 -0800600 """
Dan Talayco34089522010-02-07 23:07:41 -0800601
Rich Lanee9d36912014-01-31 12:46:05 -0800602 if exp_msg is None:
603 self.logger.warn("DEPRECATED polling for any message class")
604 klass = None
605 elif isinstance(exp_msg, int):
606 klass = cfg_ofp.message.message.subtypes[exp_msg]
Rich Lane1e7617b2014-11-24 14:10:06 -0800607 elif issubclass(exp_msg, loxi.OFObject):
Rich Lanee9d36912014-01-31 12:46:05 -0800608 klass = exp_msg
Ed Swierk9e55e282012-08-22 06:57:28 -0700609 else:
Rich Lane1e31d742014-02-03 15:46:24 -0800610 raise ValueError("Unexpected exp_msg argument %r" % exp_msg)
Rich Lanee9d36912014-01-31 12:46:05 -0800611
612 self.logger.debug("Polling for %s", klass.__name__)
Rich Laneb64ce3d2012-07-26 15:37:57 -0700613
614 # Take the packet from the queue
Rich Lanec4f071b2012-07-11 17:25:57 -0700615 def grab():
Rich Lanee5b67312014-02-03 14:56:04 -0800616 for i, (msg, pkt) in enumerate(self.packets):
Rich Lanee9d36912014-01-31 12:46:05 -0800617 if klass is None or isinstance(msg, klass):
618 self.logger.debug("Got %s message", msg.__class__.__name__)
Rich Lanee5b67312014-02-03 14:56:04 -0800619 return self.packets.pop(i)
Rich Lanec4f071b2012-07-11 17:25:57 -0700620 # Not found
Rich Lanee9d36912014-01-31 12:46:05 -0800621 self.logger.debug("%s message not in queue", klass.__name__)
Rich Laneb64ce3d2012-07-26 15:37:57 -0700622 return None
Dan Talayco21c75c72010-02-12 22:59:24 -0800623
Rich Lanec4f071b2012-07-11 17:25:57 -0700624 with self.packets_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800625 ret = ofutils.timed_wait(self.packets_cv, grab, timeout=timeout)
Rich Lanec4f071b2012-07-11 17:25:57 -0700626
Rich Laneb64ce3d2012-07-26 15:37:57 -0700627 if ret != None:
628 (msg, pkt) = ret
Rich Laneb64ce3d2012-07-26 15:37:57 -0700629 return (msg, pkt)
630 else:
631 return (None, None)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800632
Rich Lane8fbfd662013-03-11 15:30:44 -0700633 def transact(self, msg, timeout=-1):
Dan Talayco34089522010-02-07 23:07:41 -0800634 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800635 Run a message transaction with the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800636
637 Send the message in msg and wait for a reply with a matching
Dan Talayco21c75c72010-02-12 22:59:24 -0800638 transaction id. Transactions have the highest priority in
639 received message handling.
Dan Talaycoe37999f2010-02-09 15:27:12 -0800640
Dan Talayco21c75c72010-02-12 22:59:24 -0800641 @param msg The message object to send; must not be a string
Rich Lanee1da7ea2012-07-26 15:58:45 -0700642 @param timeout The timeout in seconds; if -1 use default.
Dan Talayco34089522010-02-07 23:07:41 -0800643 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800644
Rich Lane8fbfd662013-03-11 15:30:44 -0700645 if msg.xid == None:
Rich Laneb73808c2013-03-11 15:22:23 -0700646 msg.xid = ofutils.gen_xid()
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800647
Rich Laneb73808c2013-03-11 15:22:23 -0700648 self.logger.debug("Running transaction %d" % msg.xid)
Dan Talayco21c75c72010-02-12 22:59:24 -0800649
Rich Lane9aca1992012-07-11 17:26:31 -0700650 with self.xid_cv:
651 if self.xid:
652 self.logger.error("Can only run one transaction at a time")
653 return (None, None)
Dan Talaycof8de5182012-04-12 22:38:41 -0700654
Rich Laneb73808c2013-03-11 15:22:23 -0700655 self.xid = msg.xid
Dan Talaycod12b6612010-03-07 22:00:46 -0800656 self.xid_response = None
Rich Lane1fd43e32014-01-06 15:22:50 -0800657 self.message_send(msg)
Rich Lane9aca1992012-07-11 17:26:31 -0700658
Rich Laneb73808c2013-03-11 15:22:23 -0700659 self.logger.debug("Waiting for transaction %d" % msg.xid)
Rich Lanecd97d3d2013-01-07 18:50:06 -0800660 ofutils.timed_wait(self.xid_cv, lambda: self.xid_response, timeout=timeout)
Rich Lane9aca1992012-07-11 17:26:31 -0700661
662 if self.xid_response:
663 (resp, pkt) = self.xid_response
664 self.xid_response = None
665 else:
666 (resp, pkt) = (None, None)
667
Dan Talayco09c2c592010-05-13 14:21:52 -0700668 if resp is None:
669 self.logger.warning("No response for xid " + str(self.xid))
670 return (resp, pkt)
Dan Talayco34089522010-02-07 23:07:41 -0800671
Rich Lane8fbfd662013-03-11 15:30:44 -0700672 def message_send(self, msg):
Dan Talayco34089522010-02-07 23:07:41 -0800673 """
674 Send the message to the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800675
Dan Talayco11c26e72010-03-07 22:03:57 -0800676 @param msg A string or OpenFlow message object to be forwarded to
677 the switch.
Dan Talayco21c75c72010-02-12 22:59:24 -0800678 """
679
Dan Talayco1b3f6902010-02-15 14:14:19 -0800680 if not self.switch_socket:
681 # Sending a string indicates the message is ready to go
Ed Swierk9e55e282012-08-22 06:57:28 -0700682 raise Exception("no socket")
Dan Talayco21c75c72010-02-12 22:59:24 -0800683
Rich Lane1fd43e32014-01-06 15:22:50 -0800684 if msg.xid == None:
685 msg.xid = ofutils.gen_xid()
686
687 outpkt = msg.pack()
688
689 self.logger.debug("Msg out: version %d class %s len %d xid %d",
690 msg.version, type(msg).__name__, len(outpkt), msg.xid)
Rich Lanec9d3edd2013-10-09 00:21:01 -0700691
692 with self.tx_lock:
693 if self.switch_socket.sendall(outpkt) is not None:
694 raise AssertionError("failed to send message to switch")
Dan Talayco710438c2010-02-18 15:16:07 -0800695
Rich Lane5c3151c2013-01-03 17:15:41 -0800696 return 0 # for backwards compatibility
Dan Talayco21c75c72010-02-12 22:59:24 -0800697
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700698 def clear_queue(self):
699 """
700 Clear the input queue and report the number of messages
701 that were in it
702 """
Dan Talayco7071cf12013-04-16 11:02:13 -0700703 enqueued_pkt_count = len(self.packets)
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700704 with self.packets_cv:
705 self.packets = []
Dan Talayco7071cf12013-04-16 11:02:13 -0700706 return enqueued_pkt_count
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700707
Dan Talayco21c75c72010-02-12 22:59:24 -0800708 def __str__(self):
709 string = "Controller:\n"
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800710 string += " state " + self.dbg_state + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800711 string += " switch_addr " + str(self.switch_addr) + "\n"
712 string += " pending pkts " + str(len(self.packets)) + "\n"
713 string += " total pkts " + str(self.packets_total) + "\n"
714 string += " expired pkts " + str(self.packets_expired) + "\n"
715 string += " handled pkts " + str(self.packets_handled) + "\n"
Dan Talaycoe226eb12010-02-18 23:06:30 -0800716 string += " poll discards " + str(self.poll_discards) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800717 string += " parse errors " + str(self.parse_errors) + "\n"
718 string += " sock errrors " + str(self.socket_errors) + "\n"
719 string += " max pkts " + str(self.max_pkts) + "\n"
Dan Talayco69ca4d62012-11-15 11:50:22 -0800720 string += " target switch " + str(self.switch) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800721 string += " host " + str(self.host) + "\n"
722 string += " port " + str(self.port) + "\n"
723 string += " keep_alive " + str(self.keep_alive) + "\n"
Dan Talaycof8de5182012-04-12 22:38:41 -0700724 string += " pkt_in_run " + str(self.pkt_in_run) + "\n"
725 string += " pkt_in_dropped " + str(self.pkt_in_dropped) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800726 return string
727
728 def show(self):
729 print str(self)
730
731def sample_handler(controller, msg, pkt):
732 """
733 Sample message handler
734
735 This is the prototype for functions registered with the controller
736 class for packet reception
737
738 @param controller The controller calling the handler
739 @param msg The parsed message object
740 @param pkt The raw packet that was received on the socket. This is
741 in case the packet contains extra unparsed data.
742 @returns Boolean value indicating if the packet was handled. If
743 not handled, the packet is placed in the queue for pollers to received
744 """
745 pass