blob: 7ce27381b393f640982615f08901963f94888047 [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
Dan Talayco3087a462010-02-13 14:01:47 -08004DataPlane and DataPlanePort classes
Dan Talayco34089522010-02-07 23:07:41 -08005
6Provide the interface to the control the set of ports being used
7to stimulate the switch under test.
8
9See the class dataplaneport for more details. This class wraps
Dan Talaycoe226eb12010-02-18 23:06:30 -080010a set of those objects allowing general calls and parsing
Dan Talayco34089522010-02-07 23:07:41 -080011configuration.
12
Dan Talayco3087a462010-02-13 14:01:47 -080013@todo Add "filters" for matching packets. Actions supported
14for filters should include a callback or a counter
Dan Talayco34089522010-02-07 23:07:41 -080015"""
16
Dan Talayco3087a462010-02-13 14:01:47 -080017import sys
18import os
19import socket
20import time
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080021import netutils
Dan Talayco3087a462010-02-13 14:01:47 -080022from threading import Thread
23from threading import Lock
Dan Talaycoe226eb12010-02-18 23:06:30 -080024from threading import Condition
Dan Talayco710438c2010-02-18 15:16:07 -080025import select
Dan Talayco48370102010-03-03 15:17:33 -080026import logging
27from oft_assert import oft_assert
Dan Talayco3087a462010-02-13 14:01:47 -080028
Dan Talayco48370102010-03-03 15:17:33 -080029##@todo Find a better home for these identifiers (dataplane)
30RCV_SIZE_DEFAULT = 4096
Dan Talayco3087a462010-02-13 14:01:47 -080031ETH_P_ALL = 0x03
32RCV_TIMEOUT = 10000
Dan Talayco3087a462010-02-13 14:01:47 -080033
34class DataPlanePort(Thread):
35 """
36 Class defining a port monitoring object.
37
38 Control a dataplane port connected to the switch under test.
39 Creates a promiscuous socket on a physical interface.
40 Queues the packets received on that interface with time stamps.
41 Inherits from Thread class as meant to run in background. Also
42 supports polling.
43 Use accessors to dequeue packets for proper synchronization.
Dan Talaycoe226eb12010-02-18 23:06:30 -080044
45 Currently assumes a controlling 'parent' which maintains a
46 common Lock object and a total packet-pending count. May want
47 to decouple that some day.
Dan Talayco3087a462010-02-13 14:01:47 -080048 """
49
Dan Talaycoe226eb12010-02-18 23:06:30 -080050 def __init__(self, interface_name, port_number, parent, max_pkts=1024):
Dan Talayco3087a462010-02-13 14:01:47 -080051 """
52 Set up a port monitor object
53 @param interface_name The name of the physical interface like eth1
Dan Talayco4d065972010-02-18 23:11:32 -080054 @param port_number The port number associated with this port
Dan Talaycoe226eb12010-02-18 23:06:30 -080055 @param parent The controlling dataplane object; for pkt wait CV
Dan Talayco3087a462010-02-13 14:01:47 -080056 @param max_pkts Maximum number of pkts to keep in queue
57 """
58 Thread.__init__(self)
59 self.interface_name = interface_name
60 self.max_pkts = max_pkts
Dan Talayco3087a462010-02-13 14:01:47 -080061 self.packets_total = 0
62 self.packets = []
Dan Talayco1b3f6902010-02-15 14:14:19 -080063 self.packets_discarded = 0
Dan Talaycoe226eb12010-02-18 23:06:30 -080064 self.port_number = port_number
Dan Talayco48370102010-03-03 15:17:33 -080065 logname = "dp-" + interface_name
66 self.logger = logging.getLogger(logname)
Dan Talayco0db53eb2010-03-10 14:00:02 -080067 try:
68 self.socket = self.interface_open(interface_name)
69 except:
70 self.logger.info("Could not open socket")
71 sys.exit(1)
Dan Talayco48370102010-03-03 15:17:33 -080072 self.logger.info("Openned port monitor socket")
Dan Talaycoe226eb12010-02-18 23:06:30 -080073 self.parent = parent
74 self.pkt_sync = self.parent.pkt_sync
Dan Talayco1b3f6902010-02-15 14:14:19 -080075
Dan Talayco3087a462010-02-13 14:01:47 -080076 def interface_open(self, interface_name):
77 """
78 Open a socket in a promiscuous mode for a data connection.
79 @param interface_name port name as a string such as 'eth1'
80 @retval s socket
81 """
Dan Talaycoe226eb12010-02-18 23:06:30 -080082 s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
Dan Talayco3087a462010-02-13 14:01:47 -080083 socket.htons(ETH_P_ALL))
84 s.bind((interface_name, 0))
Dan Talayco1b3f6902010-02-15 14:14:19 -080085 netutils.set_promisc(s, interface_name)
Dan Talayco3087a462010-02-13 14:01:47 -080086 s.settimeout(RCV_TIMEOUT)
87 return s
88
Dan Talayco3087a462010-02-13 14:01:47 -080089 def run(self):
90 """
91 Activity function for class
92 """
93 self.running = True
Dan Talayco710438c2010-02-18 15:16:07 -080094 self.socs = [self.socket]
95 error_warned = False # Have we warned about error?
Dan Talayco3087a462010-02-13 14:01:47 -080096 while self.running:
97 try:
Dan Talayco710438c2010-02-18 15:16:07 -080098 sel_in, sel_out, sel_err = \
99 select.select(self.socs, [], [], 1)
100 except:
101 print sys.exc_info()
Dan Talayco48370102010-03-03 15:17:33 -0800102 self.logger.error("Select error, exiting")
103 break
Dan Talayco710438c2010-02-18 15:16:07 -0800104
105 if not self.running:
106 break
107
Dan Talayco48370102010-03-03 15:17:33 -0800108 if (sel_in is None) or (len(sel_in) == 0):
Dan Talayco710438c2010-02-18 15:16:07 -0800109 continue
110
111 try:
Dan Talayco48370102010-03-03 15:17:33 -0800112 rcvmsg = self.socket.recv(RCV_SIZE_DEFAULT)
Dan Talayco3087a462010-02-13 14:01:47 -0800113 except socket.error:
Dan Talayco710438c2010-02-18 15:16:07 -0800114 if not error_warned:
Dan Talayco48370102010-03-03 15:17:33 -0800115 self.logger.info("Socket error on recv")
Dan Talayco710438c2010-02-18 15:16:07 -0800116 error_warned = True
Dan Talayco1b3f6902010-02-15 14:14:19 -0800117 continue
Dan Talayco710438c2010-02-18 15:16:07 -0800118
Dan Talayco1b3f6902010-02-15 14:14:19 -0800119 if len(rcvmsg) == 0:
Dan Talayco48370102010-03-03 15:17:33 -0800120 self.logger.info("Zero len pkt rcvd")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800121 self.kill()
Dan Talayco3087a462010-02-13 14:01:47 -0800122 break
123
Dan Talayco1b3f6902010-02-15 14:14:19 -0800124 rcvtime = time.clock()
Dan Talayco48370102010-03-03 15:17:33 -0800125 self.logger.debug("Pkt len " + str(len(rcvmsg)) +
Ed Swierk4e200302012-03-19 14:53:31 -0700126 " in at " + str(rcvtime) + " on port " +
127 str(self.port_number))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800128
Dan Talaycoe226eb12010-02-18 23:06:30 -0800129 # Enqueue packet
130 self.pkt_sync.acquire()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800131 if len(self.packets) >= self.max_pkts:
Dan Talaycoe226eb12010-02-18 23:06:30 -0800132 # Queue full, throw away oldest
Dan Talayco1b3f6902010-02-15 14:14:19 -0800133 self.packets.pop(0)
134 self.packets_discarded += 1
Dan Talaycoe226eb12010-02-18 23:06:30 -0800135 else:
136 self.parent.packets_pending += 1
Dan Talayco48370102010-03-03 15:17:33 -0800137 # Check if parent is waiting on this (or any) port
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700138 drop_pkt = False
Dan Talayco48370102010-03-03 15:17:33 -0800139 if self.parent.want_pkt:
140 if (not self.parent.want_pkt_port or
Dan Talaycoe226eb12010-02-18 23:06:30 -0800141 self.parent.want_pkt_port == self.port_number):
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700142 if self.parent.exp_pkt:
143 if str(self.parent.exp_pkt) != str(rcvmsg):
144 drop_pkt = True
145 if not drop_pkt:
146 self.parent.got_pkt_port = self.port_number
147 self.parent.want_pkt = False
148 self.parent.pkt_sync.notify()
149 if not drop_pkt:
150 self.packets.append((rcvmsg, rcvtime))
151 self.packets_total += 1
Dan Talaycoe226eb12010-02-18 23:06:30 -0800152 self.pkt_sync.release()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800153
Dan Talayco48370102010-03-03 15:17:33 -0800154 self.logger.info("Thread exit ")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800155
156 def kill(self):
157 """
158 Terminate the running thread
159 """
Dan Talayco48370102010-03-03 15:17:33 -0800160 self.logger.debug("Port monitor kill")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800161 self.running = False
162 try:
163 self.socket.close()
164 except:
Dan Talayco48370102010-03-03 15:17:33 -0800165 self.logger.info("Ignoring dataplane soc shutdown error")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800166
Dan Talaycoe226eb12010-02-18 23:06:30 -0800167 def dequeue(self, use_lock=True):
Dan Talayco3087a462010-02-13 14:01:47 -0800168 """
169 Get the oldest packet in the queue
Dan Talaycoe226eb12010-02-18 23:06:30 -0800170 @param use_lock If True, acquires the packet sync lock (which is
171 really the parent's lock)
172 @return The pair packet, packet time-stamp
Dan Talayco3087a462010-02-13 14:01:47 -0800173 """
Dan Talaycoe226eb12010-02-18 23:06:30 -0800174 if use_lock:
175 self.pkt_sync.acquire()
176 if len(self.packets) > 0:
177 pkt, pkt_time = self.packets.pop(0)
178 self.parent.packets_pending -= 1
179 else:
180 pkt = pkt_time = None
181 if use_lock:
182 self.pkt_sync.release()
Dan Talayco3087a462010-02-13 14:01:47 -0800183 return pkt, pkt_time
184
185 def timestamp_head(self):
186 """
187 Return the timestamp of the head of queue or None if empty
188 """
Dan Talayco710438c2010-02-18 15:16:07 -0800189 rv = None
Dan Talaycoe226eb12010-02-18 23:06:30 -0800190 try:
Dan Talayco710438c2010-02-18 15:16:07 -0800191 rv = self.packets[0][1]
Dan Talaycoe226eb12010-02-18 23:06:30 -0800192 except:
193 rv = None
Dan Talayco710438c2010-02-18 15:16:07 -0800194 return rv
Dan Talayco3087a462010-02-13 14:01:47 -0800195
196 def flush(self):
197 """
198 Clear the packet queue
199 """
Dan Talaycoe226eb12010-02-18 23:06:30 -0800200 self.pkt_sync.acquire()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800201 self.packets_discarded += len(self.packets)
Dan Talaycoe226eb12010-02-18 23:06:30 -0800202 self.parent.packets_pending -= len(self.packets)
Dan Talayco3087a462010-02-13 14:01:47 -0800203 self.packets = []
204 self.packet_times = []
Dan Talaycoe226eb12010-02-18 23:06:30 -0800205 self.pkt_sync.release()
Dan Talayco3087a462010-02-13 14:01:47 -0800206
207
208 def send(self, packet):
209 """
210 Send a packet to the dataplane port
211 @param packet The packet data to send to the port
212 @retval The number of bytes sent
213 """
214 return self.socket.send(packet)
215
216
217 def register(self, handler):
218 """
219 Register a callback function to receive packets from this
Dan Talaycoe226eb12010-02-18 23:06:30 -0800220 port. The callback will be passed the packet, the
221 interface name and the port number (if set) on which the
Dan Talayco3087a462010-02-13 14:01:47 -0800222 packet was received.
223
224 To be implemented
225 """
226 pass
227
Dan Talayco1b3f6902010-02-15 14:14:19 -0800228 def show(self, prefix=''):
229 print prefix + "Name: " + self.interface_name
Dan Talayco710438c2010-02-18 15:16:07 -0800230 print prefix + "Pkts pending: " + str(len(self.packets))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800231 print prefix + "Pkts total: " + str(self.packets_total)
232 print prefix + "socket: " + str(self.socket)
Dan Talaycoe226eb12010-02-18 23:06:30 -0800233
Dan Talayco34089522010-02-07 23:07:41 -0800234
235class DataPlane:
236 """
237 Class defining access primitives to the data plane
238 Controls a list of DataPlanePort objects
239 """
240 def __init__(self):
241 self.port_list = {}
Dan Talaycoe226eb12010-02-18 23:06:30 -0800242 # pkt_sync serves double duty as a regular top level lock and
243 # as a condition variable
244 self.pkt_sync = Condition()
245
246 # These are used to signal async pkt arrival for polling
247 self.want_pkt = False
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700248 self.exp_pkt = None
Dan Talaycoe226eb12010-02-18 23:06:30 -0800249 self.want_pkt_port = None # What port required (or None)
250 self.got_pkt_port = None # On what port received?
251 self.packets_pending = 0 # Total pkts in all port queues
Dan Talayco48370102010-03-03 15:17:33 -0800252 self.logger = logging.getLogger("dataplane")
Dan Talayco34089522010-02-07 23:07:41 -0800253
254 def port_add(self, interface_name, port_number):
255 """
256 Add a port to the dataplane
257 TBD: Max packets for queue?
258 @param interface_name The name of the physical interface like eth1
259 @param port_number The port number used to refer to the port
260 """
Dan Talaycoe226eb12010-02-18 23:06:30 -0800261
Dan Talayco11c26e72010-03-07 22:03:57 -0800262 self.port_list[port_number] = DataPlanePort(interface_name,
Dan Talaycoe226eb12010-02-18 23:06:30 -0800263 port_number, self)
Dan Talayco34089522010-02-07 23:07:41 -0800264 self.port_list[port_number].start()
265
266 def send(self, port_number, packet):
267 """
268 Send a packet to the given port
269 @param port_number The port to send the data to
270 @param packet Raw packet data to send to port
271 """
Dan Talayco11c26e72010-03-07 22:03:57 -0800272 self.logger.debug("Sending %d bytes to port %d" %
273 (len(packet), port_number))
Dan Talayco34089522010-02-07 23:07:41 -0800274 bytes = self.port_list[port_number].send(packet)
275 if bytes != len(packet):
Dan Talayco48370102010-03-03 15:17:33 -0800276 self.logger.error("Unhandled send error, length mismatch %d != %d" %
Dan Talayco1b3f6902010-02-15 14:14:19 -0800277 (bytes, len(packet)))
Dan Talayco34089522010-02-07 23:07:41 -0800278 return bytes
279
280 def flood(self, packet):
281 """
282 Send a packet to all ports
283 @param packet Raw packet data to send to port
284 """
285 for port_number in self.port_list.keys():
286 bytes = self.port_list[port_number].send(packet)
287 if bytes != len(packet):
Dan Talayco48370102010-03-03 15:17:33 -0800288 self.logger.error("Unhandled send error" +
Dan Talayco1b3f6902010-02-15 14:14:19 -0800289 ", port %d, length mismatch %d != %d" %
290 (port_number, bytes, len(packet)))
Dan Talayco34089522010-02-07 23:07:41 -0800291
Dan Talaycoe226eb12010-02-18 23:06:30 -0800292 def _oldest_packet_find(self):
Dan Talayco34089522010-02-07 23:07:41 -0800293 # Find port with oldest packet
294 min_time = 0
295 min_port = -1
296 for port_number in self.port_list.keys():
297 ptime = self.port_list[port_number].timestamp_head()
298 if ptime:
299 if (min_port == -1) or (ptime < min_time):
Dan Talaycoe226eb12010-02-18 23:06:30 -0800300 min_time = ptime
Dan Talayco34089522010-02-07 23:07:41 -0800301 min_port = port_number
Dan Talaycoe226eb12010-02-18 23:06:30 -0800302 oft_assert(min_port != -1, "Could not find port when pkts pending")
Dan Talayco34089522010-02-07 23:07:41 -0800303
Dan Talaycoe226eb12010-02-18 23:06:30 -0800304 return min_port
305
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700306 def poll(self, port_number=None, timeout=None, exp_pkt=None):
Dan Talaycoe226eb12010-02-18 23:06:30 -0800307 """
308 Poll one or all dataplane ports for a packet
309
310 If port_number is given, get the oldest packet from that port.
311 Otherwise, find the port with the oldest packet and return
312 that packet.
313 @param port_number If set, get packet from this port
Dan Talayco11c26e72010-03-07 22:03:57 -0800314 @param timeout If positive and no packet is available, block
Dan Talaycoe226eb12010-02-18 23:06:30 -0800315 until a packet is received or for this many seconds
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700316 @param exp_pkt If not None, look for this packet and ignore any
317 others received. Requires port_number to be specified
Dan Talaycoe226eb12010-02-18 23:06:30 -0800318 @return The triple port_number, packet, pkt_time where packet
319 is received from port_number at time pkt_time. If a timeout
320 occurs, return None, None, None
321 """
322
323 self.pkt_sync.acquire()
324
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700325 if exp_pkt and not port_number:
326 print "WARNING: Dataplane poll: exp_pkt without port number"
327
Dan Talaycoe226eb12010-02-18 23:06:30 -0800328 # Check if requested specific port and it has a packet
329 if port_number and len(self.port_list[port_number].packets) != 0:
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700330 while len(self.port_list[port_number].packets) != 0:
331 pkt, time = self.port_list[port_number].dequeue(use_lock=False)
332 if not exp_pkt:
333 break
334 if str(pkt) == str(exp_pkt):
335 break
336 pkt = None # Discard silently
337 if pkt:
338 self.pkt_sync.release()
339 oft_assert(pkt, "Poll: packet not found on port " +
340 str(port_number))
341 return port_number, pkt, time
Dan Talaycoe226eb12010-02-18 23:06:30 -0800342
343 # Check if requested any port and some packet pending
344 if not port_number and self.packets_pending != 0:
345 port = self._oldest_packet_find()
346 pkt, time = self.port_list[port].dequeue(use_lock=False)
347 self.pkt_sync.release()
348 oft_assert(pkt, "Poll: oldest packet not found")
349 return port, pkt, time
350
351 # No packet pending; blocking call requested?
352 if not timeout:
353 self.pkt_sync.release()
Dan Talayco34089522010-02-07 23:07:41 -0800354 return None, None, None
355
Dan Talaycoe226eb12010-02-18 23:06:30 -0800356 # Desired packet isn't available and timeout is specified
357 # Already holding pkt_sync; wait on pkt_sync variable
358 self.want_pkt = True
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700359 self.exp_pkt = exp_pkt
Dan Talaycoe226eb12010-02-18 23:06:30 -0800360 self.want_pkt_port = port_number
361 self.got_pkt_port = None
362 self.pkt_sync.wait(timeout)
363 self.want_pkt = False
Dan Talaycocf26b7a2011-08-05 10:15:35 -0700364 self.exp_pkt = None
Dan Talaycoe226eb12010-02-18 23:06:30 -0800365 if self.got_pkt_port:
366 pkt, time = \
367 self.port_list[self.got_pkt_port].dequeue(use_lock=False)
368 self.pkt_sync.release()
369 oft_assert(pkt, "Poll: pkt reported, but not found at " +
Dan Talayco48370102010-03-03 15:17:33 -0800370 str(self.got_pkt_port))
Dan Talaycoe226eb12010-02-18 23:06:30 -0800371 return self.got_pkt_port, pkt, time
372
373 self.pkt_sync.release()
Dan Talaycoa99c21a2010-05-07 09:23:34 -0700374 self.logger.debug("Poll time out, no packet from " + str(port_number))
Dan Talaycoe226eb12010-02-18 23:06:30 -0800375
376 return None, None, None
Dan Talayco34089522010-02-07 23:07:41 -0800377
Dan Talayco48370102010-03-03 15:17:33 -0800378 def kill(self, join_threads=True):
Dan Talayco1b3f6902010-02-15 14:14:19 -0800379 """
380 Close all sockets for dataplane
Dan Talayco710438c2010-02-18 15:16:07 -0800381 @param join_threads If True call join on each thread
Dan Talayco1b3f6902010-02-15 14:14:19 -0800382 """
Dan Talayco34089522010-02-07 23:07:41 -0800383 for port_number in self.port_list.keys():
384 self.port_list[port_number].kill()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800385 if join_threads:
Dan Talayco48370102010-03-03 15:17:33 -0800386 self.logger.debug("Joining " + str(port_number))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800387 self.port_list[port_number].join()
Dan Talayco34089522010-02-07 23:07:41 -0800388
Dan Talayco48370102010-03-03 15:17:33 -0800389 self.logger.info("DataPlane shutdown")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800390
391 def show(self, prefix=''):
392 print prefix + "Dataplane Controller"
Dan Talaycoe226eb12010-02-18 23:06:30 -0800393 print prefix + "Packets pending" + str(self.packets_pending)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800394 for pnum, port in self.port_list.items():
395 print prefix + "OpenFlow Port Number " + str(pnum)
396 port.show(prefix + ' ')
397