blob: 29b7fadbdba18f19218e1efc0480d971f2302033 [file] [log] [blame]
"""
OpenFlow Test Framework

DataPlane and DataPlanePort classes

Provide the interface to control the set of ports being used
to stimulate the switch under test.

See the class DataPlanePort for more details.  The DataPlane class
wraps a set of those objects, allowing general calls and parsing
configuration.

@todo Add "filters" for matching packets.  Actions supported
for filters should include a callback or a counter
"""
16
Dan Talayco3087a462010-02-13 14:01:47 -080017import sys
18import os
19import socket
20import time
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080021import netutils
Dan Talayco3087a462010-02-13 14:01:47 -080022from threading import Thread
23from threading import Lock
Dan Talaycoe226eb12010-02-18 23:06:30 -080024from threading import Condition
Dan Talayco710438c2010-02-18 15:16:07 -080025from oft_config import *
26import select
Dan Talayco3087a462010-02-13 14:01:47 -080027
#@todo Move these identifiers into config
# Protocol value handed (through socket.htons) to the raw AF_PACKET socket
# opened by DataPlanePort.interface_open.
ETH_P_ALL = 0x03
# Value passed to socket.settimeout() on each port socket.
# NOTE(review): settimeout() interprets its argument as seconds, so this is
# a very long timeout; it may have been meant as milliseconds -- confirm.
RCV_TIMEOUT = 10000
# Maximum number of bytes requested from the socket per recv() call.
RCV_SIZE = 4096
32
class DataPlanePort(Thread):
    """
    Class defining a port monitoring object.

    Control a dataplane port connected to the switch under test.
    Creates a promiscuous socket on a physical interface.
    Queues the packets received on that interface with time stamps.
    Inherits from Thread class as meant to run in background.  Also
    supports polling.
    Use accessors to dequeue packets for proper synchronization.

    Currently assumes a controlling 'parent' which maintains a
    common lock/condition object (pkt_sync), a total packet-pending
    count (packets_pending) and the want_pkt / want_pkt_port /
    got_pkt_port fields used to wake a blocked poller.  May want
    to decouple that some day.
    """

    def __init__(self, interface_name, port_number, parent, max_pkts=1024):
        """
        Set up a port monitor object
        @param interface_name The name of the physical interface like eth1
        @param port_number The port number associated with this port
        @param parent The controlling dataplane object; for pkt wait CV
        @param max_pkts Maximum number of pkts to keep in queue
        """
        Thread.__init__(self)
        self.interface_name = interface_name
        self.debug_level = debug_level_default
        self.max_pkts = max_pkts
        self.packets_total = 0
        # Queue of (packet, receive-time) pairs, oldest first
        self.packets = []
        self.packets_discarded = 0
        self.port_number = port_number
        self.socket = self.interface_open(interface_name)
        self.dbg(DEBUG_INFO, "Openned port monitor socket")
        self.parent = parent
        # Share the parent's condition variable so that one lock guards
        # every port queue and the parent's bookkeeping.
        self.pkt_sync = self.parent.pkt_sync

    def dbg(self, level, string):
        """
        Log a debug message prefixed with this port's interface name
        @param level Debug level of the message
        @param string The message text
        """
        debug_log("DPLANE", self.debug_level, level,
                  self.interface_name + ": " + string)

    def interface_open(self, interface_name):
        """
        Open a socket in a promiscuous mode for a data connection.
        @param interface_name port name as a string such as 'eth1'
        @retval s socket
        """
        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                          socket.htons(ETH_P_ALL))
        s.bind((interface_name, 0))
        netutils.set_promisc(s, interface_name)
        s.settimeout(RCV_TIMEOUT)
        return s

    def run(self):
        """
        Activity function for class

        Loop until kill() is called (or a zero length read occurs):
        wait up to one second for the socket to become readable, read
        one packet, time stamp it and add it to the queue.
        """
        self.running = True
        self.socs = [self.socket]
        error_warned = False  # Have we warned about error?
        while self.running:
            try:
                sel_in, _, _ = select.select(self.socs, [], [], 1)
            except Exception:
                if not self.running:
                    # kill() closed the socket while we were in select;
                    # that is a normal shutdown, not an error.
                    break
                print(sys.exc_info())
                self.dbg(DEBUG_ERROR, "Select error, exiting")
                # SystemExit raised here only terminates this thread
                sys.exit(1)

            if not self.running:
                break

            # select() reports a timeout with empty lists (never None).
            # Skip the recv in that case so the loop keeps re-checking
            # self.running instead of blocking in recv.
            if not sel_in:
                continue

            try:
                rcvmsg = self.socket.recv(RCV_SIZE)
            except socket.error:
                if not error_warned:
                    self.dbg(DEBUG_INFO, "Socket error on recv")
                    error_warned = True
                continue

            if len(rcvmsg) == 0:
                self.dbg(DEBUG_INFO, "Zero len pkt rcvd")
                self.kill()
                break

            # Wall clock time, so stamps from different port threads
            # can be compared when looking for the oldest packet.
            rcvtime = time.time()
            self.dbg(DEBUG_VERBOSE, "Pkt len " + str(len(rcvmsg)) +
                     " in at " + str(rcvtime))

            self._enqueue(rcvmsg, rcvtime)

        self.dbg(DEBUG_INFO, "Thread exit ")

    def _enqueue(self, rcvmsg, rcvtime):
        """
        Add a received packet to the queue and wake a waiting poller
        @param rcvmsg The raw packet data
        @param rcvtime The time stamp recorded for the packet
        """
        self.pkt_sync.acquire()
        try:
            if len(self.packets) >= self.max_pkts:
                # Queue full, throw away oldest.  One packet leaves and
                # one enters, so the parent's pending count is unchanged.
                self.packets.pop(0)
                self.packets_discarded += 1
            else:
                self.parent.packets_pending += 1
            self.packets.append((rcvmsg, rcvtime))
            self.packets_total += 1

            # Check if parent is waiting on this (or any) port
            if self.parent.want_pkt:
                wanted = self.parent.want_pkt_port
                if wanted is None or wanted == self.port_number:
                    self.parent.got_pkt_port = self.port_number
                    self.parent.want_pkt = False
                    # Notify on the condition variable; want_pkt is
                    # only a boolean flag.
                    self.pkt_sync.notify()
        finally:
            self.pkt_sync.release()

    def kill(self):
        """
        Terminate the running thread
        """
        self.running = False
        try:
            self.socket.close()
        except Exception:
            self.dbg(DEBUG_INFO, "Ignoring dataplane soc shutdown error")
        self.dbg(DEBUG_INFO, "Port monitor exiting")

    def dequeue(self, use_lock=True):
        """
        Get the oldest packet in the queue
        @param use_lock If True, acquires the packet sync lock (which is
        really the parent's lock)
        @return The pair packet, packet time-stamp; None, None if the
        queue is empty
        """
        if use_lock:
            self.pkt_sync.acquire()
        try:
            if len(self.packets) > 0:
                pkt, pkt_time = self.packets.pop(0)
                self.parent.packets_pending -= 1
            else:
                pkt = pkt_time = None
        finally:
            if use_lock:
                self.pkt_sync.release()
        return pkt, pkt_time

    def timestamp_head(self):
        """
        Return the timestamp of the head of queue or None if empty
        """
        try:
            return self.packets[0][1]
        except IndexError:
            return None

    def flush(self):
        """
        Clear the packet queue

        The dropped packets are counted as discarded and removed from
        the parent's pending count.
        """
        self.pkt_sync.acquire()
        try:
            dropped = len(self.packets)
            self.packets_discarded += dropped
            self.parent.packets_pending -= dropped
            self.packets = []
        finally:
            self.pkt_sync.release()

    def send(self, packet):
        """
        Send a packet to the dataplane port
        @param packet The packet data to send to the port
        @retval The number of bytes sent
        """
        self.dbg(DEBUG_VERBOSE,
                 "port sending " + str(len(packet)) + " bytes")
        return self.socket.send(packet)

    def register(self, handler):
        """
        Register a callback function to receive packets from this
        port.  The callback will be passed the packet, the
        interface name and the port number (if set) on which the
        packet was received.

        To be implemented
        """
        pass

    def show(self, prefix=''):
        """
        Print a summary of this port's state
        @param prefix String prepended to every line printed
        """
        print(prefix + "Name: " + self.interface_name)
        print(prefix + "Pkts pending: " + str(len(self.packets)))
        print(prefix + "Pkts total: " + str(self.packets_total))
        print(prefix + "socket: " + str(self.socket))
Dan Talaycoe226eb12010-02-18 23:06:30 -0800229
Dan Talayco34089522010-02-07 23:07:41 -0800230
class DataPlane:
    """
    Class defining access primitives to the data plane
    Controls a list of DataPlanePort objects
    """
    def __init__(self):
        # Maps port number -> DataPlanePort object
        self.port_list = {}
        self.debug_level = debug_level_default
        # pkt_sync serves double duty as a regular top level lock and
        # as a condition variable
        self.pkt_sync = Condition()

        # These are used to signal async pkt arrival for polling
        self.want_pkt = False
        self.want_pkt_port = None  # What port required (or None)
        self.got_pkt_port = None   # On what port received?
        self.packets_pending = 0   # Total pkts in all port queues

    def dbg(self, level, string):
        """
        Log a debug message for the dataplane
        @param level Debug level of the message
        @param string The message text
        """
        debug_log("DPORT", self.debug_level, level, string)

    def port_add(self, interface_name, port_number):
        """
        Add a port to the dataplane and start its monitor thread
        TBD:  Max packets for queue?
        @param interface_name The name of the physical interface like eth1
        @param port_number The port number used to refer to the port
        """
        port = DataPlanePort(interface_name, port_number, self)
        self.port_list[port_number] = port
        port.start()

    def send(self, port_number, packet):
        """
        Send a packet to the given port
        @param port_number The port to send the data to
        @param packet Raw packet data to send to port
        @return The number of bytes sent
        """
        self.dbg(DEBUG_VERBOSE,
                 "Sending %d bytes to port %d" % (len(packet), port_number))
        bytes_sent = self.port_list[port_number].send(packet)
        if bytes_sent != len(packet):
            self.dbg(DEBUG_ERROR, "Unhandled send error, " +
                     "length mismatch %d != %d" %
                     (bytes_sent, len(packet)))
        return bytes_sent

    def flood(self, packet):
        """
        Send a packet to all ports
        @param packet Raw packet data to send to port
        """
        for port_number, port in self.port_list.items():
            bytes_sent = port.send(packet)
            if bytes_sent != len(packet):
                self.dbg(DEBUG_ERROR, "Unhandled send error" +
                         ", port %d, length mismatch %d != %d" %
                         (port_number, bytes_sent, len(packet)))

    def _oldest_packet_find(self):
        """
        Find the port whose head-of-queue packet has the oldest time stamp

        Intended to be called with pkt_sync held and at least one
        packet pending; asserts (via oft_assert) that a port was found.
        @return The port number holding the oldest packet
        """
        min_time = None
        min_port = -1
        for port_number, port in self.port_list.items():
            ptime = port.timestamp_head()
            # Compare against None so that a time stamp of 0 still counts
            if ptime is None:
                continue
            if min_port == -1 or ptime < min_time:
                min_time = ptime
                min_port = port_number
        oft_assert(min_port != -1, "Could not find port when pkts pending")

        return min_port

    def poll(self, port_number=None, timeout=None):
        """
        Poll one or all dataplane ports for a packet

        If port_number is given, get the oldest packet from that port.
        Otherwise, find the port with the oldest packet and return
        that packet.
        @param port_number If set, get packet from this port
        @param timeout If positive and no packet is available, block
        until a packet is received or for this many seconds
        @return The triple port_number, packet, pkt_time where packet
        is received from port_number at time pkt_time.  If a timeout
        occurs, return None, None, None
        """
        rcv_port = None
        pkt = pkt_time = None
        not_found_msg = None

        self.pkt_sync.acquire()
        try:
            if port_number is not None:
                # Check if requested specific port and it has a packet
                if len(self.port_list[port_number].packets) != 0:
                    rcv_port = port_number
                    not_found_msg = ("Poll: packet not found on port " +
                                     str(port_number))
            elif self.packets_pending != 0:
                # Requested any port and some packet is pending
                rcv_port = self._oldest_packet_find()
                not_found_msg = "Poll: oldest packet not found"

            if rcv_port is None:
                # No packet pending; blocking call requested?
                if not timeout:
                    return None, None, None

                # Desired packet isn't available and timeout is specified
                # Already holding pkt_sync; wait on pkt_sync variable.
                # A port thread fills in got_pkt_port before notifying.
                self.want_pkt = True
                self.want_pkt_port = port_number
                self.got_pkt_port = None
                self.pkt_sync.wait(timeout)
                self.want_pkt = False
                rcv_port = self.got_pkt_port
                if rcv_port is not None:
                    not_found_msg = ("Poll: pkt reported, but not found at " +
                                     str(rcv_port))

            if rcv_port is not None:
                pkt, pkt_time = \
                    self.port_list[rcv_port].dequeue(use_lock=False)
        finally:
            # Released on every path, including exceptions
            self.pkt_sync.release()

        if rcv_port is None:
            self.dbg(DEBUG_VERBOSE, "Poll time out, no packet")
            return None, None, None

        oft_assert(pkt, not_found_msg)
        return rcv_port, pkt, pkt_time

    def kill(self, join_threads=False):
        """
        Close all sockets for dataplane
        @param join_threads If True call join on each thread
        """
        for port_number, port in self.port_list.items():
            port.kill()
            if join_threads:
                self.dbg(DEBUG_INFO, "Joining " + str(port_number))
                port.join()

        self.dbg(DEBUG_INFO, "DataPlane shutdown")

    def show(self, prefix=''):
        """
        Print a summary of the dataplane and each of its ports
        @param prefix String prepended to every line printed
        """
        print(prefix + "Dataplane Controller")
        print(prefix + "Packets pending: " + str(self.packets_pending))
        for pnum, port in self.port_list.items():
            print(prefix + "OpenFlow Port Number " + str(pnum))
            port.show(prefix + '  ')
382