blob: 252726af8a16cf063391339741e2744ba7b7a13c [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
Dan Talayco3087a462010-02-13 14:01:47 -08004DataPlane and DataPlanePort classes
Dan Talayco34089522010-02-07 23:07:41 -08005
6Provide the interface to the control the set of ports being used
7to stimulate the switch under test.
8
9See the class dataplaneport for more details. This class wraps
Dan Talaycoe226eb12010-02-18 23:06:30 -080010a set of those objects allowing general calls and parsing
Dan Talayco34089522010-02-07 23:07:41 -080011configuration.
12
Dan Talayco3087a462010-02-13 14:01:47 -080013@todo Add "filters" for matching packets. Actions supported
14for filters should include a callback or a counter
Dan Talayco34089522010-02-07 23:07:41 -080015"""
16
Dan Talayco3087a462010-02-13 14:01:47 -080017import sys
18import os
19import socket
20import time
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080021import netutils
Dan Talayco3087a462010-02-13 14:01:47 -080022from threading import Thread
23from threading import Lock
Dan Talaycoe226eb12010-02-18 23:06:30 -080024from threading import Condition
Dan Talayco710438c2010-02-18 15:16:07 -080025import select
Dan Talayco48370102010-03-03 15:17:33 -080026import logging
27from oft_assert import oft_assert
Dan Talayco3087a462010-02-13 14:01:47 -080028
Dan Talayco48370102010-03-03 15:17:33 -080029##@todo Find a better home for these identifiers (dataplane)
30RCV_SIZE_DEFAULT = 4096
Dan Talayco3087a462010-02-13 14:01:47 -080031ETH_P_ALL = 0x03
32RCV_TIMEOUT = 10000
Dan Talayco3087a462010-02-13 14:01:47 -080033
Ed Swierk506614a2012-03-29 08:16:59 -070034def match_exp_pkt(exp_pkt, pkt):
35 """
36 Compare the string value of pkt with the string value of exp_pkt,
37 and return True iff they are identical. If the length of exp_pkt is
38 less than the minimum Ethernet frame size (60 bytes), then padding
39 bytes in pkt are ignored.
40 """
41 e = str(exp_pkt)
42 p = str(pkt)
43 if len(e) < 60:
44 p = p[:len(e)]
45 return e == p
46
Dan Talayco3087a462010-02-13 14:01:47 -080047class DataPlanePort(Thread):
48 """
49 Class defining a port monitoring object.
50
51 Control a dataplane port connected to the switch under test.
52 Creates a promiscuous socket on a physical interface.
53 Queues the packets received on that interface with time stamps.
54 Inherits from Thread class as meant to run in background. Also
55 supports polling.
56 Use accessors to dequeue packets for proper synchronization.
Dan Talaycoe226eb12010-02-18 23:06:30 -080057
58 Currently assumes a controlling 'parent' which maintains a
59 common Lock object and a total packet-pending count. May want
60 to decouple that some day.
Dan Talayco3087a462010-02-13 14:01:47 -080061 """
62
Dan Talaycoe226eb12010-02-18 23:06:30 -080063 def __init__(self, interface_name, port_number, parent, max_pkts=1024):
Dan Talayco3087a462010-02-13 14:01:47 -080064 """
65 Set up a port monitor object
66 @param interface_name The name of the physical interface like eth1
Dan Talayco4d065972010-02-18 23:11:32 -080067 @param port_number The port number associated with this port
Dan Talaycoe226eb12010-02-18 23:06:30 -080068 @param parent The controlling dataplane object; for pkt wait CV
Dan Talayco3087a462010-02-13 14:01:47 -080069 @param max_pkts Maximum number of pkts to keep in queue
70 """
71 Thread.__init__(self)
72 self.interface_name = interface_name
73 self.max_pkts = max_pkts
Dan Talayco3087a462010-02-13 14:01:47 -080074 self.packets_total = 0
75 self.packets = []
Dan Talayco1b3f6902010-02-15 14:14:19 -080076 self.packets_discarded = 0
Dan Talaycoe226eb12010-02-18 23:06:30 -080077 self.port_number = port_number
Dan Talayco48370102010-03-03 15:17:33 -080078 logname = "dp-" + interface_name
79 self.logger = logging.getLogger(logname)
Dan Talayco0db53eb2010-03-10 14:00:02 -080080 try:
81 self.socket = self.interface_open(interface_name)
82 except:
83 self.logger.info("Could not open socket")
84 sys.exit(1)
Dan Talayco48370102010-03-03 15:17:33 -080085 self.logger.info("Openned port monitor socket")
Dan Talaycoe226eb12010-02-18 23:06:30 -080086 self.parent = parent
87 self.pkt_sync = self.parent.pkt_sync
Dan Talayco1b3f6902010-02-15 14:14:19 -080088
Dan Talayco3087a462010-02-13 14:01:47 -080089 def interface_open(self, interface_name):
90 """
91 Open a socket in a promiscuous mode for a data connection.
92 @param interface_name port name as a string such as 'eth1'
93 @retval s socket
94 """
Dan Talaycoe226eb12010-02-18 23:06:30 -080095 s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
Dan Talayco3087a462010-02-13 14:01:47 -080096 socket.htons(ETH_P_ALL))
97 s.bind((interface_name, 0))
Dan Talayco1b3f6902010-02-15 14:14:19 -080098 netutils.set_promisc(s, interface_name)
Dan Talayco3087a462010-02-13 14:01:47 -080099 s.settimeout(RCV_TIMEOUT)
100 return s
101
Dan Talayco3087a462010-02-13 14:01:47 -0800102 def run(self):
103 """
104 Activity function for class
105 """
106 self.running = True
Dan Talayco710438c2010-02-18 15:16:07 -0800107 self.socs = [self.socket]
108 error_warned = False # Have we warned about error?
Dan Talayco3087a462010-02-13 14:01:47 -0800109 while self.running:
110 try:
Dan Talayco710438c2010-02-18 15:16:07 -0800111 sel_in, sel_out, sel_err = \
112 select.select(self.socs, [], [], 1)
113 except:
114 print sys.exc_info()
Dan Talayco48370102010-03-03 15:17:33 -0800115 self.logger.error("Select error, exiting")
116 break
Dan Talayco710438c2010-02-18 15:16:07 -0800117
118 if not self.running:
119 break
120
Dan Talayco48370102010-03-03 15:17:33 -0800121 if (sel_in is None) or (len(sel_in) == 0):
Dan Talayco710438c2010-02-18 15:16:07 -0800122 continue
123
124 try:
Dan Talayco48370102010-03-03 15:17:33 -0800125 rcvmsg = self.socket.recv(RCV_SIZE_DEFAULT)
Dan Talayco3087a462010-02-13 14:01:47 -0800126 except socket.error:
Dan Talayco710438c2010-02-18 15:16:07 -0800127 if not error_warned:
Dan Talayco48370102010-03-03 15:17:33 -0800128 self.logger.info("Socket error on recv")
Dan Talayco710438c2010-02-18 15:16:07 -0800129 error_warned = True
Dan Talayco1b3f6902010-02-15 14:14:19 -0800130 continue
Dan Talayco710438c2010-02-18 15:16:07 -0800131
Dan Talayco1b3f6902010-02-15 14:14:19 -0800132 if len(rcvmsg) == 0:
Dan Talayco48370102010-03-03 15:17:33 -0800133 self.logger.info("Zero len pkt rcvd")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800134 self.kill()
Dan Talayco3087a462010-02-13 14:01:47 -0800135 break
136
Dan Talayco1b3f6902010-02-15 14:14:19 -0800137 rcvtime = time.clock()
Dan Talayco48370102010-03-03 15:17:33 -0800138 self.logger.debug("Pkt len " + str(len(rcvmsg)) +
Ed Swierk4e200302012-03-19 14:53:31 -0700139 " in at " + str(rcvtime) + " on port " +
140 str(self.port_number))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800141
Dan Talaycoe226eb12010-02-18 23:06:30 -0800142 # Enqueue packet
143 self.pkt_sync.acquire()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800144 if len(self.packets) >= self.max_pkts:
Dan Talaycoe226eb12010-02-18 23:06:30 -0800145 # Queue full, throw away oldest
Dan Talayco1b3f6902010-02-15 14:14:19 -0800146 self.packets.pop(0)
147 self.packets_discarded += 1
Dan Talaycoe226eb12010-02-18 23:06:30 -0800148 else:
149 self.parent.packets_pending += 1
Dan Talayco48370102010-03-03 15:17:33 -0800150 # Check if parent is waiting on this (or any) port
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700151 drop_pkt = False
Dan Talayco48370102010-03-03 15:17:33 -0800152 if self.parent.want_pkt:
153 if (not self.parent.want_pkt_port or
Dan Talaycoe226eb12010-02-18 23:06:30 -0800154 self.parent.want_pkt_port == self.port_number):
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700155 if self.parent.exp_pkt:
Ed Swierk506614a2012-03-29 08:16:59 -0700156 if not match_exp_pkt(self.parent.exp_pkt, rcvmsg):
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700157 drop_pkt = True
158 if not drop_pkt:
159 self.parent.got_pkt_port = self.port_number
160 self.parent.want_pkt = False
161 self.parent.pkt_sync.notify()
162 if not drop_pkt:
163 self.packets.append((rcvmsg, rcvtime))
164 self.packets_total += 1
Dan Talaycoe226eb12010-02-18 23:06:30 -0800165 self.pkt_sync.release()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800166
Dan Talayco48370102010-03-03 15:17:33 -0800167 self.logger.info("Thread exit ")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800168
169 def kill(self):
170 """
171 Terminate the running thread
172 """
Dan Talayco48370102010-03-03 15:17:33 -0800173 self.logger.debug("Port monitor kill")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800174 self.running = False
175 try:
176 self.socket.close()
177 except:
Dan Talayco48370102010-03-03 15:17:33 -0800178 self.logger.info("Ignoring dataplane soc shutdown error")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800179
Dan Talaycoe226eb12010-02-18 23:06:30 -0800180 def dequeue(self, use_lock=True):
Dan Talayco3087a462010-02-13 14:01:47 -0800181 """
182 Get the oldest packet in the queue
Dan Talaycoe226eb12010-02-18 23:06:30 -0800183 @param use_lock If True, acquires the packet sync lock (which is
184 really the parent's lock)
185 @return The pair packet, packet time-stamp
Dan Talayco3087a462010-02-13 14:01:47 -0800186 """
Dan Talaycoe226eb12010-02-18 23:06:30 -0800187 if use_lock:
188 self.pkt_sync.acquire()
189 if len(self.packets) > 0:
190 pkt, pkt_time = self.packets.pop(0)
191 self.parent.packets_pending -= 1
192 else:
193 pkt = pkt_time = None
194 if use_lock:
195 self.pkt_sync.release()
Dan Talayco3087a462010-02-13 14:01:47 -0800196 return pkt, pkt_time
197
198 def timestamp_head(self):
199 """
200 Return the timestamp of the head of queue or None if empty
201 """
Dan Talayco710438c2010-02-18 15:16:07 -0800202 rv = None
Dan Talaycoe226eb12010-02-18 23:06:30 -0800203 try:
Dan Talayco710438c2010-02-18 15:16:07 -0800204 rv = self.packets[0][1]
Dan Talaycoe226eb12010-02-18 23:06:30 -0800205 except:
206 rv = None
Dan Talayco710438c2010-02-18 15:16:07 -0800207 return rv
Dan Talayco3087a462010-02-13 14:01:47 -0800208
209 def flush(self):
210 """
211 Clear the packet queue
212 """
Dan Talaycoe226eb12010-02-18 23:06:30 -0800213 self.pkt_sync.acquire()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800214 self.packets_discarded += len(self.packets)
Dan Talaycoe226eb12010-02-18 23:06:30 -0800215 self.parent.packets_pending -= len(self.packets)
Dan Talayco3087a462010-02-13 14:01:47 -0800216 self.packets = []
217 self.packet_times = []
Dan Talaycoe226eb12010-02-18 23:06:30 -0800218 self.pkt_sync.release()
Dan Talayco3087a462010-02-13 14:01:47 -0800219
220
221 def send(self, packet):
222 """
223 Send a packet to the dataplane port
224 @param packet The packet data to send to the port
225 @retval The number of bytes sent
226 """
227 return self.socket.send(packet)
228
229
230 def register(self, handler):
231 """
232 Register a callback function to receive packets from this
Dan Talaycoe226eb12010-02-18 23:06:30 -0800233 port. The callback will be passed the packet, the
234 interface name and the port number (if set) on which the
Dan Talayco3087a462010-02-13 14:01:47 -0800235 packet was received.
236
237 To be implemented
238 """
239 pass
240
Dan Talayco1b3f6902010-02-15 14:14:19 -0800241 def show(self, prefix=''):
242 print prefix + "Name: " + self.interface_name
Dan Talayco710438c2010-02-18 15:16:07 -0800243 print prefix + "Pkts pending: " + str(len(self.packets))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800244 print prefix + "Pkts total: " + str(self.packets_total)
245 print prefix + "socket: " + str(self.socket)
Dan Talaycoe226eb12010-02-18 23:06:30 -0800246
Dan Talayco34089522010-02-07 23:07:41 -0800247
248class DataPlane:
249 """
250 Class defining access primitives to the data plane
251 Controls a list of DataPlanePort objects
252 """
253 def __init__(self):
254 self.port_list = {}
Dan Talaycoe226eb12010-02-18 23:06:30 -0800255 # pkt_sync serves double duty as a regular top level lock and
256 # as a condition variable
257 self.pkt_sync = Condition()
258
259 # These are used to signal async pkt arrival for polling
260 self.want_pkt = False
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700261 self.exp_pkt = None
Dan Talaycoe226eb12010-02-18 23:06:30 -0800262 self.want_pkt_port = None # What port required (or None)
263 self.got_pkt_port = None # On what port received?
264 self.packets_pending = 0 # Total pkts in all port queues
Dan Talayco48370102010-03-03 15:17:33 -0800265 self.logger = logging.getLogger("dataplane")
Dan Talayco34089522010-02-07 23:07:41 -0800266
267 def port_add(self, interface_name, port_number):
268 """
269 Add a port to the dataplane
270 TBD: Max packets for queue?
271 @param interface_name The name of the physical interface like eth1
272 @param port_number The port number used to refer to the port
273 """
Dan Talaycoe226eb12010-02-18 23:06:30 -0800274
Dan Talayco11c26e72010-03-07 22:03:57 -0800275 self.port_list[port_number] = DataPlanePort(interface_name,
Dan Talaycoe226eb12010-02-18 23:06:30 -0800276 port_number, self)
Dan Talayco34089522010-02-07 23:07:41 -0800277 self.port_list[port_number].start()
278
279 def send(self, port_number, packet):
280 """
281 Send a packet to the given port
282 @param port_number The port to send the data to
283 @param packet Raw packet data to send to port
284 """
Dan Talayco11c26e72010-03-07 22:03:57 -0800285 self.logger.debug("Sending %d bytes to port %d" %
286 (len(packet), port_number))
Dan Talayco34089522010-02-07 23:07:41 -0800287 bytes = self.port_list[port_number].send(packet)
288 if bytes != len(packet):
Dan Talayco48370102010-03-03 15:17:33 -0800289 self.logger.error("Unhandled send error, length mismatch %d != %d" %
Dan Talayco1b3f6902010-02-15 14:14:19 -0800290 (bytes, len(packet)))
Dan Talayco34089522010-02-07 23:07:41 -0800291 return bytes
292
293 def flood(self, packet):
294 """
295 Send a packet to all ports
296 @param packet Raw packet data to send to port
297 """
298 for port_number in self.port_list.keys():
299 bytes = self.port_list[port_number].send(packet)
300 if bytes != len(packet):
Dan Talayco48370102010-03-03 15:17:33 -0800301 self.logger.error("Unhandled send error" +
Dan Talayco1b3f6902010-02-15 14:14:19 -0800302 ", port %d, length mismatch %d != %d" %
303 (port_number, bytes, len(packet)))
Dan Talayco34089522010-02-07 23:07:41 -0800304
Dan Talaycoe226eb12010-02-18 23:06:30 -0800305 def _oldest_packet_find(self):
Dan Talayco34089522010-02-07 23:07:41 -0800306 # Find port with oldest packet
307 min_time = 0
308 min_port = -1
309 for port_number in self.port_list.keys():
310 ptime = self.port_list[port_number].timestamp_head()
311 if ptime:
312 if (min_port == -1) or (ptime < min_time):
Dan Talaycoe226eb12010-02-18 23:06:30 -0800313 min_time = ptime
Dan Talayco34089522010-02-07 23:07:41 -0800314 min_port = port_number
Dan Talaycoe226eb12010-02-18 23:06:30 -0800315 oft_assert(min_port != -1, "Could not find port when pkts pending")
Dan Talayco34089522010-02-07 23:07:41 -0800316
Dan Talaycoe226eb12010-02-18 23:06:30 -0800317 return min_port
318
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700319 def poll(self, port_number=None, timeout=None, exp_pkt=None):
Dan Talaycoe226eb12010-02-18 23:06:30 -0800320 """
321 Poll one or all dataplane ports for a packet
322
323 If port_number is given, get the oldest packet from that port.
324 Otherwise, find the port with the oldest packet and return
325 that packet.
326 @param port_number If set, get packet from this port
Dan Talayco11c26e72010-03-07 22:03:57 -0800327 @param timeout If positive and no packet is available, block
Dan Talaycoe226eb12010-02-18 23:06:30 -0800328 until a packet is received or for this many seconds
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700329 @param exp_pkt If not None, look for this packet and ignore any
330 others received. Requires port_number to be specified
Dan Talaycoe226eb12010-02-18 23:06:30 -0800331 @return The triple port_number, packet, pkt_time where packet
332 is received from port_number at time pkt_time. If a timeout
333 occurs, return None, None, None
334 """
335
336 self.pkt_sync.acquire()
337
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700338 if exp_pkt and not port_number:
339 print "WARNING: Dataplane poll: exp_pkt without port number"
340
Dan Talaycoe226eb12010-02-18 23:06:30 -0800341 # Check if requested specific port and it has a packet
342 if port_number and len(self.port_list[port_number].packets) != 0:
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700343 while len(self.port_list[port_number].packets) != 0:
344 pkt, time = self.port_list[port_number].dequeue(use_lock=False)
345 if not exp_pkt:
346 break
Ed Swierk506614a2012-03-29 08:16:59 -0700347 if match_exp_pkt(exp_pkt, pkt):
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700348 break
349 pkt = None # Discard silently
350 if pkt:
351 self.pkt_sync.release()
352 oft_assert(pkt, "Poll: packet not found on port " +
353 str(port_number))
354 return port_number, pkt, time
Dan Talaycoe226eb12010-02-18 23:06:30 -0800355
356 # Check if requested any port and some packet pending
357 if not port_number and self.packets_pending != 0:
358 port = self._oldest_packet_find()
359 pkt, time = self.port_list[port].dequeue(use_lock=False)
360 self.pkt_sync.release()
361 oft_assert(pkt, "Poll: oldest packet not found")
362 return port, pkt, time
363
364 # No packet pending; blocking call requested?
365 if not timeout:
366 self.pkt_sync.release()
Dan Talayco34089522010-02-07 23:07:41 -0800367 return None, None, None
368
Dan Talaycoe226eb12010-02-18 23:06:30 -0800369 # Desired packet isn't available and timeout is specified
370 # Already holding pkt_sync; wait on pkt_sync variable
371 self.want_pkt = True
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700372 self.exp_pkt = exp_pkt
Dan Talaycoe226eb12010-02-18 23:06:30 -0800373 self.want_pkt_port = port_number
374 self.got_pkt_port = None
375 self.pkt_sync.wait(timeout)
376 self.want_pkt = False
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700377 self.exp_pkt = None
Dan Talaycoe226eb12010-02-18 23:06:30 -0800378 if self.got_pkt_port:
379 pkt, time = \
380 self.port_list[self.got_pkt_port].dequeue(use_lock=False)
381 self.pkt_sync.release()
382 oft_assert(pkt, "Poll: pkt reported, but not found at " +
Dan Talayco48370102010-03-03 15:17:33 -0800383 str(self.got_pkt_port))
Dan Talaycoe226eb12010-02-18 23:06:30 -0800384 return self.got_pkt_port, pkt, time
385
386 self.pkt_sync.release()
Dan Talaycoa99c21a2010-05-07 09:23:34 -0700387 self.logger.debug("Poll time out, no packet from " + str(port_number))
Dan Talaycoe226eb12010-02-18 23:06:30 -0800388
389 return None, None, None
Dan Talayco34089522010-02-07 23:07:41 -0800390
Dan Talayco48370102010-03-03 15:17:33 -0800391 def kill(self, join_threads=True):
Dan Talayco1b3f6902010-02-15 14:14:19 -0800392 """
393 Close all sockets for dataplane
Dan Talayco710438c2010-02-18 15:16:07 -0800394 @param join_threads If True call join on each thread
Dan Talayco1b3f6902010-02-15 14:14:19 -0800395 """
Dan Talayco34089522010-02-07 23:07:41 -0800396 for port_number in self.port_list.keys():
397 self.port_list[port_number].kill()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800398 if join_threads:
Dan Talayco48370102010-03-03 15:17:33 -0800399 self.logger.debug("Joining " + str(port_number))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800400 self.port_list[port_number].join()
Dan Talayco34089522010-02-07 23:07:41 -0800401
Dan Talayco48370102010-03-03 15:17:33 -0800402 self.logger.info("DataPlane shutdown")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800403
404 def show(self, prefix=''):
405 print prefix + "Dataplane Controller"
Dan Talaycoe226eb12010-02-18 23:06:30 -0800406 print prefix + "Packets pending" + str(self.packets_pending)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800407 for pnum, port in self.port_list.items():
408 print prefix + "OpenFlow Port Number " + str(pnum)
409 port.show(prefix + ' ')
410