blob: 0272b7077e4537efedb52e8511ceaa7c7ca51a7e [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
4Controller class
5
6Provide the interface to the control channel to the switch under test.
7
8Class inherits from thread so as to run in background allowing
9asynchronous callbacks (if needed, not required). Also supports
10polling.
11
12The controller thread maintains a queue. Incoming messages that
13are not handled by a callback function are placed in this queue for
14poll calls.
15
16Callbacks and polling support specifying the message type
17
18@todo Support transaction semantics via xid
Dan Talayco1b3f6902010-02-15 14:14:19 -080019@todo Support select and listen on an administrative socket (or
20use a timeout to support clean shutdown).
21
22Currently only one connection is accepted during the life of
23the controller. There seems
24to be no clean way to interrupt an accept call. Using select that also listens
25on an administrative socket and can shut down the socket might work.
26
Dan Talayco34089522010-02-07 23:07:41 -080027"""
28
Rich Lane720eaf22013-08-09 18:00:45 -070029import sys
Dan Talayco34089522010-02-07 23:07:41 -080030import os
31import socket
32import time
Rich Lanecd97d3d2013-01-07 18:50:06 -080033import struct
34import select
35import logging
Dan Talayco34089522010-02-07 23:07:41 -080036from threading import Thread
37from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080038from threading import Condition
Dan Talayco48370102010-03-03 15:17:33 -080039
Wilson Ngc11a9182013-10-28 16:02:03 -070040import ofutils
41import loxi
42
43# Configured openflow version
44import ofp as cfg_ofp
Dan Talaycof8de5182012-04-12 22:38:41 -070045
# Lookup table indexed by character code: the character itself when its
# repr() is a single printable character, '.' otherwise.  Codes above 127
# always map to '.', which keeps the table identical on Python 2 and 3.
FILTER = ''.join([chr(x) if x < 128 and len(repr(chr(x))) == 3 else '.'
                  for x in range(256)])

def hex_dump_buffer(src, length=16):
    """
    Convert src to a hex dump string and return the string

    Each output line holds the offset of its first byte (4 hex digits),
    the hex values of up to length bytes, and the same bytes as text with
    every character that FILTER does not pass through shown as '.'.

    @param src The source buffer (byte string, bytearray or text string)
    @param length The number of bytes shown in each line
    @returns A string showing the hex dump; it always starts with a newline
    """
    # Normalize to a list of integer codes so that byte strings behave the
    # same whether iterating them yields characters or integers.
    if isinstance(src, (bytes, bytearray)):
        codes = list(bytearray(src))
    else:
        codes = [ord(ch) for ch in src]

    result = ["\n"]
    for offset in range(0, len(codes), length):
        chunk = codes[offset:offset + length]
        hex_part = ' '.join(["%02x" % code for code in chunk])
        printable = ''.join([FILTER[code] if code <= 127 else '.'
                             for code in chunk])
        # Pad the hex column to length*3 so the text column lines up
        result.append("%04x %-*s %s\n" % (offset, length * 3, hex_part,
                                          printable))
    return ''.join(result)
64
##@todo Find a better home for these identifiers (controller)
# Default value of Controller.rcv_size, the byte count passed to recv()
# when reading from the switch socket
RCV_SIZE_DEFAULT = 32768
# Backlog passed to listen() on the controller's listening socket
LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080068
69class Controller(Thread):
70 """
71 Class abstracting the control interface to the switch.
72
    For receiving messages, two mechanisms are provided. First,
    query the interface with poll. Second, register a function to be
    called for a given message type. The callback is passed the
    controller, the parsed message object and the raw message bytes.
77
78 One of the main purposes of this object is to translate between network
79 and host byte order. 'Above' this object, things should be in host
80 byte order.
Dan Talayco21c75c72010-02-12 22:59:24 -080081
82 @todo Consider using SocketServer for listening socket
83 @todo Test transaction code
84
85 @var rcv_size The receive size to use for receive calls
86 @var max_pkts The max size of the receive queue
87 @var keep_alive If true, listen for echo requests and respond w/
88 echo replies
Dan Talayco710438c2010-02-18 15:16:07 -080089 @var initial_hello If true, will send a hello message immediately
90 upon connecting to the switch
Dan Talayco69ca4d62012-11-15 11:50:22 -080091 @var switch If not None, do an active connection to the switch
Dan Talayco21c75c72010-02-12 22:59:24 -080092 @var host The host to use for connect
93 @var port The port to connect on
94 @var packets_total Total number of packets received
95 @var packets_expired Number of packets popped from queue as queue full
96 @var packets_handled Number of packets handled by something
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080097 @var dbg_state Debug indication of state
Dan Talayco34089522010-02-07 23:07:41 -080098 """
99
Rich Lane4d1f3eb2013-10-03 13:45:57 -0700100 def __init__(self, switch=None, host='127.0.0.1', port=6653, max_pkts=1024):
Dan Talayco21c75c72010-02-12 22:59:24 -0800101 Thread.__init__(self)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800102 # Socket related
Dan Talayco21c75c72010-02-12 22:59:24 -0800103 self.rcv_size = RCV_SIZE_DEFAULT
Dan Talayco1b3f6902010-02-15 14:14:19 -0800104 self.listen_socket = None
105 self.switch_socket = None
106 self.switch_addr = None
Dan Talayco710438c2010-02-18 15:16:07 -0800107 self.connect_cv = Condition()
Dan Talaycoe226eb12010-02-18 23:06:30 -0800108 self.message_cv = Condition()
Rich Lanec9d3edd2013-10-09 00:21:01 -0700109 self.tx_lock = Lock()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800110
Rich Lane4dfd5e12012-12-22 19:48:01 -0800111 # Used to wake up the event loop from another thread
Rich Lanecd97d3d2013-01-07 18:50:06 -0800112 self.waker = ofutils.EventDescriptor()
Rich Lane32797542012-12-22 17:46:05 -0800113
Dan Talayco1b3f6902010-02-15 14:14:19 -0800114 # Counters
Dan Talayco21c75c72010-02-12 22:59:24 -0800115 self.socket_errors = 0
116 self.parse_errors = 0
Dan Talayco21c75c72010-02-12 22:59:24 -0800117 self.packets_total = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -0800118 self.packets_expired = 0
119 self.packets_handled = 0
Dan Talaycoe226eb12010-02-18 23:06:30 -0800120 self.poll_discards = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -0800121
122 # State
Dan Talayco21c75c72010-02-12 22:59:24 -0800123 self.sync = Lock()
124 self.handlers = {}
125 self.keep_alive = False
Dan Talayco710438c2010-02-18 15:16:07 -0800126 self.active = True
127 self.initial_hello = True
Dan Talayco1b3f6902010-02-15 14:14:19 -0800128
Rich Lanec4f071b2012-07-11 17:25:57 -0700129 # OpenFlow message/packet queue
130 # Protected by the packets_cv lock / condition variable
131 self.packets = []
132 self.packets_cv = Condition()
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700133 self.packet_in_count = 0
Rich Lanec4f071b2012-07-11 17:25:57 -0700134
Dan Talayco1b3f6902010-02-15 14:14:19 -0800135 # Settings
136 self.max_pkts = max_pkts
Dan Talayco69ca4d62012-11-15 11:50:22 -0800137 self.switch = switch
138 self.passive = not self.switch
Dan Talayco48370102010-03-03 15:17:33 -0800139 self.host = host
140 self.port = port
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800141 self.dbg_state = "init"
Dan Talayco48370102010-03-03 15:17:33 -0800142 self.logger = logging.getLogger("controller")
Dan Talaycof8de5182012-04-12 22:38:41 -0700143 self.filter_packet_in = False # Drop "excessive" packet ins
144 self.pkt_in_run = 0 # Count on run of packet ins
145 self.pkt_in_filter_limit = 50 # Count on run of packet ins
146 self.pkt_in_dropped = 0 # Total dropped packet ins
147 self.transact_to = 15 # Transact timeout default value; add to config
Dan Talayco1b3f6902010-02-15 14:14:19 -0800148
Dan Talaycoe226eb12010-02-18 23:06:30 -0800149 # Transaction and message type waiting variables
150 # xid_cv: Condition variable (semaphore) for packet waiters
Dan Talayco21c75c72010-02-12 22:59:24 -0800151 # xid: Transaction ID being waited on
152 # xid_response: Transaction response message
153 self.xid_cv = Condition()
154 self.xid = None
155 self.xid_response = None
156
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800157 self.buffered_input = ""
Dan Talaycoe226eb12010-02-18 23:06:30 -0800158
Rich Lane207502e2012-12-31 14:29:12 -0800159 # Create listen socket
160 if self.passive:
161 self.logger.info("Create/listen at " + self.host + ":" +
162 str(self.port))
163 self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
164 self.listen_socket.setsockopt(socket.SOL_SOCKET,
165 socket.SO_REUSEADDR, 1)
166 self.listen_socket.bind((self.host, self.port))
167 self.listen_socket.listen(LISTEN_QUEUE_SIZE)
168
Dan Talaycof8de5182012-04-12 22:38:41 -0700169 def filter_packet(self, rawmsg, hdr):
170 """
171 Check if packet should be filtered
172
173 Currently filters packet in messages
174 @return Boolean, True if packet should be dropped
175 """
Rich Lane1622bbb2013-03-11 17:11:53 -0700176 # XXX didn't actually check for packet-in...
177 return False
Dan Talaycof8de5182012-04-12 22:38:41 -0700178 # Add check for packet in and rate limit
179 if self.filter_packet_in:
Rich Lanec4f071b2012-07-11 17:25:57 -0700180 # If we were dropping packets, report number dropped
181 # TODO dont drop expected packet ins
182 if self.pkt_in_run > self.pkt_in_filter_limit:
183 self.logger.debug("Dropped %d packet ins (%d total)"
184 % ((self.pkt_in_run -
185 self.pkt_in_filter_limit),
186 self.pkt_in_dropped))
187 self.pkt_in_run = 0
Dan Talaycof8de5182012-04-12 22:38:41 -0700188
189 return False
190
Dan Talaycod12b6612010-03-07 22:00:46 -0800191 def _pkt_handle(self, pkt):
192 """
193 Check for all packet handling conditions
194
195 Parse and verify message
196 Check if XID matches something waiting
197 Check if message is being expected for a poll operation
198 Check if keep alive is on and message is an echo request
199 Check if any registered handler wants the packet
200 Enqueue if none of those conditions is met
201
202 an echo request in case keep_alive is true, followed by
203 registered message handlers.
Glen Gibb6d467062010-07-08 16:15:08 -0700204 @param pkt The raw packet (string) which may contain multiple OF msgs
Dan Talaycod12b6612010-03-07 22:00:46 -0800205 """
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800206
207 # snag any left over data from last read()
208 pkt = self.buffered_input + pkt
209 self.buffered_input = ""
210
Glen Gibb6d467062010-07-08 16:15:08 -0700211 # Process each of the OF msgs inside the pkt
212 offset = 0
213 while offset < len(pkt):
Rich Lane1622bbb2013-03-11 17:11:53 -0700214 if offset + 8 > len(pkt):
215 break
216
Glen Gibb6d467062010-07-08 16:15:08 -0700217 # Parse the header to get type
Wilson Ngc11a9182013-10-28 16:02:03 -0700218 hdr_version, hdr_type, hdr_length, hdr_xid = cfg_ofp.message.parse_header(pkt[offset:])
219
220 # Use loxi to resolve to ofp of matching version
221 ofp = loxi.protocol(hdr_version)
Dan Talaycod12b6612010-03-07 22:00:46 -0800222
Glen Gibb6d467062010-07-08 16:15:08 -0700223 # Extract the raw message bytes
Rich Lane1622bbb2013-03-11 17:11:53 -0700224 if (offset + hdr_length) > len(pkt):
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800225 break
Rich Lane1622bbb2013-03-11 17:11:53 -0700226 rawmsg = pkt[offset : offset + hdr_length]
227 offset += hdr_length
Dan Talaycof8de5182012-04-12 22:38:41 -0700228
Rich Lane1622bbb2013-03-11 17:11:53 -0700229 #if self.filter_packet(rawmsg, hdr):
230 # continue
Dan Talaycof8de5182012-04-12 22:38:41 -0700231
Wilson Ngc11a9182013-10-28 16:02:03 -0700232 # Check that supported version on switch is as least as recent as
233 # the configured Openflow version in oftests
Wilson Ngc11a9182013-10-28 16:02:03 -0700234 if hdr_version < cfg_ofp.OFP_VERSION:
Rich Lanec44b6242013-01-10 12:23:54 -0800235 self.logger.error("Switch only supports up to OpenFlow version %d (OFTest version is %d)",
Wilson Ngc11a9182013-10-28 16:02:03 -0700236 hdr_version, cfg_ofp.OFP_VERSION)
Rich Lanec44b6242013-01-10 12:23:54 -0800237 print "Switch only supports up to OpenFlow version %d (OFTest version is %d)" % \
Wilson Ngc11a9182013-10-28 16:02:03 -0700238 (hdr_version, cfg_ofp.OFP_VERSION)
Ken Chiangadc950f2012-10-05 13:50:03 -0700239 self.disconnect()
Dan Talaycof8de5182012-04-12 22:38:41 -0700240 return
Dan Talaycod12b6612010-03-07 22:00:46 -0800241
Rich Lanef6883512013-03-11 17:00:09 -0700242 msg = ofp.message.parse_message(rawmsg)
Glen Gibb6d467062010-07-08 16:15:08 -0700243 if not msg:
244 self.parse_errors += 1
245 self.logger.warn("Could not parse message")
246 continue
247
Rich Lane1fd43e32014-01-06 15:22:50 -0800248 self.logger.debug("Msg in: version %d class %s len %d xid %d",
249 hdr_version, type(msg).__name__, hdr_length, hdr_xid)
250
Rich Lanec4f071b2012-07-11 17:25:57 -0700251 with self.sync:
252 # Check if transaction is waiting
253 with self.xid_cv:
Rich Lane1622bbb2013-03-11 17:11:53 -0700254 if self.xid and hdr_xid == self.xid:
255 self.logger.debug("Matched expected XID " + str(hdr_xid))
Rich Lanec4f071b2012-07-11 17:25:57 -0700256 self.xid_response = (msg, rawmsg)
257 self.xid = None
258 self.xid_cv.notify()
259 continue
Glen Gibb6d467062010-07-08 16:15:08 -0700260
Rich Lanec4f071b2012-07-11 17:25:57 -0700261 # Check if keep alive is set; if so, respond to echo requests
262 if self.keep_alive:
Rich Lane1622bbb2013-03-11 17:11:53 -0700263 if hdr_type == ofp.OFPT_ECHO_REQUEST:
Rich Lanec4f071b2012-07-11 17:25:57 -0700264 self.logger.debug("Responding to echo request")
Rich Lane78ef8b92013-01-10 12:19:23 -0800265 rep = ofp.message.echo_reply()
Rich Lane1622bbb2013-03-11 17:11:53 -0700266 rep.xid = hdr_xid
Rich Lanec4f071b2012-07-11 17:25:57 -0700267 # Ignoring additional data
Rich Lane1fd43e32014-01-06 15:22:50 -0800268 self.message_send(rep)
Rich Lanec4f071b2012-07-11 17:25:57 -0700269 continue
Glen Gibb6d467062010-07-08 16:15:08 -0700270
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700271 # Generalize to counters for all packet types?
272 if msg.type == ofp.OFPT_PACKET_IN:
273 self.packet_in_count += 1
274
Rich Lane5d63b9c2013-01-11 14:12:37 -0800275 # Log error messages
Rich Lane1622bbb2013-03-11 17:11:53 -0700276 if hdr_type == ofp.OFPT_ERROR:
Rich Laneb73808c2013-03-11 15:22:23 -0700277 if msg.err_type in ofp.ofp_error_type_map:
278 type_str = ofp.ofp_error_type_map[msg.err_type]
279 if msg.err_type == ofp.OFPET_HELLO_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800280 code_map = ofp.ofp_hello_failed_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700281 elif msg.err_type == ofp.OFPET_BAD_REQUEST:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800282 code_map = ofp.ofp_bad_request_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700283 elif msg.err_type == ofp.OFPET_BAD_ACTION:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800284 code_map = ofp.ofp_bad_action_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700285 elif msg.err_type == ofp.OFPET_FLOW_MOD_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800286 code_map = ofp.ofp_flow_mod_failed_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700287 elif msg.err_type == ofp.OFPET_PORT_MOD_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800288 code_map = ofp.ofp_port_mod_failed_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700289 elif msg.err_type == ofp.OFPET_QUEUE_OP_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800290 code_map = ofp.ofp_queue_op_failed_code_map
291 else:
292 code_map = None
293
294 if code_map and msg.code in code_map:
295 code_str = code_map[msg.code]
296 else:
297 code_str = "unknown"
298 else:
299 type_str = "unknown"
Rich Lane1879dc72013-03-11 22:08:51 -0700300 code_str = "unknown"
Rich Lane5d63b9c2013-01-11 14:12:37 -0800301 self.logger.warn("Received error message: xid=%d type=%s (%d) code=%s (%d)",
Rich Lane1622bbb2013-03-11 17:11:53 -0700302 hdr_xid, type_str, msg.err_type, code_str, msg.code)
Rich Lane5d63b9c2013-01-11 14:12:37 -0800303
Rich Lanec4f071b2012-07-11 17:25:57 -0700304 # Now check for message handlers; preference is given to
305 # handlers for a specific packet
306 handled = False
Rich Lane1622bbb2013-03-11 17:11:53 -0700307 if hdr_type in self.handlers.keys():
308 handled = self.handlers[hdr_type](self, msg, rawmsg)
Rich Lanec4f071b2012-07-11 17:25:57 -0700309 if not handled and ("all" in self.handlers.keys()):
310 handled = self.handlers["all"](self, msg, rawmsg)
Glen Gibb6d467062010-07-08 16:15:08 -0700311
Rich Lanec4f071b2012-07-11 17:25:57 -0700312 if not handled: # Not handled, enqueue
Rich Lane1879dc72013-03-11 22:08:51 -0700313 self.logger.debug("Enqueuing pkt type %s (%d)",
314 ofp.ofp_type_map.get(hdr_type, "unknown"),
315 hdr_type)
Rich Lanec4f071b2012-07-11 17:25:57 -0700316 with self.packets_cv:
317 if len(self.packets) >= self.max_pkts:
318 self.packets.pop(0)
319 self.packets_expired += 1
320 self.packets.append((msg, rawmsg))
321 self.packets_cv.notify_all()
322 self.packets_total += 1
323 else:
324 self.packets_handled += 1
325 self.logger.debug("Message handled by callback")
Glen Gibb6d467062010-07-08 16:15:08 -0700326
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800327 # end of 'while offset < len(pkt)'
328 # note that if offset = len(pkt), this is
329 # appends a harmless empty string
330 self.buffered_input += pkt[offset:]
Dan Talaycod12b6612010-03-07 22:00:46 -0800331
Dan Talayco710438c2010-02-18 15:16:07 -0800332 def _socket_ready_handle(self, s):
333 """
334 Handle an input-ready socket
Dan Talaycof8de5182012-04-12 22:38:41 -0700335
Dan Talayco710438c2010-02-18 15:16:07 -0800336 @param s The socket object that is ready
Dan Talaycof8de5182012-04-12 22:38:41 -0700337 @returns 0 on success, -1 on error
Dan Talayco710438c2010-02-18 15:16:07 -0800338 """
339
Dan Talayco69ca4d62012-11-15 11:50:22 -0800340 if self.passive and s and s == self.listen_socket:
Dan Talayco710438c2010-02-18 15:16:07 -0800341 if self.switch_socket:
Rich Lanee1da7ea2012-07-26 15:58:45 -0700342 self.logger.warning("Ignoring incoming connection; already connected to switch")
Rich Laneb4f8ecb2012-09-25 09:36:26 -0700343 (sock, addr) = self.listen_socket.accept()
344 sock.close()
Rich Lanee1da7ea2012-07-26 15:58:45 -0700345 return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800346
Ken Chiange875baf2012-10-09 15:24:40 -0700347 try:
348 (sock, addr) = self.listen_socket.accept()
349 except:
350 self.logger.warning("Error on listen socket accept")
351 return -1
Ken Chiang77173992012-10-30 15:44:39 -0700352 self.logger.info(self.host+":"+str(self.port)+": Incoming connection from "+str(addr))
Rich Lanee1da7ea2012-07-26 15:58:45 -0700353
Rich Laneee3586c2012-07-11 17:26:02 -0700354 with self.connect_cv:
Rich Lanee1da7ea2012-07-26 15:58:45 -0700355 (self.switch_socket, self.switch_addr) = (sock, addr)
Rich Lane82ef1832012-12-22 17:04:35 -0800356 self.switch_socket.setsockopt(socket.IPPROTO_TCP,
357 socket.TCP_NODELAY, True)
Rich Lane1a8d5aa2012-10-08 15:40:03 -0700358 if self.initial_hello:
Wilson Ngc11a9182013-10-28 16:02:03 -0700359 self.message_send(cfg_ofp.message.hello())
Rich Lanee1da7ea2012-07-26 15:58:45 -0700360 self.connect_cv.notify() # Notify anyone waiting
Rich Laned929b8d2013-04-15 15:59:14 -0700361
362 # Prevent further connections
363 self.listen_socket.close()
364 self.listen_socket = None
Dan Talaycof8de5182012-04-12 22:38:41 -0700365 elif s and s == self.switch_socket:
366 for idx in range(3): # debug: try a couple of times
367 try:
368 pkt = self.switch_socket.recv(self.rcv_size)
369 except:
370 self.logger.warning("Error on switch read")
371 return -1
372
373 if not self.active:
374 return 0
375
376 if len(pkt) == 0:
377 self.logger.warning("Zero-length switch read, %d" % idx)
378 else:
379 break
Dan Talayco710438c2010-02-18 15:16:07 -0800380
Dan Talaycof8de5182012-04-12 22:38:41 -0700381 if len(pkt) == 0: # Still no packet
Dan Talayco0fc08bd2012-04-09 16:56:18 -0700382 self.logger.warning("Zero-length switch read; closing cxn")
Dan Talaycof8de5182012-04-12 22:38:41 -0700383 self.logger.info(str(self))
384 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800385
Dan Talaycod12b6612010-03-07 22:00:46 -0800386 self._pkt_handle(pkt)
Rich Lane4dfd5e12012-12-22 19:48:01 -0800387 elif s and s == self.waker:
388 self.waker.wait()
Dan Talayco710438c2010-02-18 15:16:07 -0800389 else:
Dan Talayco48370102010-03-03 15:17:33 -0800390 self.logger.error("Unknown socket ready: " + str(s))
Dan Talaycof8de5182012-04-12 22:38:41 -0700391 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800392
Dan Talaycof8de5182012-04-12 22:38:41 -0700393 return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800394
Dan Talayco69ca4d62012-11-15 11:50:22 -0800395 def active_connect(self):
396 """
397 Actively connect to a switch IP addr
398 """
399 try:
400 self.logger.info("Trying active connection to %s" % self.switch)
401 soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
402 soc.connect((self.switch, self.port))
403 self.logger.info("Connected to " + self.switch + " on " +
404 str(self.port))
Rich Lane82ef1832012-12-22 17:04:35 -0800405 soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800406 self.switch_addr = (self.switch, self.port)
407 return soc
408 except (StandardError, socket.error), e:
409 self.logger.error("Could not connect to %s at %d:: %s" %
410 (self.switch, self.port, str(e)))
411 return None
412
Rich Lane32797542012-12-22 17:46:05 -0800413 def wakeup(self):
414 """
415 Wake up the event loop, presumably from another thread.
416 """
Rich Lane4dfd5e12012-12-22 19:48:01 -0800417 self.waker.notify()
Rich Lane32797542012-12-22 17:46:05 -0800418
419 def sockets(self):
420 """
421 Return list of sockets to select on.
422 """
Rich Lane4dfd5e12012-12-22 19:48:01 -0800423 socs = [self.listen_socket, self.switch_socket, self.waker]
Rich Lane32797542012-12-22 17:46:05 -0800424 return [x for x in socs if x]
425
Dan Talayco1b3f6902010-02-15 14:14:19 -0800426 def run(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800427 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800428 Activity function for class
Dan Talayco21c75c72010-02-12 22:59:24 -0800429
Dan Talayco1b3f6902010-02-15 14:14:19 -0800430 Assumes connection to switch already exists. Listens on
431 switch_socket for messages until an error (or zero len pkt)
432 occurs.
Dan Talayco21c75c72010-02-12 22:59:24 -0800433
Dan Talayco1b3f6902010-02-15 14:14:19 -0800434 When there is a message on the socket, check for handlers; queue the
435 packet if no one handles the packet.
436
437 See note for controller describing the limitation of a single
438 connection for now.
439 """
440
Rich Lane207502e2012-12-31 14:29:12 -0800441 self.dbg_state = "running"
Ken Chiangadc950f2012-10-05 13:50:03 -0700442
Dan Talayco710438c2010-02-18 15:16:07 -0800443 while self.active:
Dan Talayco710438c2010-02-18 15:16:07 -0800444 try:
445 sel_in, sel_out, sel_err = \
Rich Lane32797542012-12-22 17:46:05 -0800446 select.select(self.sockets(), [], self.sockets(), 1)
Dan Talayco710438c2010-02-18 15:16:07 -0800447 except:
448 print sys.exc_info()
Ken Chiangadc950f2012-10-05 13:50:03 -0700449 self.logger.error("Select error, disconnecting")
450 self.disconnect()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800451
Dan Talayco710438c2010-02-18 15:16:07 -0800452 for s in sel_err:
Ken Chiangadc950f2012-10-05 13:50:03 -0700453 self.logger.error("Got socket error on: " + str(s) + ", disconnecting")
454 self.disconnect()
Dan Talaycof8de5182012-04-12 22:38:41 -0700455
456 for s in sel_in:
457 if self._socket_ready_handle(s) == -1:
Ken Chiangadc950f2012-10-05 13:50:03 -0700458 self.disconnect()
Dan Talayco710438c2010-02-18 15:16:07 -0800459
Dan Talayco710438c2010-02-18 15:16:07 -0800460 # End of main loop
461 self.dbg_state = "closing"
Dan Talayco48370102010-03-03 15:17:33 -0800462 self.logger.info("Exiting controller thread")
Dan Talayco710438c2010-02-18 15:16:07 -0800463 self.shutdown()
464
Rich Lane8806bc42012-07-26 19:18:37 -0700465 def connect(self, timeout=-1):
Dan Talayco710438c2010-02-18 15:16:07 -0800466 """
467 Connect to the switch
468
Rich Lane8806bc42012-07-26 19:18:37 -0700469 @param timeout Block for up to timeout seconds. Pass -1 for the default.
Dan Talayco710438c2010-02-18 15:16:07 -0800470 @return Boolean, True if connected
471 """
472
Dan Talayco69ca4d62012-11-15 11:50:22 -0800473 if not self.passive: # Do active connection now
474 self.logger.info("Attempting to connect to %s on port %s" %
475 (self.switch, str(self.port)))
476 soc = self.active_connect()
477 if soc:
478 self.logger.info("Connected to %s", self.switch)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800479 self.dbg_state = "running"
480 self.switch_socket = soc
Rich Lane32797542012-12-22 17:46:05 -0800481 self.wakeup()
Dan Talayco69ca4d62012-11-15 11:50:22 -0800482 with self.connect_cv:
483 if self.initial_hello:
Wilson Ngc11a9182013-10-28 16:02:03 -0700484 self.message_send(cfg_ofp.message.hello())
Dan Talayco69ca4d62012-11-15 11:50:22 -0800485 self.connect_cv.notify() # Notify anyone waiting
486 else:
487 self.logger.error("Could not actively connect to switch %s",
488 self.switch)
489 self.active = False
490 else:
491 with self.connect_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800492 ofutils.timed_wait(self.connect_cv, lambda: self.switch_socket,
493 timeout=timeout)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800494
Dan Talayco710438c2010-02-18 15:16:07 -0800495 return self.switch_socket is not None
496
Ken Chiangadc950f2012-10-05 13:50:03 -0700497 def disconnect(self, timeout=-1):
498 """
499 If connected to a switch, disconnect.
500 """
501 if self.switch_socket:
Ken Chiangadc950f2012-10-05 13:50:03 -0700502 self.switch_socket.close()
503 self.switch_socket = None
504 self.switch_addr = None
Ken Chiang74be4722012-12-21 13:07:03 -0800505 with self.packets_cv:
506 self.packets = []
Ken Chiange875baf2012-10-09 15:24:40 -0700507 with self.connect_cv:
508 self.connect_cv.notifyAll()
Ken Chiangadc950f2012-10-05 13:50:03 -0700509
510 def wait_disconnected(self, timeout=-1):
511 """
512 @param timeout Block for up to timeout seconds. Pass -1 for the default.
513 @return Boolean, True if disconnected
514 """
515
Ken Chiange875baf2012-10-09 15:24:40 -0700516 with self.connect_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800517 ofutils.timed_wait(self.connect_cv,
518 lambda: True if not self.switch_socket else None,
519 timeout=timeout)
Ken Chiangadc950f2012-10-05 13:50:03 -0700520 return self.switch_socket is None
521
Dan Talayco710438c2010-02-18 15:16:07 -0800522 def kill(self):
523 """
524 Force the controller thread to quit
Dan Talayco710438c2010-02-18 15:16:07 -0800525 """
526 self.active = False
Rich Lane32797542012-12-22 17:46:05 -0800527 self.wakeup()
Rich Lane376bb402012-12-31 15:20:16 -0800528 self.join()
Dan Talayco21c75c72010-02-12 22:59:24 -0800529
Dan Talayco1b3f6902010-02-15 14:14:19 -0800530 def shutdown(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800531 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800532 Shutdown the controller closing all sockets
Dan Talayco21c75c72010-02-12 22:59:24 -0800533
Dan Talayco1b3f6902010-02-15 14:14:19 -0800534 @todo Might want to synchronize shutdown with self.sync...
535 """
Dan Talaycof8de5182012-04-12 22:38:41 -0700536
Dan Talayco710438c2010-02-18 15:16:07 -0800537 self.active = False
538 try:
539 self.switch_socket.shutdown(socket.SHUT_RDWR)
540 except:
Dan Talayco48370102010-03-03 15:17:33 -0800541 self.logger.info("Ignoring switch soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800542 self.switch_socket = None
Dan Talayco1b3f6902010-02-15 14:14:19 -0800543
Dan Talayco710438c2010-02-18 15:16:07 -0800544 try:
545 self.listen_socket.shutdown(socket.SHUT_RDWR)
546 except:
Dan Talayco48370102010-03-03 15:17:33 -0800547 self.logger.info("Ignoring listen soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800548 self.listen_socket = None
Dan Talaycof8de5182012-04-12 22:38:41 -0700549
Rich Laneee3586c2012-07-11 17:26:02 -0700550 # Wakeup condition variables on which controller may be wait
551 with self.xid_cv:
552 self.xid_cv.notifyAll()
Dan Talaycof8de5182012-04-12 22:38:41 -0700553
Rich Laneee3586c2012-07-11 17:26:02 -0700554 with self.connect_cv:
555 self.connect_cv.notifyAll()
Dan Talaycof8de5182012-04-12 22:38:41 -0700556
Rich Lane32797542012-12-22 17:46:05 -0800557 self.wakeup()
Dan Talayco710438c2010-02-18 15:16:07 -0800558 self.dbg_state = "down"
559
Dan Talayco34089522010-02-07 23:07:41 -0800560 def register(self, msg_type, handler):
561 """
562 Register a callback to receive a specific message type.
563
564 Only one handler may be registered for a given message type.
Dan Talaycod12b6612010-03-07 22:00:46 -0800565
566 WARNING: A lock is held during the handler call back, so
567 the handler should not make any blocking calls
568
Dan Talayco34089522010-02-07 23:07:41 -0800569 @param msg_type The type of message to receive. May be DEFAULT
Dan Talayco21c75c72010-02-12 22:59:24 -0800570 for all non-handled packets. The special type, the string "all"
571 will send all packets to the handler.
Dan Talayco34089522010-02-07 23:07:41 -0800572 @param handler The function to call when a message of the given
573 type is received.
574 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800575 # Should check type is valid
576 if not handler and msg_type in self.handlers.keys():
577 del self.handlers[msg_type]
578 return
579 self.handlers[msg_type] = handler
Dan Talayco34089522010-02-07 23:07:41 -0800580
sumithdev095542cf52013-07-12 14:56:28 -0400581 def poll(self, exp_msg=None, timeout=-1):
Dan Talayco34089522010-02-07 23:07:41 -0800582 """
583 Wait for the next OF message received from the switch.
584
585 @param exp_msg If set, return only when this type of message
Dan Talayco48370102010-03-03 15:17:33 -0800586 is received (unless timeout occurs).
Rich Laneb64ce3d2012-07-26 15:37:57 -0700587
588 @param timeout Maximum number of seconds to wait for the message.
589 Pass -1 for the default timeout.
Dan Talayco34089522010-02-07 23:07:41 -0800590
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800591 @retval A pair (msg, pkt) where msg is a message object and pkt
592 the string representing the packet as received from the socket.
593 This allows additional parsing by the receiver if necessary.
594
Dan Talayco34089522010-02-07 23:07:41 -0800595 The data members in the message are in host endian order.
Dan Talayco48370102010-03-03 15:17:33 -0800596 If an error occurs, (None, None) is returned
Dan Talayco34089522010-02-07 23:07:41 -0800597 """
Dan Talayco34089522010-02-07 23:07:41 -0800598
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700599 exp_msg_str = "unspecified"
sumithdev095542cf52013-07-12 14:56:28 -0400600 if exp_msg is not None:
Wilson Ngc11a9182013-10-28 16:02:03 -0700601 exp_msg_str = cfg_ofp.ofp_type_map.get(exp_msg, "unknown (%d)" %
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700602 exp_msg)
Rich Lane1879dc72013-03-11 22:08:51 -0700603
sumithdev095542cf52013-07-12 14:56:28 -0400604 if exp_msg is not None:
Rich Lane1879dc72013-03-11 22:08:51 -0700605 self.logger.debug("Poll for %s", exp_msg_str)
Ed Swierk9e55e282012-08-22 06:57:28 -0700606 else:
607 self.logger.debug("Poll for any OF message")
Rich Laneb64ce3d2012-07-26 15:37:57 -0700608
609 # Take the packet from the queue
Rich Lanec4f071b2012-07-11 17:25:57 -0700610 def grab():
611 if len(self.packets) > 0:
sumithdev095542cf52013-07-12 14:56:28 -0400612 if exp_msg is None:
Rich Lanec4f071b2012-07-11 17:25:57 -0700613 self.logger.debug("Looking for any packet")
614 (msg, pkt) = self.packets.pop(0)
615 return (msg, pkt)
616 else:
Rich Lane1879dc72013-03-11 22:08:51 -0700617 self.logger.debug("Looking for %s", exp_msg_str)
Rich Lanec4f071b2012-07-11 17:25:57 -0700618 for i in range(len(self.packets)):
619 msg = self.packets[i][0]
Wilson Ngc11a9182013-10-28 16:02:03 -0700620 msg_str = cfg_ofp.ofp_type_map.get(msg.type, "unknown (%d)" % msg.type)
Rich Lane7094ff12013-05-07 14:57:53 -0700621 self.logger.debug("Checking packets[%d] %s) against %s", i, msg_str, exp_msg_str)
Rich Laneb73808c2013-03-11 15:22:23 -0700622 if msg.type == exp_msg:
Rich Lanec4f071b2012-07-11 17:25:57 -0700623 (msg, pkt) = self.packets.pop(i)
624 return (msg, pkt)
625 # Not found
626 self.logger.debug("Packet not in queue")
Rich Laneb64ce3d2012-07-26 15:37:57 -0700627 return None
Dan Talayco21c75c72010-02-12 22:59:24 -0800628
Rich Lanec4f071b2012-07-11 17:25:57 -0700629 with self.packets_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800630 ret = ofutils.timed_wait(self.packets_cv, grab, timeout=timeout)
Rich Lanec4f071b2012-07-11 17:25:57 -0700631
Rich Laneb64ce3d2012-07-26 15:37:57 -0700632 if ret != None:
633 (msg, pkt) = ret
634 self.logger.debug("Got message %s" % str(msg))
635 return (msg, pkt)
636 else:
637 return (None, None)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800638
Rich Lane8fbfd662013-03-11 15:30:44 -0700639 def transact(self, msg, timeout=-1):
Dan Talayco34089522010-02-07 23:07:41 -0800640 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800641 Run a message transaction with the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800642
643 Send the message in msg and wait for a reply with a matching
Dan Talayco21c75c72010-02-12 22:59:24 -0800644 transaction id. Transactions have the highest priority in
645 received message handling.
Dan Talaycoe37999f2010-02-09 15:27:12 -0800646
Dan Talayco21c75c72010-02-12 22:59:24 -0800647 @param msg The message object to send; must not be a string
Rich Lanee1da7ea2012-07-26 15:58:45 -0700648 @param timeout The timeout in seconds; if -1 use default.
Dan Talayco34089522010-02-07 23:07:41 -0800649 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800650
Rich Lane8fbfd662013-03-11 15:30:44 -0700651 if msg.xid == None:
Rich Laneb73808c2013-03-11 15:22:23 -0700652 msg.xid = ofutils.gen_xid()
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800653
Rich Laneb73808c2013-03-11 15:22:23 -0700654 self.logger.debug("Running transaction %d" % msg.xid)
Dan Talayco21c75c72010-02-12 22:59:24 -0800655
Rich Lane9aca1992012-07-11 17:26:31 -0700656 with self.xid_cv:
657 if self.xid:
658 self.logger.error("Can only run one transaction at a time")
659 return (None, None)
Dan Talaycof8de5182012-04-12 22:38:41 -0700660
Rich Laneb73808c2013-03-11 15:22:23 -0700661 self.xid = msg.xid
Dan Talaycod12b6612010-03-07 22:00:46 -0800662 self.xid_response = None
Rich Lane1fd43e32014-01-06 15:22:50 -0800663 self.message_send(msg)
Rich Lane9aca1992012-07-11 17:26:31 -0700664
Rich Laneb73808c2013-03-11 15:22:23 -0700665 self.logger.debug("Waiting for transaction %d" % msg.xid)
Rich Lanecd97d3d2013-01-07 18:50:06 -0800666 ofutils.timed_wait(self.xid_cv, lambda: self.xid_response, timeout=timeout)
Rich Lane9aca1992012-07-11 17:26:31 -0700667
668 if self.xid_response:
669 (resp, pkt) = self.xid_response
670 self.xid_response = None
671 else:
672 (resp, pkt) = (None, None)
673
Dan Talayco09c2c592010-05-13 14:21:52 -0700674 if resp is None:
675 self.logger.warning("No response for xid " + str(self.xid))
676 return (resp, pkt)
Dan Talayco34089522010-02-07 23:07:41 -0800677
Rich Lane8fbfd662013-03-11 15:30:44 -0700678 def message_send(self, msg):
Dan Talayco34089522010-02-07 23:07:41 -0800679 """
680 Send the message to the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800681
Dan Talayco11c26e72010-03-07 22:03:57 -0800682 @param msg A string or OpenFlow message object to be forwarded to
683 the switch.
Dan Talayco21c75c72010-02-12 22:59:24 -0800684 """
685
Dan Talayco1b3f6902010-02-15 14:14:19 -0800686 if not self.switch_socket:
687 # Sending a string indicates the message is ready to go
Ed Swierk9e55e282012-08-22 06:57:28 -0700688 raise Exception("no socket")
Dan Talayco21c75c72010-02-12 22:59:24 -0800689
Rich Lane1fd43e32014-01-06 15:22:50 -0800690 if msg.xid == None:
691 msg.xid = ofutils.gen_xid()
692
693 outpkt = msg.pack()
694
695 self.logger.debug("Msg out: version %d class %s len %d xid %d",
696 msg.version, type(msg).__name__, len(outpkt), msg.xid)
Rich Lanec9d3edd2013-10-09 00:21:01 -0700697
698 with self.tx_lock:
699 if self.switch_socket.sendall(outpkt) is not None:
700 raise AssertionError("failed to send message to switch")
Dan Talayco710438c2010-02-18 15:16:07 -0800701
Rich Lane5c3151c2013-01-03 17:15:41 -0800702 return 0 # for backwards compatibility
Dan Talayco21c75c72010-02-12 22:59:24 -0800703
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700704 def clear_queue(self):
705 """
706 Clear the input queue and report the number of messages
707 that were in it
708 """
Dan Talayco7071cf12013-04-16 11:02:13 -0700709 enqueued_pkt_count = len(self.packets)
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700710 with self.packets_cv:
711 self.packets = []
Dan Talayco7071cf12013-04-16 11:02:13 -0700712 return enqueued_pkt_count
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700713
Dan Talayco21c75c72010-02-12 22:59:24 -0800714 def __str__(self):
715 string = "Controller:\n"
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800716 string += " state " + self.dbg_state + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800717 string += " switch_addr " + str(self.switch_addr) + "\n"
718 string += " pending pkts " + str(len(self.packets)) + "\n"
719 string += " total pkts " + str(self.packets_total) + "\n"
720 string += " expired pkts " + str(self.packets_expired) + "\n"
721 string += " handled pkts " + str(self.packets_handled) + "\n"
Dan Talaycoe226eb12010-02-18 23:06:30 -0800722 string += " poll discards " + str(self.poll_discards) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800723 string += " parse errors " + str(self.parse_errors) + "\n"
724 string += " sock errrors " + str(self.socket_errors) + "\n"
725 string += " max pkts " + str(self.max_pkts) + "\n"
Dan Talayco69ca4d62012-11-15 11:50:22 -0800726 string += " target switch " + str(self.switch) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800727 string += " host " + str(self.host) + "\n"
728 string += " port " + str(self.port) + "\n"
729 string += " keep_alive " + str(self.keep_alive) + "\n"
Dan Talaycof8de5182012-04-12 22:38:41 -0700730 string += " pkt_in_run " + str(self.pkt_in_run) + "\n"
731 string += " pkt_in_dropped " + str(self.pkt_in_dropped) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800732 return string
733
734 def show(self):
735 print str(self)
736
def sample_handler(controller, msg, pkt):
    """
    Sample message handler

    Shows the signature expected of functions registered with the
    controller class for packet reception.  This sample does no work.

    @param controller The controller calling the handler
    @param msg The parsed message object
    @param pkt The raw packet that was received on the socket.  This is
    in case the packet contains extra unparsed data.
    @returns Handlers are documented to return a Boolean value
    indicating if the packet was handled; if not handled, the packet is
    placed in the queue for pollers to receive.  This sample itself
    returns None.
    """
    return None