blob: d17a88ff51ffd8c0675e9258307142d212783e38 [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
4Controller class
5
6Provide the interface to the control channel to the switch under test.
7
8Class inherits from thread so as to run in background allowing
9asynchronous callbacks (if needed, not required). Also supports
10polling.
11
12The controller thread maintains a queue. Incoming messages that
13are not handled by a callback function are placed in this queue for
14poll calls.
15
16Callbacks and polling support specifying the message type
17
18@todo Support transaction semantics via xid
Dan Talayco1b3f6902010-02-15 14:14:19 -080019@todo Support select and listen on an administrative socket (or
20use a timeout to support clean shutdown).
21
22Currently only one connection is accepted during the life of
23the controller. There seems
24to be no clean way to interrupt an accept call. Using select that also listens
25on an administrative socket and can shut down the socket might work.
26
Dan Talayco34089522010-02-07 23:07:41 -080027"""
28
Rich Lane720eaf22013-08-09 18:00:45 -070029import sys
Dan Talayco34089522010-02-07 23:07:41 -080030import os
31import socket
32import time
Rich Lanecd97d3d2013-01-07 18:50:06 -080033import struct
34import select
35import logging
Dan Talayco34089522010-02-07 23:07:41 -080036from threading import Thread
37from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080038from threading import Condition
Dan Talayco48370102010-03-03 15:17:33 -080039
Wilson Ngc11a9182013-10-28 16:02:03 -070040import ofutils
41import loxi
42
43# Configured openflow version
44import ofp as cfg_ofp
Dan Talaycof8de5182012-04-12 22:38:41 -070045
# Lookup table indexed by character code: the character itself when it is
# printable (its repr is exactly the quoted character), '.' otherwise.
FILTER = ''.join([chr(x) if len(repr(chr(x))) == 3 else '.'
                  for x in range(256)])

def hex_dump_buffer(src, length=16):
    """
    Convert src to a hex dump string and return the string

    Each output line shows the offset of its first byte, the hex value of
    up to length bytes, and the same bytes as text with anything that is
    non-printable or above 127 replaced by '.'.

    @param src The source buffer; a string, bytes or bytearray
    @param length The number of bytes shown in each line
    @returns A string showing the hex dump; it always starts with a newline
    """
    lines = ["\n"]
    for start in range(0, len(src), length):
        chunk = src[start:start + length]
        # Elements are 1-character strings for str buffers but integers for
        # bytearray (and Python 3 bytes); normalize everything to integers.
        codes = [c if isinstance(c, int) else ord(c) for c in chunk]
        hex_part = ' '.join(["%02x" % c for c in codes])
        printable = ''.join([FILTER[c] if c <= 127 else '.' for c in codes])
        lines.append("%04x %-*s %s\n" % (start, length * 3, hex_part,
                                         printable))
    return ''.join(lines)
64
##@todo Find a better home for these identifiers (controller)
# Default number of bytes requested per recv() call on the switch socket
RCV_SIZE_DEFAULT = 32768
# Backlog value passed to listen() on the controller's listening socket
LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080068
69class Controller(Thread):
70 """
71 Class abstracting the control interface to the switch.
72
73 For receiving messages, two mechanism will be implemented. First,
74 query the interface with poll. Second, register to have a
75 function called by message type. The callback is passed the
76 message type as well as the raw packet (or message object)
77
78 One of the main purposes of this object is to translate between network
79 and host byte order. 'Above' this object, things should be in host
80 byte order.
Dan Talayco21c75c72010-02-12 22:59:24 -080081
82 @todo Consider using SocketServer for listening socket
83 @todo Test transaction code
84
85 @var rcv_size The receive size to use for receive calls
86 @var max_pkts The max size of the receive queue
87 @var keep_alive If true, listen for echo requests and respond w/
88 echo replies
Dan Talayco710438c2010-02-18 15:16:07 -080089 @var initial_hello If true, will send a hello message immediately
90 upon connecting to the switch
Dan Talayco69ca4d62012-11-15 11:50:22 -080091 @var switch If not None, do an active connection to the switch
Dan Talayco21c75c72010-02-12 22:59:24 -080092 @var host The host to use for connect
93 @var port The port to connect on
94 @var packets_total Total number of packets received
95 @var packets_expired Number of packets popped from queue as queue full
96 @var packets_handled Number of packets handled by something
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080097 @var dbg_state Debug indication of state
Dan Talayco34089522010-02-07 23:07:41 -080098 """
99
def __init__(self, switch=None, host='127.0.0.1', port=6653, max_pkts=1024):
    """
    Initialize controller state and, in passive mode, open the listen socket

    @param switch If not None (and not otherwise false), the address of the
    switch to actively connect to; otherwise the controller runs in passive
    mode and listens for the switch to connect
    @param host Local address the listen socket is bound to (passive mode)
    @param port TCP port to listen on, or to connect to in active mode
    @param max_pkts The max size of the receive queue
    """
    Thread.__init__(self)
    # Socket related
    self.rcv_size = RCV_SIZE_DEFAULT
    self.listen_socket = None
    self.switch_socket = None
    self.switch_addr = None
    self.connect_cv = Condition()
    self.message_cv = Condition()
    self.tx_lock = Lock()

    # Used to wake up the event loop from another thread
    self.waker = ofutils.EventDescriptor()

    # Counters
    self.socket_errors = 0
    self.parse_errors = 0
    self.packets_total = 0
    self.packets_expired = 0
    self.packets_handled = 0
    self.poll_discards = 0

    # State
    self.sync = Lock()  # Held while a received message is dispatched
    self.handlers = {}  # Message type (or "all") -> callback
    self.keep_alive = False
    self.active = True
    self.initial_hello = True

    # OpenFlow message/packet queue
    # Protected by the packets_cv lock / condition variable
    self.packets = []
    self.packets_cv = Condition()
    self.packet_in_count = 0

    # Settings
    self.max_pkts = max_pkts
    self.switch = switch
    self.passive = not self.switch
    self.host = host
    self.port = port
    self.dbg_state = "init"
    self.logger = logging.getLogger("controller")
    self.filter_packet_in = False # Drop "excessive" packet ins
    self.pkt_in_run = 0 # Count on run of packet ins
    self.pkt_in_filter_limit = 50 # Limit on a run of packet ins
    self.pkt_in_dropped = 0 # Total dropped packet ins
    self.transact_to = 15 # Transact timeout default value; add to config

    # Transaction and message type waiting variables
    #   xid_cv: Condition variable (semaphore) for packet waiters
    #   xid: Transaction ID being waited on
    #   xid_response: Transaction response message
    self.xid_cv = Condition()
    self.xid = None
    self.xid_response = None

    # Bytes received from the switch that do not yet form a whole message
    self.buffered_input = ""

    # Create listen socket
    if self.passive:
        self.logger.info("Create/listen at " + self.host + ":" +
                         str(self.port))
        listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listen_socket.setsockopt(socket.SOL_SOCKET,
                                     socket.SO_REUSEADDR, 1)
            listen_socket.bind((self.host, self.port))
            listen_socket.listen(LISTEN_QUEUE_SIZE)
        except Exception:
            # Do not leak the descriptor when setup fails (e.g. the port
            # is already in use); the caller still sees the error.
            listen_socket.close()
            raise
        self.listen_socket = listen_socket
168
def filter_packet(self, rawmsg, hdr):
    """
    Check if packet should be filtered

    Filtering is currently disabled, so no message is ever dropped.

    @param rawmsg The raw message; currently unused
    @param hdr The message header; currently unused
    @return Boolean, True if packet should be dropped; always False for now
    """
    # XXX The earlier rate-limit logic never actually checked for packet-in
    # messages and sat behind an unconditional return, so it could not run.
    # It has been removed; filter_packet_in, pkt_in_run, pkt_in_filter_limit
    # and pkt_in_dropped are not consulted here.
    # TODO Add check for packet in and rate limit (do not drop expected
    # packet ins).
    return False
190
def _pkt_handle(self, pkt):
    """
    Check for all packet handling conditions

    For every complete OpenFlow message found in the data:
      Parse and verify message
      Check if XID matches something waiting
      Check if keep alive is on and message is an echo request
      Check if any registered handler wants the packet
      Enqueue if none of those conditions is met

    Data that does not yet form a complete message is kept in
    buffered_input and prepended to the next read. Handlers are called
    with self.sync held.

    @param pkt The raw packet (string) which may contain multiple OF msgs
    """
    header_len = 8  # Bytes needed before the header can be parsed

    # snag any left over data from last read()
    pkt = self.buffered_input + pkt
    self.buffered_input = ""

    # Process each of the OF msgs inside the pkt
    offset = 0
    while offset < len(pkt):
        if offset + header_len > len(pkt):
            # Header not complete yet; wait for more data
            break

        # Parse the header to get type
        hdr_version, hdr_type, hdr_length, hdr_xid = cfg_ofp.message.parse_header(pkt[offset:])

        if hdr_length < header_len:
            # A length smaller than the header itself can never be valid
            # and would keep the offset from advancing. The stream cannot
            # be resynchronized, so drop what is buffered.
            self.parse_errors += 1
            self.logger.warning("Bad message length %d in header; "
                                "discarding %d buffered bytes",
                                hdr_length, len(pkt) - offset)
            offset = len(pkt)
            break

        # Use loxi to resolve to ofp of matching version
        ofp = loxi.protocol(hdr_version)

        # Extract the raw message bytes
        if (offset + hdr_length) > len(pkt):
            # Message body not complete yet; wait for more data
            break
        rawmsg = pkt[offset : offset + hdr_length]
        offset += hdr_length

        msg = ofp.message.parse_message(rawmsg)
        if not msg:
            self.parse_errors += 1
            self.logger.warning("Could not parse message")
            continue

        self.logger.debug("Msg in: version %d class %s len %d xid %d",
                          hdr_version, type(msg).__name__, hdr_length, hdr_xid)

        with self.sync:
            # Check if transaction is waiting
            with self.xid_cv:
                if self.xid and hdr_xid == self.xid:
                    self.logger.debug("Matched expected XID " + str(hdr_xid))
                    self.xid_response = (msg, rawmsg)
                    self.xid = None
                    self.xid_cv.notify()
                    continue

            # Check if keep alive is set; if so, respond to echo requests
            if self.keep_alive:
                if hdr_type == ofp.OFPT_ECHO_REQUEST:
                    self.logger.debug("Responding to echo request")
                    rep = ofp.message.echo_reply()
                    rep.xid = hdr_xid
                    # Ignoring additional data
                    self.message_send(rep)
                    continue

            # Generalize to counters for all packet types?
            if msg.type == ofp.OFPT_PACKET_IN:
                self.packet_in_count += 1

            # Log error messages
            if hdr_type == ofp.OFPT_ERROR:
                self._log_ofp_error(ofp, msg, hdr_xid)

            # Now check for message handlers; preference is given to
            # handlers for a specific packet
            handled = False
            if hdr_type in self.handlers:
                handled = self.handlers[hdr_type](self, msg, rawmsg)
            if not handled and ("all" in self.handlers):
                handled = self.handlers["all"](self, msg, rawmsg)

            if not handled: # Not handled, enqueue
                self.logger.debug("Enqueuing pkt type %s (%d)",
                                  ofp.ofp_type_map.get(hdr_type, "unknown"),
                                  hdr_type)
                with self.packets_cv:
                    if len(self.packets) >= self.max_pkts:
                        # Queue full; expire the oldest message
                        self.packets.pop(0)
                        self.packets_expired += 1
                    self.packets.append((msg, rawmsg))
                    self.packets_cv.notify_all()
                self.packets_total += 1
            else:
                self.packets_handled += 1
                self.logger.debug("Message handled by callback")

    # end of 'while offset < len(pkt)'
    #   note that if offset = len(pkt), this
    #   appends a harmless empty string
    self.buffered_input += pkt[offset:]

def _log_ofp_error(self, ofp, msg, hdr_xid):
    """
    Log a received error message using symbolic type and code names

    @param ofp The protocol module matching the message version
    @param msg The parsed error message (err_type and code are read)
    @param hdr_xid The transaction id from the message header
    """
    # (error type constant, code map) attribute names on the ofp module.
    # They are resolved lazily and in order, so a code map is only looked
    # up for the error type that actually matched.
    code_map_names = (
        ("OFPET_HELLO_FAILED", "ofp_hello_failed_code_map"),
        ("OFPET_BAD_REQUEST", "ofp_bad_request_code_map"),
        ("OFPET_BAD_ACTION", "ofp_bad_action_code_map"),
        ("OFPET_FLOW_MOD_FAILED", "ofp_flow_mod_failed_code_map"),
        ("OFPET_PORT_MOD_FAILED", "ofp_port_mod_failed_code_map"),
        ("OFPET_QUEUE_OP_FAILED", "ofp_queue_op_failed_code_map"),
    )

    type_str = "unknown"
    code_str = "unknown"
    if msg.err_type in ofp.ofp_error_type_map:
        type_str = ofp.ofp_error_type_map[msg.err_type]
        for type_name, map_name in code_map_names:
            if msg.err_type == getattr(ofp, type_name):
                code_map = getattr(ofp, map_name)
                if code_map and msg.code in code_map:
                    code_str = code_map[msg.code]
                break
    self.logger.warning("Received error message: xid=%d type=%s (%d) code=%s (%d)",
                        hdr_xid, type_str, msg.err_type, code_str, msg.code)
Dan Talaycod12b6612010-03-07 22:00:46 -0800321
def _socket_ready_handle(self, s):
    """
    Handle an input-ready socket

    The listen socket (passive mode) yields a new switch connection, the
    switch socket yields message data that is passed to _pkt_handle, and
    the waker is simply drained.

    @param s The socket object that is ready
    @returns 0 on success, -1 on error
    """

    if self.passive and s and s == self.listen_socket:
        # Accept first so that a failing accept is handled the same way
        # whether or not a switch is already connected.
        try:
            (sock, addr) = self.listen_socket.accept()
        except Exception:
            self.logger.warning("Error on listen socket accept")
            return -1

        if self.switch_socket:
            self.logger.warning("Ignoring incoming connection; already connected to switch")
            sock.close()
            return 0

        self.logger.info(self.host+":"+str(self.port)+": Incoming connection from "+str(addr))

        with self.connect_cv:
            (self.switch_socket, self.switch_addr) = (sock, addr)
            self.switch_socket.setsockopt(socket.IPPROTO_TCP,
                                          socket.TCP_NODELAY, True)
            if self.initial_hello:
                self.message_send(cfg_ofp.message.hello())
            self.connect_cv.notify() # Notify anyone waiting

        # Prevent further connections
        self.listen_socket.close()
        self.listen_socket = None
    elif s and s == self.switch_socket:
        for idx in range(3): # debug: try a couple of times
            try:
                pkt = self.switch_socket.recv(self.rcv_size)
            except Exception:
                self.logger.warning("Error on switch read")
                return -1

            if not self.active:
                # Controller is shutting down; discard whatever was read
                return 0

            if len(pkt) == 0:
                self.logger.warning("Zero-length switch read, %d" % idx)
            else:
                break

        if len(pkt) == 0: # Still no packet
            self.logger.warning("Zero-length switch read; closing cxn")
            self.logger.info(str(self))
            return -1

        self._pkt_handle(pkt)
    elif s and s == self.waker:
        # Consume the wakeup event so select does not keep reporting it
        self.waker.wait()
    else:
        self.logger.error("Unknown socket ready: " + str(s))
        return -1

    return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800384
def active_connect(self):
    """
    Actively connect to a switch IP addr

    Opens a TCP connection to self.switch on self.port, disables Nagle's
    algorithm on it and records the peer in self.switch_addr.

    @return The connected socket, or None if the connection failed
    """
    soc = None
    try:
        self.logger.info("Trying active connection to %s" % self.switch)
        soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        soc.connect((self.switch, self.port))
        self.logger.info("Connected to " + self.switch + " on " +
                         str(self.port))
        soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
        self.switch_addr = (self.switch, self.port)
        return soc
    except Exception as e:
        self.logger.error("Could not connect to %s at %d:: %s" %
                          (self.switch, self.port, str(e)))
        # Release the descriptor of the half-set-up socket
        if soc is not None:
            soc.close()
        return None
402
def wakeup(self):
    """
    Interrupt the event loop so it re-evaluates its state

    Signals the waker, which is one of the objects the controller thread
    selects on. Intended to be called from a thread other than the
    controller thread.
    """
    waker = self.waker
    waker.notify()
Rich Lane32797542012-12-22 17:46:05 -0800408
def sockets(self):
    """
    Build the list of objects the event loop should select on

    @return The listen socket, switch socket and waker, in that order,
    leaving out any that are currently unset
    """
    candidates = (self.listen_socket, self.switch_socket, self.waker)
    return list(filter(None, candidates))
415
def run(self):
    """
    Activity function for class

    Repeatedly selects (with a one second timeout) on the sockets returned
    by sockets() until self.active becomes false. Ready sockets are passed
    to _socket_ready_handle; a select failure, a socket reported in error
    or a handler error causes a disconnect from the switch.

    When there is a message on the socket, check for handlers; queue the
    packet if no one handles the packet.

    See note for controller describing the limitation of a single
    connection for now.
    """

    self.dbg_state = "running"

    while self.active:
        try:
            sel_in, sel_out, sel_err = \
                select.select(self.sockets(), [], self.sockets(), 1)
        except Exception:
            self.logger.error("Select error, disconnecting", exc_info=True)
            self.disconnect()
            # Nothing was selected; the result lists are either unset or
            # left over from the previous pass, so start a new pass.
            continue

        for s in sel_err:
            self.logger.error("Got socket error on: " + str(s) + ", disconnecting")
            self.disconnect()

        for s in sel_in:
            if self._socket_ready_handle(s) == -1:
                self.disconnect()

    # End of main loop
    self.dbg_state = "closing"
    self.logger.info("Exiting controller thread")
    self.shutdown()
454
def connect(self, timeout=-1):
    """
    Establish the connection to the switch

    In passive mode this waits for the switch socket to be set; in active
    mode it connects to the switch right away, optionally sends the
    initial hello, and marks the controller inactive on failure.

    @param timeout Block for up to timeout seconds. Pass -1 for the default.
    Only used when waiting in passive mode.
    @return Boolean, True if connected
    """

    if self.passive:
        # Wait for the accept path to publish the switch socket
        with self.connect_cv:
            ofutils.timed_wait(self.connect_cv, lambda: self.switch_socket,
                               timeout=timeout)
        return self.switch_socket is not None

    # Do active connection now
    self.logger.info("Attempting to connect to %s on port %s" %
                     (self.switch, str(self.port)))
    soc = self.active_connect()
    if not soc:
        self.logger.error("Could not actively connect to switch %s",
                          self.switch)
        self.active = False
        return self.switch_socket is not None

    self.logger.info("Connected to %s", self.switch)
    self.dbg_state = "running"
    self.switch_socket = soc
    # Let the event loop pick up the new socket
    self.wakeup()
    with self.connect_cv:
        if self.initial_hello:
            self.message_send(cfg_ofp.message.hello())
        self.connect_cv.notify() # Notify anyone waiting

    return self.switch_socket is not None
486
def disconnect(self, timeout=-1):
    """
    If connected to a switch, disconnect.

    Closes the switch socket, forgets the switch address, drops all
    queued messages and any partially received message, and wakes up
    threads waiting on the connection state.

    @param timeout Unused; accepted for interface compatibility
    """
    if self.switch_socket:
        self.switch_socket.close()
        self.switch_socket = None
        self.switch_addr = None
        # Bytes of an incomplete message belong to the old connection;
        # keeping them would corrupt parsing of the next connection.
        self.buffered_input = ""
        with self.packets_cv:
            self.packets = []
        with self.connect_cv:
            self.connect_cv.notify_all()
Ken Chiangadc950f2012-10-05 13:50:03 -0700499
def wait_disconnected(self, timeout=-1):
    """
    Wait until there is no switch socket

    @param timeout Block for up to timeout seconds. Pass -1 for the default.
    @return Boolean, True if disconnected
    """

    def is_disconnected():
        # True once the socket is gone, None (keep waiting) otherwise
        if self.switch_socket:
            return None
        return True

    with self.connect_cv:
        ofutils.timed_wait(self.connect_cv, is_disconnected,
                           timeout=timeout)
    return self.switch_socket is None
511
def kill(self):
    """
    Stop the controller thread and wait for it to finish

    Clears the active flag, wakes the event loop, then joins the thread.
    """
    # Ask the main loop to stop
    self.active = False
    # Interrupt a select that may be in progress
    self.wakeup()
    # Block until the thread has exited
    self.join()
Dan Talayco21c75c72010-02-12 22:59:24 -0800519
def shutdown(self):
    """
    Shutdown the controller closing all sockets

    Marks the controller inactive, shuts down and closes the switch and
    listen sockets if they exist, and wakes up every thread waiting on a
    transaction, on the connection state or in the event loop.

    @todo Might want to synchronize shutdown with self.sync...
    """

    self.active = False
    for attr, label in (("switch_socket", "switch"),
                        ("listen_socket", "listen")):
        sock = getattr(self, attr)
        setattr(self, attr, None)
        if sock is None:
            continue
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            # Best effort; e.g. the socket may not be connected
            self.logger.info("Ignoring " + label + " soc shutdown error")
        finally:
            # shutdown() alone does not release the descriptor
            sock.close()

    # Wakeup condition variables on which controller may be wait
    with self.xid_cv:
        self.xid_cv.notify_all()

    with self.connect_cv:
        self.connect_cv.notify_all()

    self.wakeup()
    self.dbg_state = "down"
549
def register(self, msg_type, handler):
    """
    Register a callback to receive a specific message type.

    Only one handler may be registered for a given message type; a new
    registration replaces the previous one.

    WARNING: A lock is held during the handler call back, so
    the handler should not make any blocking calls

    @param msg_type The type of message to receive. The special type,
    the string "all", sends every message not already handled by a
    type-specific handler to the handler.
    @param handler The function to call when a message of the given
    type is received. Pass None (or any false value) to remove the
    handler registered for msg_type, if any.
    """
    # Should check type is valid
    if not handler:
        # Never store a false handler: the dispatcher calls whatever is
        # registered, so a stored None would fail when a message arrives.
        self.handlers.pop(msg_type, None)
        return
    self.handlers[msg_type] = handler
Dan Talayco34089522010-02-07 23:07:41 -0800570
def poll(self, exp_msg=None, timeout=-1):
    """
    Wait for the next OF message received from the switch.

    Only messages sitting in the receive queue are considered. With
    exp_msg set, the first queued message of that type is taken and other
    messages are left in the queue.

    @param exp_msg If set, return only when this type of message
    is received (unless timeout occurs).

    @param timeout Maximum number of seconds to wait for the message.
    Pass -1 for the default timeout.

    @retval A pair (msg, pkt) where msg is a message object and pkt
    the string representing the packet as received from the socket.
    This allows additional parsing by the receiver if necessary.

    The data members in the message are in host endian order.
    If no message is obtained, (None, None) is returned
    """

    if exp_msg is None:
        exp_msg_str = "unspecified"
        self.logger.debug("Poll for any OF message")
    else:
        exp_msg_str = cfg_ofp.ofp_type_map.get(exp_msg, "unknown (%d)" %
                                               exp_msg)
        self.logger.debug("Poll for %s", exp_msg_str)

    def grab():
        # Take the wanted packet from the queue; None means keep waiting
        queue = self.packets
        if queue and exp_msg is None:
            self.logger.debug("Looking for any packet")
            return queue.pop(0)
        if queue:
            self.logger.debug("Looking for %s", exp_msg_str)
            for idx, entry in enumerate(queue):
                candidate = entry[0]
                msg_str = cfg_ofp.ofp_type_map.get(candidate.type, "unknown (%d)" % candidate.type)
                self.logger.debug("Checking packets[%d] %s) against %s", idx, msg_str, exp_msg_str)
                if candidate.type == exp_msg:
                    return queue.pop(idx)
        # Not found
        self.logger.debug("Packet not in queue")
        return None

    with self.packets_cv:
        ret = ofutils.timed_wait(self.packets_cv, grab, timeout=timeout)

    if ret is None:
        return (None, None)

    (msg, pkt) = ret
    self.logger.debug("Got message %s" % str(msg))
    return (msg, pkt)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800628
def transact(self, msg, timeout=-1):
    """
    Run a message transaction with the switch

    Send the message in msg and wait for a reply with a matching
    transaction id.  Transactions have the highest priority in
    received message handling. Only one transaction can be in progress
    at a time.

    @param msg The message object to send; must not be a string. If its
    xid is None a new one is generated and stored in the message.
    @param timeout The timeout in seconds; if -1 use default.
    @return A pair (response message, raw response), or (None, None) if
    another transaction is in progress or no response was received
    """

    if msg.xid is None:
        msg.xid = ofutils.gen_xid()

    self.logger.debug("Running transaction %d" % msg.xid)

    with self.xid_cv:
        if self.xid:
            self.logger.error("Can only run one transaction at a time")
            return (None, None)

        self.xid = msg.xid
        self.xid_response = None
        try:
            self.message_send(msg)

            self.logger.debug("Waiting for transaction %d" % msg.xid)
            ofutils.timed_wait(self.xid_cv, lambda: self.xid_response, timeout=timeout)

            if self.xid_response:
                (resp, pkt) = self.xid_response
                self.xid_response = None
            else:
                (resp, pkt) = (None, None)
        finally:
            # Free the transaction slot even when the send failed or the
            # wait timed out; otherwise every later transaction would be
            # rejected as "one at a time". Only clear our own xid.
            if self.xid == msg.xid:
                self.xid = None

    if resp is None:
        self.logger.warning("No response for xid " + str(msg.xid))
    return (resp, pkt)
Dan Talayco34089522010-02-07 23:07:41 -0800667
def message_send(self, msg):
    """
    Send the message to the switch

    @param msg The OpenFlow message object to be forwarded to the
    switch. It must provide xid and version attributes and a pack()
    method; raw strings are not accepted. If msg.xid is None a new
    xid is generated and stored on msg before it is packed.
    @returns 0, kept only for backwards compatibility.

    Raises Exception if there is no switch socket, and AssertionError
    if sendall() returns anything other than None.
    """

    if not self.switch_socket:
        raise Exception("no socket")

    if msg.xid is None:
        msg.xid = ofutils.gen_xid()

    outpkt = msg.pack()

    self.logger.debug("Msg out: version %d class %s len %d xid %d",
                      msg.version, type(msg).__name__, len(outpkt), msg.xid)

    # The write is done while holding tx_lock, presumably so that
    # concurrent senders cannot interleave their bytes on the socket.
    with self.tx_lock:
        # socket.sendall() returns None on success; any other value is
        # treated as a failed send.
        if self.switch_socket.sendall(outpkt) is not None:
            raise AssertionError("failed to send message to switch")

    return 0  # for backwards compatibility
Dan Talayco21c75c72010-02-12 22:59:24 -0800693
def clear_queue(self):
    """
    Clear the input queue and report the number of messages
    that were in it

    @returns The number of entries discarded from self.packets
    """
    # Count and reset under the same lock so the reported number is
    # exactly the number of entries that were dropped.
    with self.packets_cv:
        enqueued_pkt_count = len(self.packets)
        self.packets = []
    return enqueued_pkt_count
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700703
def __str__(self):
    """
    Return a multi-line text summary of the controller: a
    "Controller:" header followed by one "label value" line for the
    state, addresses, packet counters and configuration fields.
    """
    # dbg_state is concatenated as-is (it is not passed through str()).
    pieces = ["Controller:\n", " state " + self.dbg_state + "\n"]

    labelled_values = (
        (" switch_addr ", self.switch_addr),
        (" pending pkts ", len(self.packets)),
        (" total pkts ", self.packets_total),
        (" expired pkts ", self.packets_expired),
        (" handled pkts ", self.packets_handled),
        (" poll discards ", self.poll_discards),
        (" parse errors ", self.parse_errors),
        (" sock errrors ", self.socket_errors),
        (" max pkts ", self.max_pkts),
        (" target switch ", self.switch),
        (" host ", self.host),
        (" port ", self.port),
        (" keep_alive ", self.keep_alive),
        (" pkt_in_run ", self.pkt_in_run),
        (" pkt_in_dropped ", self.pkt_in_dropped),
    )
    for label, value in labelled_values:
        pieces.append(label + str(value) + "\n")

    return "".join(pieces)
723
def show(self):
    """
    Print the str() summary of this controller to standard output
    """
    # Parenthesised single-argument form: prints the same text under
    # the Python 2 print statement and the Python 3 print function.
    print(str(self))
726
def sample_handler(controller, msg, pkt):
    """
    Sample message handler

    Prototype showing the signature of functions that are registered
    with the controller class to receive packets. This sample does
    nothing with its arguments.

    @param controller The controller calling the handler
    @param msg The parsed message object
    @param pkt The raw packet that was received on the socket. This is
    in case the packet contains extra unparsed data.
    @returns Boolean value indicating if the packet was handled. If
    not handled, the packet is placed in the queue for pollers to
    receive. This sample itself returns None.
    """
    return None