blob: 7d788cbb90106921b1b505b36c4945444904233c [file] [log] [blame]
"""
OpenFlow Test Framework

DataPlane and DataPlanePort classes

Provide the interface to control the set of ports being used
to stimulate the switch under test.

See the class DataPlanePort for more details.  The DataPlane class wraps
a set of those objects, allowing general calls and parsing
configuration.

@todo Add "filters" for matching packets.  Actions supported
for filters should include a callback or a counter
"""
16
Dan Talayco3087a462010-02-13 14:01:47 -080017import sys
18import os
19import socket
20import time
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080021import netutils
Dan Talayco3087a462010-02-13 14:01:47 -080022from threading import Thread
23from threading import Lock
Dan Talaycoe226eb12010-02-18 23:06:30 -080024from threading import Condition
Dan Talayco710438c2010-02-18 15:16:07 -080025import select
Dan Talayco48370102010-03-03 15:17:33 -080026import logging
27from oft_assert import oft_assert
Dan Talayco3087a462010-02-13 14:01:47 -080028
##@todo Find a better home for these identifiers (dataplane)
# Maximum number of bytes requested per recv() call on a port socket
RCV_SIZE_DEFAULT = 4096
# Protocol number (converted with socket.htons) used when opening the
# AF_PACKET raw socket; presumably the Linux ETH_P_ALL value -- confirm
ETH_P_ALL = 0x03
# Value handed to socket.settimeout() for each port socket.
# NOTE(review): settimeout() takes seconds, so this is 10000 s -- confirm
# whether milliseconds were intended
RCV_TIMEOUT = 10000
Dan Talayco3087a462010-02-13 14:01:47 -080033
class DataPlanePort(Thread):
    """
    Class defining a port monitoring object.

    Control a dataplane port connected to the switch under test.
    Creates a promiscuous socket on a physical interface.
    Queues the packets received on that interface with time stamps.
    Inherits from Thread class as meant to run in background.  Also
    supports polling.
    Use accessors to dequeue packets for proper synchronization.

    Currently assumes a controlling 'parent' which maintains a
    common Lock object and a total packet-pending count.  May want
    to decouple that some day.

    The parent object must provide the attributes pkt_sync (a Condition),
    packets_pending, want_pkt, want_pkt_port, exp_pkt and got_pkt_port.
    """

    def __init__(self, interface_name, port_number, parent, max_pkts=1024):
        """
        Set up a port monitor object
        @param interface_name The name of the physical interface like eth1
        @param port_number The port number associated with this port
        @param parent The controlling dataplane object; for pkt wait CV
        @param max_pkts Maximum number of pkts to keep in queue

        Exits the process with status 1 if the socket cannot be opened.
        """
        Thread.__init__(self)
        self.interface_name = interface_name
        self.max_pkts = max_pkts
        self.packets_total = 0
        # Queue of (packet, timestamp) tuples, oldest first
        self.packets = []
        self.packets_discarded = 0
        self.port_number = port_number
        logname = "dp-" + interface_name
        self.logger = logging.getLogger(logname)
        try:
            self.socket = self.interface_open(interface_name)
        except Exception:
            # Record the reason (with traceback) before giving up
            self.logger.exception("Could not open socket")
            sys.exit(1)
        self.logger.info("Openned port monitor socket")
        self.parent = parent
        # Shared with the parent: one lock/condition for all port queues
        self.pkt_sync = self.parent.pkt_sync

    def interface_open(self, interface_name):
        """
        Open a socket in a promiscuous mode for a data connection.
        @param interface_name port name as a string such as 'eth1'
        @retval s socket
        """
        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                          socket.htons(ETH_P_ALL))
        try:
            s.bind((interface_name, 0))
            netutils.set_promisc(s, interface_name)
            s.settimeout(RCV_TIMEOUT)
        except Exception:
            # Do not leak the descriptor when setup fails part way
            s.close()
            raise
        return s

    def run(self):
        """
        Activity function for class

        Loops until kill() is called: waits (up to one second at a time)
        for the socket to become readable, reads one packet, time stamps
        it and hands it to _enqueue().
        """
        self.running = True
        self.socs = [self.socket]
        error_warned = False  # Have we warned about error?
        while self.running:
            try:
                sel_in, _, _ = select.select(self.socs, [], [], 1)
            except Exception:
                if self.running:
                    print(sys.exc_info())
                    self.logger.error("Select error, exiting")
                # If not running, kill() has presumably closed the socket
                # underneath us; treat that as a normal shutdown
                break

            if not self.running:
                break

            if not sel_in:
                continue

            try:
                rcvmsg = self.socket.recv(RCV_SIZE_DEFAULT)
            except socket.error:
                if not error_warned:
                    self.logger.info("Socket error on recv")
                    error_warned = True
                continue

            if len(rcvmsg) == 0:
                self.logger.info("Zero len pkt rcvd")
                self.kill()
                break

            # Wall-clock time so stamps are comparable across port threads;
            # time.clock() is CPU time on Unix and is gone in Python 3.8+
            rcvtime = time.time()
            self.logger.debug("Pkt len " + str(len(rcvmsg)) +
                              " in at " + str(rcvtime))

            self._enqueue(rcvmsg, rcvtime)

        self.logger.info("Thread exit ")

    def _enqueue(self, rcvmsg, rcvtime):
        """
        Queue a received packet and wake the parent if it is waiting
        @param rcvmsg The raw packet data
        @param rcvtime Time stamp to store with the packet
        @return True if the packet was queued, False if it was filtered
        out because the parent is waiting for a different packet
        """
        with self.pkt_sync:
            parent = self.parent
            # Is the parent blocked waiting on this (or any) port?
            waiting_here = bool(
                parent.want_pkt and
                (not parent.want_pkt_port or
                 parent.want_pkt_port == self.port_number))

            # Decide on filtering BEFORE touching the queue or the pending
            # count, so a dropped packet never evicts a queued one and
            # never inflates parent.packets_pending
            if (waiting_here and parent.exp_pkt and
                    str(parent.exp_pkt) != str(rcvmsg)):
                return False

            if len(self.packets) >= self.max_pkts:
                # Queue full, throw away oldest; pending count is unchanged
                # because one packet leaves and one arrives
                self.packets.pop(0)
                self.packets_discarded += 1
            else:
                parent.packets_pending += 1

            self.packets.append((rcvmsg, rcvtime))
            self.packets_total += 1

            if waiting_here:
                parent.got_pkt_port = self.port_number
                parent.want_pkt = False
                self.pkt_sync.notify()
            return True

    def kill(self):
        """
        Terminate the running thread
        """
        self.logger.debug("Port monitor kill")
        self.running = False
        try:
            self.socket.close()
        except Exception:
            # Best effort: shutdown continues regardless
            self.logger.info("Ignoring dataplane soc shutdown error")

    def dequeue(self, use_lock=True):
        """
        Get the oldest packet in the queue
        @param use_lock If True, acquires the packet sync lock (which is
        really the parent's lock)
        @return The pair packet, packet time-stamp; None, None if the
        queue is empty
        """
        if use_lock:
            self.pkt_sync.acquire()
        try:
            if self.packets:
                pkt, pkt_time = self.packets.pop(0)
                self.parent.packets_pending -= 1
            else:
                pkt = pkt_time = None
        finally:
            if use_lock:
                self.pkt_sync.release()
        return pkt, pkt_time

    def timestamp_head(self):
        """
        Return the timestamp of the head of queue or None if empty
        """
        try:
            return self.packets[0][1]
        except IndexError:
            return None

    def flush(self):
        """
        Clear the packet queue

        All queued packets are counted as discarded and removed from the
        parent's pending count.
        """
        with self.pkt_sync:
            flushed = len(self.packets)
            self.packets_discarded += flushed
            self.parent.packets_pending -= flushed
            self.packets = []
            # NOTE(review): packet_times is not read anywhere in this
            # class; kept only so the attribute still exists after flush
            self.packet_times = []

    def send(self, packet):
        """
        Send a packet to the dataplane port
        @param packet The packet data to send to the port
        @retval The number of bytes sent
        """
        return self.socket.send(packet)

    def register(self, handler):
        """
        Register a callback function to receive packets from this
        port.  The callback will be passed the packet, the
        interface name and the port number (if set) on which the
        packet was received.

        To be implemented
        """
        pass

    def show(self, prefix=''):
        """
        Print a summary of this port to stdout
        @param prefix String prepended to every output line
        """
        print(prefix + "Name: " + self.interface_name)
        print(prefix + "Pkts pending: " + str(len(self.packets)))
        print(prefix + "Pkts total: " + str(self.packets_total))
        print(prefix + "socket: " + str(self.socket))
Dan Talaycoe226eb12010-02-18 23:06:30 -0800232
Dan Talayco34089522010-02-07 23:07:41 -0800233
class DataPlane:
    """
    Class defining access primitives to the data plane
    Controls a list of DataPlanePort objects

    NOTE: poll() treats a port_number of 0 the same as None ("any port").
    """
    def __init__(self):
        # Maps port number -> DataPlanePort object
        self.port_list = {}
        # pkt_sync serves double duty as a regular top level lock and
        # as a condition variable
        self.pkt_sync = Condition()

        # These are used to signal async pkt arrival for polling
        self.want_pkt = False
        self.exp_pkt = None
        self.want_pkt_port = None  # What port required (or None)
        self.got_pkt_port = None  # On what port received?
        self.packets_pending = 0  # Total pkts in all port queues
        self.logger = logging.getLogger("dataplane")

    def port_add(self, interface_name, port_number):
        """
        Add a port to the dataplane and start its monitor thread
        TBD: Max packets for queue?
        @param interface_name The name of the physical interface like eth1
        @param port_number The port number used to refer to the port
        """
        port = DataPlanePort(interface_name, port_number, self)
        self.port_list[port_number] = port
        port.start()

    def send(self, port_number, packet):
        """
        Send a packet to the given port
        @param port_number The port to send the data to
        @param packet Raw packet data to send to port
        @return The number of bytes sent; a mismatch with the packet
        length is logged but not otherwise handled
        """
        self.logger.debug("Sending %d bytes to port %d" %
                          (len(packet), port_number))
        bytes_sent = self.port_list[port_number].send(packet)
        if bytes_sent != len(packet):
            self.logger.error("Unhandled send error, length mismatch %d != %d" %
                              (bytes_sent, len(packet)))
        return bytes_sent

    def flood(self, packet):
        """
        Send a packet to all ports
        @param packet Raw packet data to send to port
        """
        for port_number, port in self.port_list.items():
            bytes_sent = port.send(packet)
            if bytes_sent != len(packet):
                self.logger.error("Unhandled send error" +
                                  ", port %d, length mismatch %d != %d" %
                                  (port_number, bytes_sent, len(packet)))

    def _oldest_packet_find(self):
        """
        Find the port whose head-of-queue packet has the oldest time stamp
        @return The port number; asserts (via oft_assert) that some port
        has a packet queued
        """
        min_time = 0
        min_port = -1
        for port_number, port in self.port_list.items():
            ptime = port.timestamp_head()
            # Compare against None explicitly: a time stamp of 0.0 is a
            # valid value and must not be mistaken for an empty queue
            if ptime is not None:
                if (min_port == -1) or (ptime < min_time):
                    min_time = ptime
                    min_port = port_number
        oft_assert(min_port != -1, "Could not find port when pkts pending")

        return min_port

    def poll(self, port_number=None, timeout=None, exp_pkt=None):
        """
        Poll one or all dataplane ports for a packet

        If port_number is given, get the oldest packet from that port.
        Otherwise, find the port with the oldest packet and return
        that packet.
        @param port_number If set, get packet from this port
        @param timeout If positive and no packet is available, block
        until a packet is received or for this many seconds
        @param exp_pkt If not None, look for this packet and ignore any
        others received.  Requires port_number to be specified
        @return The triple port_number, packet, pkt_time where packet
        is received from port_number at time pkt_time.  If a timeout
        occurs, return None, None, None
        """
        # The with-block guarantees pkt_sync is released on every exit
        # path, including exceptions
        with self.pkt_sync:
            if exp_pkt and not port_number:
                print("WARNING: Dataplane poll: exp_pkt without port number")

            if port_number:
                # Specific port requested: drain its queue until a
                # suitable packet turns up
                port = self.port_list[port_number]
                pkt = pkt_time = None
                while len(port.packets) != 0:
                    pkt, pkt_time = port.dequeue(use_lock=False)
                    if not exp_pkt or str(pkt) == str(exp_pkt):
                        break
                    pkt = None  # Discard silently
                if pkt:
                    return port_number, pkt, pkt_time
            elif self.packets_pending != 0:
                # Any port will do and some packet is pending
                oldest_port = self._oldest_packet_find()
                pkt, pkt_time = \
                    self.port_list[oldest_port].dequeue(use_lock=False)
                oft_assert(pkt, "Poll: oldest packet not found")
                return oldest_port, pkt, pkt_time

            # No packet pending; blocking call requested?
            if not timeout:
                return None, None, None

            # Desired packet isn't available and timeout is specified
            # Already holding pkt_sync; wait on pkt_sync variable
            self.want_pkt = True
            self.exp_pkt = exp_pkt
            self.want_pkt_port = port_number
            self.got_pkt_port = None
            try:
                self.pkt_sync.wait(timeout)
            finally:
                # Always withdraw the request so port threads stop
                # filtering on our behalf
                self.want_pkt = False
                self.exp_pkt = None

            got_port = self.got_pkt_port
            # Explicit None test so a packet reported on port 0 is not
            # mistaken for a timeout
            if got_port is not None:
                pkt, pkt_time = \
                    self.port_list[got_port].dequeue(use_lock=False)
                oft_assert(pkt, "Poll: pkt reported, but not found at " +
                           str(got_port))
                return got_port, pkt, pkt_time

        self.logger.debug("Poll time out, no packet from " + str(port_number))

        return None, None, None

    def kill(self, join_threads=True):
        """
        Close all sockets for dataplane
        @param join_threads If True call join on each thread
        """
        for port_number, port in self.port_list.items():
            port.kill()
            if join_threads:
                self.logger.debug("Joining " + str(port_number))
                port.join()

        self.logger.info("DataPlane shutdown")

    def show(self, prefix=''):
        """
        Print a summary of the dataplane and each of its ports to stdout
        @param prefix String prepended to every output line
        """
        print(prefix + "Dataplane Controller")
        print(prefix + "Packets pending: " + str(self.packets_pending))
        for pnum, port in self.port_list.items():
            print(prefix + "OpenFlow Port Number " + str(pnum))
            port.show(prefix + ' ')
396