blob: 1d911bc7acc33968cfb83c376285eda4094f983b [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
4Controller class
5
6Provide the interface to the control channel to the switch under test.
7
8Class inherits from thread so as to run in background allowing
9asynchronous callbacks (if needed, not required). Also supports
10polling.
11
12The controller thread maintains a queue. Incoming messages that
13are not handled by a callback function are placed in this queue for
14poll calls.
15
16Callbacks and polling support specifying the message type
17
18@todo Support transaction semantics via xid
Dan Talayco1b3f6902010-02-15 14:14:19 -080019@todo Support select and listen on an administrative socket (or
20use a timeout to support clean shutdown).
21
22Currently only one connection is accepted during the life of
23the controller. There seems
24to be no clean way to interrupt an accept call. Using select that also listens
25on an administrative socket and can shut down the socket might work.
26
Dan Talayco34089522010-02-07 23:07:41 -080027"""
28
Rich Lane720eaf22013-08-09 18:00:45 -070029import sys
Dan Talayco34089522010-02-07 23:07:41 -080030import os
31import socket
32import time
Rich Lanecd97d3d2013-01-07 18:50:06 -080033import struct
34import select
35import logging
Dan Talayco34089522010-02-07 23:07:41 -080036from threading import Thread
37from threading import Lock
Dan Talayco21c75c72010-02-12 22:59:24 -080038from threading import Condition
Rich Lane9fd05682013-01-10 15:30:38 -080039import ofp
Rich Lanecd97d3d2013-01-07 18:50:06 -080040import ofutils
Dan Talayco48370102010-03-03 15:17:33 -080041
Dan Talaycof8de5182012-04-12 22:38:41 -070042
43FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.'
44 for x in range(256)])
45
46def hex_dump_buffer(src, length=16):
47 """
48 Convert src to a hex dump string and return the string
49 @param src The source buffer
50 @param length The number of bytes shown in each line
51 @returns A string showing the hex dump
52 """
53 result = ["\n"]
54 for i in xrange(0, len(src), length):
55 chars = src[i:i+length]
56 hex = ' '.join(["%02x" % ord(x) for x in chars])
57 printable = ''.join(["%s" % ((ord(x) <= 127 and
58 FILTER[ord(x)]) or '.') for x in chars])
59 result.append("%04x %-*s %s\n" % (i, length*3, hex, printable))
60 return ''.join(result)
61
Dan Talayco48370102010-03-03 15:17:33 -080062##@todo Find a better home for these identifiers (controller)
Glen Gibb741b1182010-07-08 16:43:58 -070063RCV_SIZE_DEFAULT = 32768
Dan Talayco48370102010-03-03 15:17:33 -080064LISTEN_QUEUE_SIZE = 1
Dan Talayco34089522010-02-07 23:07:41 -080065
66class Controller(Thread):
67 """
68 Class abstracting the control interface to the switch.
69
70 For receiving messages, two mechanism will be implemented. First,
71 query the interface with poll. Second, register to have a
72 function called by message type. The callback is passed the
73 message type as well as the raw packet (or message object)
74
75 One of the main purposes of this object is to translate between network
76 and host byte order. 'Above' this object, things should be in host
77 byte order.
Dan Talayco21c75c72010-02-12 22:59:24 -080078
79 @todo Consider using SocketServer for listening socket
80 @todo Test transaction code
81
82 @var rcv_size The receive size to use for receive calls
83 @var max_pkts The max size of the receive queue
84 @var keep_alive If true, listen for echo requests and respond w/
85 echo replies
Dan Talayco710438c2010-02-18 15:16:07 -080086 @var initial_hello If true, will send a hello message immediately
87 upon connecting to the switch
Dan Talayco69ca4d62012-11-15 11:50:22 -080088 @var switch If not None, do an active connection to the switch
Dan Talayco21c75c72010-02-12 22:59:24 -080089 @var host The host to use for connect
90 @var port The port to connect on
91 @var packets_total Total number of packets received
92 @var packets_expired Number of packets popped from queue as queue full
93 @var packets_handled Number of packets handled by something
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080094 @var dbg_state Debug indication of state
Dan Talayco34089522010-02-07 23:07:41 -080095 """
96
Dan Talayco69ca4d62012-11-15 11:50:22 -080097 def __init__(self, switch=None, host='127.0.0.1', port=6633, max_pkts=1024):
Dan Talayco21c75c72010-02-12 22:59:24 -080098 Thread.__init__(self)
Dan Talayco1b3f6902010-02-15 14:14:19 -080099 # Socket related
Dan Talayco21c75c72010-02-12 22:59:24 -0800100 self.rcv_size = RCV_SIZE_DEFAULT
Dan Talayco1b3f6902010-02-15 14:14:19 -0800101 self.listen_socket = None
102 self.switch_socket = None
103 self.switch_addr = None
Dan Talayco710438c2010-02-18 15:16:07 -0800104 self.connect_cv = Condition()
Dan Talaycoe226eb12010-02-18 23:06:30 -0800105 self.message_cv = Condition()
Rich Lanec9d3edd2013-10-09 00:21:01 -0700106 self.tx_lock = Lock()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800107
Rich Lane4dfd5e12012-12-22 19:48:01 -0800108 # Used to wake up the event loop from another thread
Rich Lanecd97d3d2013-01-07 18:50:06 -0800109 self.waker = ofutils.EventDescriptor()
Rich Lane32797542012-12-22 17:46:05 -0800110
Dan Talayco1b3f6902010-02-15 14:14:19 -0800111 # Counters
Dan Talayco21c75c72010-02-12 22:59:24 -0800112 self.socket_errors = 0
113 self.parse_errors = 0
Dan Talayco21c75c72010-02-12 22:59:24 -0800114 self.packets_total = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -0800115 self.packets_expired = 0
116 self.packets_handled = 0
Dan Talaycoe226eb12010-02-18 23:06:30 -0800117 self.poll_discards = 0
Dan Talayco1b3f6902010-02-15 14:14:19 -0800118
119 # State
Dan Talayco21c75c72010-02-12 22:59:24 -0800120 self.sync = Lock()
121 self.handlers = {}
122 self.keep_alive = False
Dan Talayco710438c2010-02-18 15:16:07 -0800123 self.active = True
124 self.initial_hello = True
Dan Talayco1b3f6902010-02-15 14:14:19 -0800125
Rich Lanec4f071b2012-07-11 17:25:57 -0700126 # OpenFlow message/packet queue
127 # Protected by the packets_cv lock / condition variable
128 self.packets = []
129 self.packets_cv = Condition()
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700130 self.packet_in_count = 0
Rich Lanec4f071b2012-07-11 17:25:57 -0700131
Dan Talayco1b3f6902010-02-15 14:14:19 -0800132 # Settings
133 self.max_pkts = max_pkts
Dan Talayco69ca4d62012-11-15 11:50:22 -0800134 self.switch = switch
135 self.passive = not self.switch
Dan Talayco48370102010-03-03 15:17:33 -0800136 self.host = host
137 self.port = port
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800138 self.dbg_state = "init"
Dan Talayco48370102010-03-03 15:17:33 -0800139 self.logger = logging.getLogger("controller")
Dan Talaycof8de5182012-04-12 22:38:41 -0700140 self.filter_packet_in = False # Drop "excessive" packet ins
141 self.pkt_in_run = 0 # Count on run of packet ins
142 self.pkt_in_filter_limit = 50 # Count on run of packet ins
143 self.pkt_in_dropped = 0 # Total dropped packet ins
144 self.transact_to = 15 # Transact timeout default value; add to config
Dan Talayco1b3f6902010-02-15 14:14:19 -0800145
Dan Talaycoe226eb12010-02-18 23:06:30 -0800146 # Transaction and message type waiting variables
147 # xid_cv: Condition variable (semaphore) for packet waiters
Dan Talayco21c75c72010-02-12 22:59:24 -0800148 # xid: Transaction ID being waited on
149 # xid_response: Transaction response message
150 self.xid_cv = Condition()
151 self.xid = None
152 self.xid_response = None
153
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800154 self.buffered_input = ""
Dan Talaycoe226eb12010-02-18 23:06:30 -0800155
Rich Lane207502e2012-12-31 14:29:12 -0800156 # Create listen socket
157 if self.passive:
158 self.logger.info("Create/listen at " + self.host + ":" +
159 str(self.port))
160 self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
161 self.listen_socket.setsockopt(socket.SOL_SOCKET,
162 socket.SO_REUSEADDR, 1)
163 self.listen_socket.bind((self.host, self.port))
164 self.listen_socket.listen(LISTEN_QUEUE_SIZE)
165
Dan Talaycof8de5182012-04-12 22:38:41 -0700166 def filter_packet(self, rawmsg, hdr):
167 """
168 Check if packet should be filtered
169
170 Currently filters packet in messages
171 @return Boolean, True if packet should be dropped
172 """
Rich Lane1622bbb2013-03-11 17:11:53 -0700173 # XXX didn't actually check for packet-in...
174 return False
Dan Talaycof8de5182012-04-12 22:38:41 -0700175 # Add check for packet in and rate limit
176 if self.filter_packet_in:
Rich Lanec4f071b2012-07-11 17:25:57 -0700177 # If we were dropping packets, report number dropped
178 # TODO dont drop expected packet ins
179 if self.pkt_in_run > self.pkt_in_filter_limit:
180 self.logger.debug("Dropped %d packet ins (%d total)"
181 % ((self.pkt_in_run -
182 self.pkt_in_filter_limit),
183 self.pkt_in_dropped))
184 self.pkt_in_run = 0
Dan Talaycof8de5182012-04-12 22:38:41 -0700185
186 return False
187
Dan Talaycod12b6612010-03-07 22:00:46 -0800188 def _pkt_handle(self, pkt):
189 """
190 Check for all packet handling conditions
191
192 Parse and verify message
193 Check if XID matches something waiting
194 Check if message is being expected for a poll operation
195 Check if keep alive is on and message is an echo request
196 Check if any registered handler wants the packet
197 Enqueue if none of those conditions is met
198
199 an echo request in case keep_alive is true, followed by
200 registered message handlers.
Glen Gibb6d467062010-07-08 16:15:08 -0700201 @param pkt The raw packet (string) which may contain multiple OF msgs
Dan Talaycod12b6612010-03-07 22:00:46 -0800202 """
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800203
204 # snag any left over data from last read()
205 pkt = self.buffered_input + pkt
206 self.buffered_input = ""
207
Glen Gibb6d467062010-07-08 16:15:08 -0700208 # Process each of the OF msgs inside the pkt
209 offset = 0
210 while offset < len(pkt):
Rich Lane1622bbb2013-03-11 17:11:53 -0700211 if offset + 8 > len(pkt):
212 break
213
Glen Gibb6d467062010-07-08 16:15:08 -0700214 # Parse the header to get type
Rich Lane1622bbb2013-03-11 17:11:53 -0700215 hdr_version, hdr_type, hdr_length, hdr_xid = ofp.message.parse_header(pkt[offset:])
Dan Talaycod12b6612010-03-07 22:00:46 -0800216
Glen Gibb6d467062010-07-08 16:15:08 -0700217 # Extract the raw message bytes
Rich Lane1622bbb2013-03-11 17:11:53 -0700218 if (offset + hdr_length) > len(pkt):
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800219 break
Rich Lane1622bbb2013-03-11 17:11:53 -0700220 rawmsg = pkt[offset : offset + hdr_length]
221 offset += hdr_length
Dan Talaycof8de5182012-04-12 22:38:41 -0700222
Rich Lane1622bbb2013-03-11 17:11:53 -0700223 #if self.filter_packet(rawmsg, hdr):
224 # continue
Dan Talaycof8de5182012-04-12 22:38:41 -0700225
Rich Lane1879dc72013-03-11 22:08:51 -0700226 self.logger.debug("Msg in: version %d type %s (%d) len %d xid %d",
227 hdr_version,
228 ofp.ofp_type_map.get(hdr_type, "unknown"), hdr_type,
229 hdr_length, hdr_version)
Rich Lane1622bbb2013-03-11 17:11:53 -0700230 if hdr_version < ofp.OFP_VERSION:
Rich Lanec44b6242013-01-10 12:23:54 -0800231 self.logger.error("Switch only supports up to OpenFlow version %d (OFTest version is %d)",
Rich Lane1622bbb2013-03-11 17:11:53 -0700232 hdr_version, ofp.OFP_VERSION)
Rich Lanec44b6242013-01-10 12:23:54 -0800233 print "Switch only supports up to OpenFlow version %d (OFTest version is %d)" % \
Rich Lane1622bbb2013-03-11 17:11:53 -0700234 (hdr_version, ofp.OFP_VERSION)
Ken Chiangadc950f2012-10-05 13:50:03 -0700235 self.disconnect()
Dan Talaycof8de5182012-04-12 22:38:41 -0700236 return
Dan Talaycod12b6612010-03-07 22:00:46 -0800237
Rich Lanef6883512013-03-11 17:00:09 -0700238 msg = ofp.message.parse_message(rawmsg)
Glen Gibb6d467062010-07-08 16:15:08 -0700239 if not msg:
240 self.parse_errors += 1
241 self.logger.warn("Could not parse message")
242 continue
243
Rich Lanec4f071b2012-07-11 17:25:57 -0700244 with self.sync:
245 # Check if transaction is waiting
246 with self.xid_cv:
Rich Lane1622bbb2013-03-11 17:11:53 -0700247 if self.xid and hdr_xid == self.xid:
248 self.logger.debug("Matched expected XID " + str(hdr_xid))
Rich Lanec4f071b2012-07-11 17:25:57 -0700249 self.xid_response = (msg, rawmsg)
250 self.xid = None
251 self.xid_cv.notify()
252 continue
Glen Gibb6d467062010-07-08 16:15:08 -0700253
Rich Lanec4f071b2012-07-11 17:25:57 -0700254 # Check if keep alive is set; if so, respond to echo requests
255 if self.keep_alive:
Rich Lane1622bbb2013-03-11 17:11:53 -0700256 if hdr_type == ofp.OFPT_ECHO_REQUEST:
Rich Lanec4f071b2012-07-11 17:25:57 -0700257 self.logger.debug("Responding to echo request")
Rich Lane78ef8b92013-01-10 12:19:23 -0800258 rep = ofp.message.echo_reply()
Rich Lane1622bbb2013-03-11 17:11:53 -0700259 rep.xid = hdr_xid
Rich Lanec4f071b2012-07-11 17:25:57 -0700260 # Ignoring additional data
Rich Lane8fbfd662013-03-11 15:30:44 -0700261 self.message_send(rep.pack())
Rich Lanec4f071b2012-07-11 17:25:57 -0700262 continue
Glen Gibb6d467062010-07-08 16:15:08 -0700263
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700264 # Generalize to counters for all packet types?
265 if msg.type == ofp.OFPT_PACKET_IN:
266 self.packet_in_count += 1
267
Rich Lane5d63b9c2013-01-11 14:12:37 -0800268 # Log error messages
Rich Lane1622bbb2013-03-11 17:11:53 -0700269 if hdr_type == ofp.OFPT_ERROR:
Rich Laneb73808c2013-03-11 15:22:23 -0700270 if msg.err_type in ofp.ofp_error_type_map:
271 type_str = ofp.ofp_error_type_map[msg.err_type]
272 if msg.err_type == ofp.OFPET_HELLO_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800273 code_map = ofp.ofp_hello_failed_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700274 elif msg.err_type == ofp.OFPET_BAD_REQUEST:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800275 code_map = ofp.ofp_bad_request_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700276 elif msg.err_type == ofp.OFPET_BAD_ACTION:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800277 code_map = ofp.ofp_bad_action_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700278 elif msg.err_type == ofp.OFPET_FLOW_MOD_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800279 code_map = ofp.ofp_flow_mod_failed_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700280 elif msg.err_type == ofp.OFPET_PORT_MOD_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800281 code_map = ofp.ofp_port_mod_failed_code_map
Rich Laneb73808c2013-03-11 15:22:23 -0700282 elif msg.err_type == ofp.OFPET_QUEUE_OP_FAILED:
Rich Lane5d63b9c2013-01-11 14:12:37 -0800283 code_map = ofp.ofp_queue_op_failed_code_map
284 else:
285 code_map = None
286
287 if code_map and msg.code in code_map:
288 code_str = code_map[msg.code]
289 else:
290 code_str = "unknown"
291 else:
292 type_str = "unknown"
Rich Lane1879dc72013-03-11 22:08:51 -0700293 code_str = "unknown"
Rich Lane5d63b9c2013-01-11 14:12:37 -0800294 self.logger.warn("Received error message: xid=%d type=%s (%d) code=%s (%d)",
Rich Lane1622bbb2013-03-11 17:11:53 -0700295 hdr_xid, type_str, msg.err_type, code_str, msg.code)
Rich Lane5d63b9c2013-01-11 14:12:37 -0800296
Rich Lanec4f071b2012-07-11 17:25:57 -0700297 # Now check for message handlers; preference is given to
298 # handlers for a specific packet
299 handled = False
Rich Lane1622bbb2013-03-11 17:11:53 -0700300 if hdr_type in self.handlers.keys():
301 handled = self.handlers[hdr_type](self, msg, rawmsg)
Rich Lanec4f071b2012-07-11 17:25:57 -0700302 if not handled and ("all" in self.handlers.keys()):
303 handled = self.handlers["all"](self, msg, rawmsg)
Glen Gibb6d467062010-07-08 16:15:08 -0700304
Rich Lanec4f071b2012-07-11 17:25:57 -0700305 if not handled: # Not handled, enqueue
Rich Lane1879dc72013-03-11 22:08:51 -0700306 self.logger.debug("Enqueuing pkt type %s (%d)",
307 ofp.ofp_type_map.get(hdr_type, "unknown"),
308 hdr_type)
Rich Lanec4f071b2012-07-11 17:25:57 -0700309 with self.packets_cv:
310 if len(self.packets) >= self.max_pkts:
311 self.packets.pop(0)
312 self.packets_expired += 1
313 self.packets.append((msg, rawmsg))
314 self.packets_cv.notify_all()
315 self.packets_total += 1
316 else:
317 self.packets_handled += 1
318 self.logger.debug("Message handled by callback")
Glen Gibb6d467062010-07-08 16:15:08 -0700319
Rob Sherwoode3e452a2012-03-06 09:24:26 -0800320 # end of 'while offset < len(pkt)'
321 # note that if offset = len(pkt), this is
322 # appends a harmless empty string
323 self.buffered_input += pkt[offset:]
Dan Talaycod12b6612010-03-07 22:00:46 -0800324
Dan Talayco710438c2010-02-18 15:16:07 -0800325 def _socket_ready_handle(self, s):
326 """
327 Handle an input-ready socket
Dan Talaycof8de5182012-04-12 22:38:41 -0700328
Dan Talayco710438c2010-02-18 15:16:07 -0800329 @param s The socket object that is ready
Dan Talaycof8de5182012-04-12 22:38:41 -0700330 @returns 0 on success, -1 on error
Dan Talayco710438c2010-02-18 15:16:07 -0800331 """
332
Dan Talayco69ca4d62012-11-15 11:50:22 -0800333 if self.passive and s and s == self.listen_socket:
Dan Talayco710438c2010-02-18 15:16:07 -0800334 if self.switch_socket:
Rich Lanee1da7ea2012-07-26 15:58:45 -0700335 self.logger.warning("Ignoring incoming connection; already connected to switch")
Rich Laneb4f8ecb2012-09-25 09:36:26 -0700336 (sock, addr) = self.listen_socket.accept()
337 sock.close()
Rich Lanee1da7ea2012-07-26 15:58:45 -0700338 return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800339
Ken Chiange875baf2012-10-09 15:24:40 -0700340 try:
341 (sock, addr) = self.listen_socket.accept()
342 except:
343 self.logger.warning("Error on listen socket accept")
344 return -1
Ken Chiang77173992012-10-30 15:44:39 -0700345 self.logger.info(self.host+":"+str(self.port)+": Incoming connection from "+str(addr))
Rich Lanee1da7ea2012-07-26 15:58:45 -0700346
Rich Laneee3586c2012-07-11 17:26:02 -0700347 with self.connect_cv:
Rich Lanee1da7ea2012-07-26 15:58:45 -0700348 (self.switch_socket, self.switch_addr) = (sock, addr)
Rich Lane82ef1832012-12-22 17:04:35 -0800349 self.switch_socket.setsockopt(socket.IPPROTO_TCP,
350 socket.TCP_NODELAY, True)
Rich Lane1a8d5aa2012-10-08 15:40:03 -0700351 if self.initial_hello:
Rich Lane78ef8b92013-01-10 12:19:23 -0800352 self.message_send(ofp.message.hello())
Rich Lanee1da7ea2012-07-26 15:58:45 -0700353 self.connect_cv.notify() # Notify anyone waiting
Rich Laned929b8d2013-04-15 15:59:14 -0700354
355 # Prevent further connections
356 self.listen_socket.close()
357 self.listen_socket = None
Dan Talaycof8de5182012-04-12 22:38:41 -0700358 elif s and s == self.switch_socket:
359 for idx in range(3): # debug: try a couple of times
360 try:
361 pkt = self.switch_socket.recv(self.rcv_size)
362 except:
363 self.logger.warning("Error on switch read")
364 return -1
365
366 if not self.active:
367 return 0
368
369 if len(pkt) == 0:
370 self.logger.warning("Zero-length switch read, %d" % idx)
371 else:
372 break
Dan Talayco710438c2010-02-18 15:16:07 -0800373
Dan Talaycof8de5182012-04-12 22:38:41 -0700374 if len(pkt) == 0: # Still no packet
Dan Talayco0fc08bd2012-04-09 16:56:18 -0700375 self.logger.warning("Zero-length switch read; closing cxn")
Dan Talaycof8de5182012-04-12 22:38:41 -0700376 self.logger.info(str(self))
377 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800378
Dan Talaycod12b6612010-03-07 22:00:46 -0800379 self._pkt_handle(pkt)
Rich Lane4dfd5e12012-12-22 19:48:01 -0800380 elif s and s == self.waker:
381 self.waker.wait()
Dan Talayco710438c2010-02-18 15:16:07 -0800382 else:
Dan Talayco48370102010-03-03 15:17:33 -0800383 self.logger.error("Unknown socket ready: " + str(s))
Dan Talaycof8de5182012-04-12 22:38:41 -0700384 return -1
Dan Talayco710438c2010-02-18 15:16:07 -0800385
Dan Talaycof8de5182012-04-12 22:38:41 -0700386 return 0
Dan Talayco710438c2010-02-18 15:16:07 -0800387
Dan Talayco69ca4d62012-11-15 11:50:22 -0800388 def active_connect(self):
389 """
390 Actively connect to a switch IP addr
391 """
392 try:
393 self.logger.info("Trying active connection to %s" % self.switch)
394 soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
395 soc.connect((self.switch, self.port))
396 self.logger.info("Connected to " + self.switch + " on " +
397 str(self.port))
Rich Lane82ef1832012-12-22 17:04:35 -0800398 soc.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800399 self.switch_addr = (self.switch, self.port)
400 return soc
401 except (StandardError, socket.error), e:
402 self.logger.error("Could not connect to %s at %d:: %s" %
403 (self.switch, self.port, str(e)))
404 return None
405
Rich Lane32797542012-12-22 17:46:05 -0800406 def wakeup(self):
407 """
408 Wake up the event loop, presumably from another thread.
409 """
Rich Lane4dfd5e12012-12-22 19:48:01 -0800410 self.waker.notify()
Rich Lane32797542012-12-22 17:46:05 -0800411
412 def sockets(self):
413 """
414 Return list of sockets to select on.
415 """
Rich Lane4dfd5e12012-12-22 19:48:01 -0800416 socs = [self.listen_socket, self.switch_socket, self.waker]
Rich Lane32797542012-12-22 17:46:05 -0800417 return [x for x in socs if x]
418
Dan Talayco1b3f6902010-02-15 14:14:19 -0800419 def run(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800420 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800421 Activity function for class
Dan Talayco21c75c72010-02-12 22:59:24 -0800422
Dan Talayco1b3f6902010-02-15 14:14:19 -0800423 Assumes connection to switch already exists. Listens on
424 switch_socket for messages until an error (or zero len pkt)
425 occurs.
Dan Talayco21c75c72010-02-12 22:59:24 -0800426
Dan Talayco1b3f6902010-02-15 14:14:19 -0800427 When there is a message on the socket, check for handlers; queue the
428 packet if no one handles the packet.
429
430 See note for controller describing the limitation of a single
431 connection for now.
432 """
433
Rich Lane207502e2012-12-31 14:29:12 -0800434 self.dbg_state = "running"
Ken Chiangadc950f2012-10-05 13:50:03 -0700435
Dan Talayco710438c2010-02-18 15:16:07 -0800436 while self.active:
Dan Talayco710438c2010-02-18 15:16:07 -0800437 try:
438 sel_in, sel_out, sel_err = \
Rich Lane32797542012-12-22 17:46:05 -0800439 select.select(self.sockets(), [], self.sockets(), 1)
Dan Talayco710438c2010-02-18 15:16:07 -0800440 except:
441 print sys.exc_info()
Ken Chiangadc950f2012-10-05 13:50:03 -0700442 self.logger.error("Select error, disconnecting")
443 self.disconnect()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800444
Dan Talayco710438c2010-02-18 15:16:07 -0800445 for s in sel_err:
Ken Chiangadc950f2012-10-05 13:50:03 -0700446 self.logger.error("Got socket error on: " + str(s) + ", disconnecting")
447 self.disconnect()
Dan Talaycof8de5182012-04-12 22:38:41 -0700448
449 for s in sel_in:
450 if self._socket_ready_handle(s) == -1:
Ken Chiangadc950f2012-10-05 13:50:03 -0700451 self.disconnect()
Dan Talayco710438c2010-02-18 15:16:07 -0800452
Dan Talayco710438c2010-02-18 15:16:07 -0800453 # End of main loop
454 self.dbg_state = "closing"
Dan Talayco48370102010-03-03 15:17:33 -0800455 self.logger.info("Exiting controller thread")
Dan Talayco710438c2010-02-18 15:16:07 -0800456 self.shutdown()
457
Rich Lane8806bc42012-07-26 19:18:37 -0700458 def connect(self, timeout=-1):
Dan Talayco710438c2010-02-18 15:16:07 -0800459 """
460 Connect to the switch
461
Rich Lane8806bc42012-07-26 19:18:37 -0700462 @param timeout Block for up to timeout seconds. Pass -1 for the default.
Dan Talayco710438c2010-02-18 15:16:07 -0800463 @return Boolean, True if connected
464 """
465
Dan Talayco69ca4d62012-11-15 11:50:22 -0800466 if not self.passive: # Do active connection now
467 self.logger.info("Attempting to connect to %s on port %s" %
468 (self.switch, str(self.port)))
469 soc = self.active_connect()
470 if soc:
471 self.logger.info("Connected to %s", self.switch)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800472 self.dbg_state = "running"
473 self.switch_socket = soc
Rich Lane32797542012-12-22 17:46:05 -0800474 self.wakeup()
Dan Talayco69ca4d62012-11-15 11:50:22 -0800475 with self.connect_cv:
476 if self.initial_hello:
Rich Lane720eaf22013-08-09 18:00:45 -0700477 self.message_send(ofp.message.hello())
Dan Talayco69ca4d62012-11-15 11:50:22 -0800478 self.connect_cv.notify() # Notify anyone waiting
479 else:
480 self.logger.error("Could not actively connect to switch %s",
481 self.switch)
482 self.active = False
483 else:
484 with self.connect_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800485 ofutils.timed_wait(self.connect_cv, lambda: self.switch_socket,
486 timeout=timeout)
Dan Talayco69ca4d62012-11-15 11:50:22 -0800487
Dan Talayco710438c2010-02-18 15:16:07 -0800488 return self.switch_socket is not None
489
Ken Chiangadc950f2012-10-05 13:50:03 -0700490 def disconnect(self, timeout=-1):
491 """
492 If connected to a switch, disconnect.
493 """
494 if self.switch_socket:
Ken Chiangadc950f2012-10-05 13:50:03 -0700495 self.switch_socket.close()
496 self.switch_socket = None
497 self.switch_addr = None
Ken Chiang74be4722012-12-21 13:07:03 -0800498 with self.packets_cv:
499 self.packets = []
Ken Chiange875baf2012-10-09 15:24:40 -0700500 with self.connect_cv:
501 self.connect_cv.notifyAll()
Ken Chiangadc950f2012-10-05 13:50:03 -0700502
503 def wait_disconnected(self, timeout=-1):
504 """
505 @param timeout Block for up to timeout seconds. Pass -1 for the default.
506 @return Boolean, True if disconnected
507 """
508
Ken Chiange875baf2012-10-09 15:24:40 -0700509 with self.connect_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800510 ofutils.timed_wait(self.connect_cv,
511 lambda: True if not self.switch_socket else None,
512 timeout=timeout)
Ken Chiangadc950f2012-10-05 13:50:03 -0700513 return self.switch_socket is None
514
Dan Talayco710438c2010-02-18 15:16:07 -0800515 def kill(self):
516 """
517 Force the controller thread to quit
Dan Talayco710438c2010-02-18 15:16:07 -0800518 """
519 self.active = False
Rich Lane32797542012-12-22 17:46:05 -0800520 self.wakeup()
Rich Lane376bb402012-12-31 15:20:16 -0800521 self.join()
Dan Talayco21c75c72010-02-12 22:59:24 -0800522
Dan Talayco1b3f6902010-02-15 14:14:19 -0800523 def shutdown(self):
Dan Talayco21c75c72010-02-12 22:59:24 -0800524 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800525 Shutdown the controller closing all sockets
Dan Talayco21c75c72010-02-12 22:59:24 -0800526
Dan Talayco1b3f6902010-02-15 14:14:19 -0800527 @todo Might want to synchronize shutdown with self.sync...
528 """
Dan Talaycof8de5182012-04-12 22:38:41 -0700529
Dan Talayco710438c2010-02-18 15:16:07 -0800530 self.active = False
531 try:
532 self.switch_socket.shutdown(socket.SHUT_RDWR)
533 except:
Dan Talayco48370102010-03-03 15:17:33 -0800534 self.logger.info("Ignoring switch soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800535 self.switch_socket = None
Dan Talayco1b3f6902010-02-15 14:14:19 -0800536
Dan Talayco710438c2010-02-18 15:16:07 -0800537 try:
538 self.listen_socket.shutdown(socket.SHUT_RDWR)
539 except:
Dan Talayco48370102010-03-03 15:17:33 -0800540 self.logger.info("Ignoring listen soc shutdown error")
Dan Talayco710438c2010-02-18 15:16:07 -0800541 self.listen_socket = None
Dan Talaycof8de5182012-04-12 22:38:41 -0700542
Rich Laneee3586c2012-07-11 17:26:02 -0700543 # Wakeup condition variables on which controller may be wait
544 with self.xid_cv:
545 self.xid_cv.notifyAll()
Dan Talaycof8de5182012-04-12 22:38:41 -0700546
Rich Laneee3586c2012-07-11 17:26:02 -0700547 with self.connect_cv:
548 self.connect_cv.notifyAll()
Dan Talaycof8de5182012-04-12 22:38:41 -0700549
Rich Lane32797542012-12-22 17:46:05 -0800550 self.wakeup()
Dan Talayco710438c2010-02-18 15:16:07 -0800551 self.dbg_state = "down"
552
Dan Talayco34089522010-02-07 23:07:41 -0800553 def register(self, msg_type, handler):
554 """
555 Register a callback to receive a specific message type.
556
557 Only one handler may be registered for a given message type.
Dan Talaycod12b6612010-03-07 22:00:46 -0800558
559 WARNING: A lock is held during the handler call back, so
560 the handler should not make any blocking calls
561
Dan Talayco34089522010-02-07 23:07:41 -0800562 @param msg_type The type of message to receive. May be DEFAULT
Dan Talayco21c75c72010-02-12 22:59:24 -0800563 for all non-handled packets. The special type, the string "all"
564 will send all packets to the handler.
Dan Talayco34089522010-02-07 23:07:41 -0800565 @param handler The function to call when a message of the given
566 type is received.
567 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800568 # Should check type is valid
569 if not handler and msg_type in self.handlers.keys():
570 del self.handlers[msg_type]
571 return
572 self.handlers[msg_type] = handler
Dan Talayco34089522010-02-07 23:07:41 -0800573
sumithdev095542cf52013-07-12 14:56:28 -0400574 def poll(self, exp_msg=None, timeout=-1):
Dan Talayco34089522010-02-07 23:07:41 -0800575 """
576 Wait for the next OF message received from the switch.
577
578 @param exp_msg If set, return only when this type of message
Dan Talayco48370102010-03-03 15:17:33 -0800579 is received (unless timeout occurs).
Rich Laneb64ce3d2012-07-26 15:37:57 -0700580
581 @param timeout Maximum number of seconds to wait for the message.
582 Pass -1 for the default timeout.
Dan Talayco34089522010-02-07 23:07:41 -0800583
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800584 @retval A pair (msg, pkt) where msg is a message object and pkt
585 the string representing the packet as received from the socket.
586 This allows additional parsing by the receiver if necessary.
587
Dan Talayco34089522010-02-07 23:07:41 -0800588 The data members in the message are in host endian order.
Dan Talayco48370102010-03-03 15:17:33 -0800589 If an error occurs, (None, None) is returned
Dan Talayco34089522010-02-07 23:07:41 -0800590 """
Dan Talayco34089522010-02-07 23:07:41 -0800591
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700592 exp_msg_str = "unspecified"
sumithdev095542cf52013-07-12 14:56:28 -0400593 if exp_msg is not None:
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700594 exp_msg_str = ofp.ofp_type_map.get(exp_msg, "unknown (%d)" %
595 exp_msg)
Rich Lane1879dc72013-03-11 22:08:51 -0700596
sumithdev095542cf52013-07-12 14:56:28 -0400597 if exp_msg is not None:
Rich Lane1879dc72013-03-11 22:08:51 -0700598 self.logger.debug("Poll for %s", exp_msg_str)
Ed Swierk9e55e282012-08-22 06:57:28 -0700599 else:
600 self.logger.debug("Poll for any OF message")
Rich Laneb64ce3d2012-07-26 15:37:57 -0700601
602 # Take the packet from the queue
Rich Lanec4f071b2012-07-11 17:25:57 -0700603 def grab():
604 if len(self.packets) > 0:
sumithdev095542cf52013-07-12 14:56:28 -0400605 if exp_msg is None:
Rich Lanec4f071b2012-07-11 17:25:57 -0700606 self.logger.debug("Looking for any packet")
607 (msg, pkt) = self.packets.pop(0)
608 return (msg, pkt)
609 else:
Rich Lane1879dc72013-03-11 22:08:51 -0700610 self.logger.debug("Looking for %s", exp_msg_str)
Rich Lanec4f071b2012-07-11 17:25:57 -0700611 for i in range(len(self.packets)):
612 msg = self.packets[i][0]
Rich Lane7094ff12013-05-07 14:57:53 -0700613 msg_str = ofp.ofp_type_map.get(msg.type, "unknown (%d)" % msg.type)
614 self.logger.debug("Checking packets[%d] %s) against %s", i, msg_str, exp_msg_str)
Rich Laneb73808c2013-03-11 15:22:23 -0700615 if msg.type == exp_msg:
Rich Lanec4f071b2012-07-11 17:25:57 -0700616 (msg, pkt) = self.packets.pop(i)
617 return (msg, pkt)
618 # Not found
619 self.logger.debug("Packet not in queue")
Rich Laneb64ce3d2012-07-26 15:37:57 -0700620 return None
Dan Talayco21c75c72010-02-12 22:59:24 -0800621
Rich Lanec4f071b2012-07-11 17:25:57 -0700622 with self.packets_cv:
Rich Lanecd97d3d2013-01-07 18:50:06 -0800623 ret = ofutils.timed_wait(self.packets_cv, grab, timeout=timeout)
Rich Lanec4f071b2012-07-11 17:25:57 -0700624
Rich Laneb64ce3d2012-07-26 15:37:57 -0700625 if ret != None:
626 (msg, pkt) = ret
627 self.logger.debug("Got message %s" % str(msg))
628 return (msg, pkt)
629 else:
630 return (None, None)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800631
Rich Lane8fbfd662013-03-11 15:30:44 -0700632 def transact(self, msg, timeout=-1):
Dan Talayco34089522010-02-07 23:07:41 -0800633 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800634 Run a message transaction with the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800635
636 Send the message in msg and wait for a reply with a matching
Dan Talayco21c75c72010-02-12 22:59:24 -0800637 transaction id. Transactions have the highest priority in
638 received message handling.
Dan Talaycoe37999f2010-02-09 15:27:12 -0800639
Dan Talayco21c75c72010-02-12 22:59:24 -0800640 @param msg The message object to send; must not be a string
Rich Lanee1da7ea2012-07-26 15:58:45 -0700641 @param timeout The timeout in seconds; if -1 use default.
Dan Talayco34089522010-02-07 23:07:41 -0800642 """
Dan Talayco21c75c72010-02-12 22:59:24 -0800643
Rich Lane8fbfd662013-03-11 15:30:44 -0700644 if msg.xid == None:
Rich Laneb73808c2013-03-11 15:22:23 -0700645 msg.xid = ofutils.gen_xid()
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800646
Rich Laneb73808c2013-03-11 15:22:23 -0700647 self.logger.debug("Running transaction %d" % msg.xid)
Dan Talayco21c75c72010-02-12 22:59:24 -0800648
Rich Lane9aca1992012-07-11 17:26:31 -0700649 with self.xid_cv:
650 if self.xid:
651 self.logger.error("Can only run one transaction at a time")
652 return (None, None)
Dan Talaycof8de5182012-04-12 22:38:41 -0700653
Rich Laneb73808c2013-03-11 15:22:23 -0700654 self.xid = msg.xid
Dan Talaycod12b6612010-03-07 22:00:46 -0800655 self.xid_response = None
Rich Lane5c3151c2013-01-03 17:15:41 -0800656 self.message_send(msg.pack())
Rich Lane9aca1992012-07-11 17:26:31 -0700657
Rich Laneb73808c2013-03-11 15:22:23 -0700658 self.logger.debug("Waiting for transaction %d" % msg.xid)
Rich Lanecd97d3d2013-01-07 18:50:06 -0800659 ofutils.timed_wait(self.xid_cv, lambda: self.xid_response, timeout=timeout)
Rich Lane9aca1992012-07-11 17:26:31 -0700660
661 if self.xid_response:
662 (resp, pkt) = self.xid_response
663 self.xid_response = None
664 else:
665 (resp, pkt) = (None, None)
666
Dan Talayco09c2c592010-05-13 14:21:52 -0700667 if resp is None:
668 self.logger.warning("No response for xid " + str(self.xid))
669 return (resp, pkt)
Dan Talayco34089522010-02-07 23:07:41 -0800670
Rich Lane8fbfd662013-03-11 15:30:44 -0700671 def message_send(self, msg):
Dan Talayco34089522010-02-07 23:07:41 -0800672 """
673 Send the message to the switch
Dan Talaycoe37999f2010-02-09 15:27:12 -0800674
Dan Talayco11c26e72010-03-07 22:03:57 -0800675 @param msg A string or OpenFlow message object to be forwarded to
676 the switch.
Dan Talayco21c75c72010-02-12 22:59:24 -0800677 """
678
Dan Talayco1b3f6902010-02-15 14:14:19 -0800679 if not self.switch_socket:
680 # Sending a string indicates the message is ready to go
Ed Swierk9e55e282012-08-22 06:57:28 -0700681 raise Exception("no socket")
Dan Talayco710438c2010-02-18 15:16:07 -0800682 #@todo If not string, try to pack
Dan Talayco21c75c72010-02-12 22:59:24 -0800683 if type(msg) != type(""):
Rich Lane8fbfd662013-03-11 15:30:44 -0700684 if msg.xid == None:
Rich Laneb73808c2013-03-11 15:22:23 -0700685 msg.xid = ofutils.gen_xid()
Ed Swierk9e55e282012-08-22 06:57:28 -0700686 outpkt = msg.pack()
Dan Talayco710438c2010-02-18 15:16:07 -0800687 else:
688 outpkt = msg
Dan Talayco21c75c72010-02-12 22:59:24 -0800689
Rich Lanef18980d2012-12-31 17:11:41 -0800690 msg_version, msg_type, msg_len, msg_xid = struct.unpack_from("!BBHL", outpkt)
Rich Lane5d63b9c2013-01-11 14:12:37 -0800691 self.logger.debug("Msg out: buf len %d. hdr.type %s. hdr.len %d hdr.version %d hdr.xid %d",
Rich Lanef18980d2012-12-31 17:11:41 -0800692 len(outpkt),
Rich Laned7b0ffa2013-03-08 15:53:42 -0800693 ofp.ofp_type_map.get(msg_type, "unknown (%d)" % msg_type),
Rich Lanecd97d3d2013-01-07 18:50:06 -0800694 msg_len,
Rich Lane5d63b9c2013-01-11 14:12:37 -0800695 msg_version,
696 msg_xid)
Rich Lanec9d3edd2013-10-09 00:21:01 -0700697
698 with self.tx_lock:
699 if self.switch_socket.sendall(outpkt) is not None:
700 raise AssertionError("failed to send message to switch")
Dan Talayco710438c2010-02-18 15:16:07 -0800701
Rich Lane5c3151c2013-01-03 17:15:41 -0800702 return 0 # for backwards compatibility
Dan Talayco21c75c72010-02-12 22:59:24 -0800703
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700704 def clear_queue(self):
705 """
706 Clear the input queue and report the number of messages
707 that were in it
708 """
Dan Talayco7071cf12013-04-16 11:02:13 -0700709 enqueued_pkt_count = len(self.packets)
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700710 with self.packets_cv:
711 self.packets = []
Dan Talayco7071cf12013-04-16 11:02:13 -0700712 return enqueued_pkt_count
Dan Talaycodd6b6ff2013-04-12 08:20:18 -0700713
Dan Talayco21c75c72010-02-12 22:59:24 -0800714 def __str__(self):
715 string = "Controller:\n"
Dan Talaycod7e2dbe2010-02-13 21:51:15 -0800716 string += " state " + self.dbg_state + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800717 string += " switch_addr " + str(self.switch_addr) + "\n"
718 string += " pending pkts " + str(len(self.packets)) + "\n"
719 string += " total pkts " + str(self.packets_total) + "\n"
720 string += " expired pkts " + str(self.packets_expired) + "\n"
721 string += " handled pkts " + str(self.packets_handled) + "\n"
Dan Talaycoe226eb12010-02-18 23:06:30 -0800722 string += " poll discards " + str(self.poll_discards) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800723 string += " parse errors " + str(self.parse_errors) + "\n"
724 string += " sock errrors " + str(self.socket_errors) + "\n"
725 string += " max pkts " + str(self.max_pkts) + "\n"
Dan Talayco69ca4d62012-11-15 11:50:22 -0800726 string += " target switch " + str(self.switch) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800727 string += " host " + str(self.host) + "\n"
728 string += " port " + str(self.port) + "\n"
729 string += " keep_alive " + str(self.keep_alive) + "\n"
Dan Talaycof8de5182012-04-12 22:38:41 -0700730 string += " pkt_in_run " + str(self.pkt_in_run) + "\n"
731 string += " pkt_in_dropped " + str(self.pkt_in_dropped) + "\n"
Dan Talayco21c75c72010-02-12 22:59:24 -0800732 return string
733
734 def show(self):
735 print str(self)
736
737def sample_handler(controller, msg, pkt):
738 """
739 Sample message handler
740
741 This is the prototype for functions registered with the controller
742 class for packet reception
743
744 @param controller The controller calling the handler
745 @param msg The parsed message object
746 @param pkt The raw packet that was received on the socket. This is
747 in case the packet contains extra unparsed data.
748 @returns Boolean value indicating if the packet was handled. If
749 not handled, the packet is placed in the queue for pollers to received
750 """
751 pass