blob: bdbfea7c70f992a02a46b34c7a05eb85efc6916f [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
Dan Talayco3087a462010-02-13 14:01:47 -08004DataPlane and DataPlanePort classes
Dan Talayco34089522010-02-07 23:07:41 -08005
6Provide the interface to the control the set of ports being used
7to stimulate the switch under test.
8
9See the class dataplaneport for more details. This class wraps
Dan Talaycoe226eb12010-02-18 23:06:30 -080010a set of those objects allowing general calls and parsing
Dan Talayco34089522010-02-07 23:07:41 -080011configuration.
12
Dan Talayco3087a462010-02-13 14:01:47 -080013@todo Add "filters" for matching packets. Actions supported
14for filters should include a callback or a counter
Dan Talayco34089522010-02-07 23:07:41 -080015"""
16
Dan Talayco3087a462010-02-13 14:01:47 -080017import sys
18import os
19import socket
20import time
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080021import netutils
Dan Talayco3087a462010-02-13 14:01:47 -080022from threading import Thread
23from threading import Lock
Dan Talaycoe226eb12010-02-18 23:06:30 -080024from threading import Condition
Dan Talayco710438c2010-02-18 15:16:07 -080025from oft_config import *
26import select
Dan Talayco3087a462010-02-13 14:01:47 -080027
28#@todo Move these identifiers into config
29ETH_P_ALL = 0x03
30RCV_TIMEOUT = 10000
31RCV_SIZE = 4096
32
33class DataPlanePort(Thread):
34 """
35 Class defining a port monitoring object.
36
37 Control a dataplane port connected to the switch under test.
38 Creates a promiscuous socket on a physical interface.
39 Queues the packets received on that interface with time stamps.
40 Inherits from Thread class as meant to run in background. Also
41 supports polling.
42 Use accessors to dequeue packets for proper synchronization.
Dan Talaycoe226eb12010-02-18 23:06:30 -080043
44 Currently assumes a controlling 'parent' which maintains a
45 common Lock object and a total packet-pending count. May want
46 to decouple that some day.
Dan Talayco3087a462010-02-13 14:01:47 -080047 """
48
Dan Talaycoe226eb12010-02-18 23:06:30 -080049 def __init__(self, interface_name, port_number, parent, max_pkts=1024):
Dan Talayco3087a462010-02-13 14:01:47 -080050 """
51 Set up a port monitor object
52 @param interface_name The name of the physical interface like eth1
Dan Talaycoe226eb12010-02-18 23:06:30 -080053 @param parent The controlling dataplane object; for pkt wait CV
Dan Talayco3087a462010-02-13 14:01:47 -080054 @param max_pkts Maximum number of pkts to keep in queue
55 """
56 Thread.__init__(self)
57 self.interface_name = interface_name
Dan Talayco710438c2010-02-18 15:16:07 -080058 self.debug_level = debug_level_default
Dan Talayco3087a462010-02-13 14:01:47 -080059 self.max_pkts = max_pkts
Dan Talayco3087a462010-02-13 14:01:47 -080060 self.packets_total = 0
61 self.packets = []
Dan Talayco1b3f6902010-02-15 14:14:19 -080062 self.packets_discarded = 0
Dan Talaycoe226eb12010-02-18 23:06:30 -080063 self.port_number = port_number
Dan Talayco3087a462010-02-13 14:01:47 -080064 self.socket = self.interface_open(interface_name)
Dan Talayco710438c2010-02-18 15:16:07 -080065 self.dbg(DEBUG_INFO, "Openned port monitor socket")
Dan Talaycoe226eb12010-02-18 23:06:30 -080066 self.parent = parent
67 self.pkt_sync = self.parent.pkt_sync
Dan Talayco1b3f6902010-02-15 14:14:19 -080068
69 def dbg(self, level, string):
Dan Talaycoe226eb12010-02-18 23:06:30 -080070 debug_log("DPLANE", self.debug_level, level,
Dan Talayco710438c2010-02-18 15:16:07 -080071 self.interface_name + ": " + string)
Dan Talayco3087a462010-02-13 14:01:47 -080072
73 def interface_open(self, interface_name):
74 """
75 Open a socket in a promiscuous mode for a data connection.
76 @param interface_name port name as a string such as 'eth1'
77 @retval s socket
78 """
Dan Talaycoe226eb12010-02-18 23:06:30 -080079 s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
Dan Talayco3087a462010-02-13 14:01:47 -080080 socket.htons(ETH_P_ALL))
81 s.bind((interface_name, 0))
Dan Talayco1b3f6902010-02-15 14:14:19 -080082 netutils.set_promisc(s, interface_name)
Dan Talayco3087a462010-02-13 14:01:47 -080083 s.settimeout(RCV_TIMEOUT)
84 return s
85
Dan Talayco3087a462010-02-13 14:01:47 -080086 def run(self):
87 """
88 Activity function for class
89 """
90 self.running = True
Dan Talayco710438c2010-02-18 15:16:07 -080091 self.socs = [self.socket]
92 error_warned = False # Have we warned about error?
Dan Talayco3087a462010-02-13 14:01:47 -080093 while self.running:
94 try:
Dan Talayco710438c2010-02-18 15:16:07 -080095 sel_in, sel_out, sel_err = \
96 select.select(self.socs, [], [], 1)
97 except:
98 print sys.exc_info()
99 self.dbg(DEBUG_ERROR, "Select error, exiting")
100 sys.exit(1)
101
102 #if not sel_err is None:
103 # self.dbg(DEBUG_VERBOSE, "Socket error from select set")
104
105 if not self.running:
106 break
107
108 if sel_in is None:
109 continue
110
111 try:
Dan Talayco3087a462010-02-13 14:01:47 -0800112 rcvmsg = self.socket.recv(RCV_SIZE)
Dan Talayco3087a462010-02-13 14:01:47 -0800113 except socket.error:
Dan Talayco710438c2010-02-18 15:16:07 -0800114 if not error_warned:
115 self.dbg(DEBUG_INFO, "Socket error on recv")
116 error_warned = True
Dan Talayco1b3f6902010-02-15 14:14:19 -0800117 continue
Dan Talayco710438c2010-02-18 15:16:07 -0800118
Dan Talayco1b3f6902010-02-15 14:14:19 -0800119 if len(rcvmsg) == 0:
Dan Talayco710438c2010-02-18 15:16:07 -0800120 self.dbg(DEBUG_INFO, "Zero len pkt rcvd")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800121 self.kill()
Dan Talayco3087a462010-02-13 14:01:47 -0800122 break
123
Dan Talayco1b3f6902010-02-15 14:14:19 -0800124 rcvtime = time.clock()
Dan Talaycoe226eb12010-02-18 23:06:30 -0800125 self.dbg(DEBUG_VERBOSE, "Pkt len " + str(len(rcvmsg)) +
Dan Talayco710438c2010-02-18 15:16:07 -0800126 " in at " + str(rcvtime))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800127
Dan Talaycoe226eb12010-02-18 23:06:30 -0800128 # Enqueue packet
129 self.pkt_sync.acquire()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800130 if len(self.packets) >= self.max_pkts:
Dan Talaycoe226eb12010-02-18 23:06:30 -0800131 # Queue full, throw away oldest
Dan Talayco1b3f6902010-02-15 14:14:19 -0800132 self.packets.pop(0)
133 self.packets_discarded += 1
Dan Talaycoe226eb12010-02-18 23:06:30 -0800134 else:
135 self.parent.packets_pending += 1
136 # Check if parent is waiting on this (or any) port
137 if self.parent.want_pkt:
138 if (not self.parent.want_pkt_port or
139 self.parent.want_pkt_port == self.port_number):
140 self.parent.got_pkt_port = self.port_number
141 self.parent.want_pkt = False
142 self.parent.want_pkt.notify()
Dan Talayco710438c2010-02-18 15:16:07 -0800143 self.packets.append((rcvmsg, rcvtime))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800144 self.packets_total += 1
Dan Talaycoe226eb12010-02-18 23:06:30 -0800145 self.pkt_sync.release()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800146
Dan Talayco710438c2010-02-18 15:16:07 -0800147 self.dbg(DEBUG_INFO, "Thread exit ")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800148
149 def kill(self):
150 """
151 Terminate the running thread
152 """
153 self.running = False
154 try:
155 self.socket.close()
156 except:
157 self.dbg(DEBUG_INFO, "Ignoring dataplane soc shutdown error")
Dan Talaycoe226eb12010-02-18 23:06:30 -0800158 self.dbg(DEBUG_INFO, "Port monitor exiting")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800159
Dan Talaycoe226eb12010-02-18 23:06:30 -0800160 def dequeue(self, use_lock=True):
Dan Talayco3087a462010-02-13 14:01:47 -0800161 """
162 Get the oldest packet in the queue
Dan Talaycoe226eb12010-02-18 23:06:30 -0800163 @param use_lock If True, acquires the packet sync lock (which is
164 really the parent's lock)
165 @return The pair packet, packet time-stamp
Dan Talayco3087a462010-02-13 14:01:47 -0800166 """
Dan Talaycoe226eb12010-02-18 23:06:30 -0800167 if use_lock:
168 self.pkt_sync.acquire()
169 if len(self.packets) > 0:
170 pkt, pkt_time = self.packets.pop(0)
171 self.parent.packets_pending -= 1
172 else:
173 pkt = pkt_time = None
174 if use_lock:
175 self.pkt_sync.release()
Dan Talayco3087a462010-02-13 14:01:47 -0800176 return pkt, pkt_time
177
178 def timestamp_head(self):
179 """
180 Return the timestamp of the head of queue or None if empty
181 """
Dan Talayco710438c2010-02-18 15:16:07 -0800182 rv = None
Dan Talaycoe226eb12010-02-18 23:06:30 -0800183 try:
Dan Talayco710438c2010-02-18 15:16:07 -0800184 rv = self.packets[0][1]
Dan Talaycoe226eb12010-02-18 23:06:30 -0800185 except:
186 rv = None
Dan Talayco710438c2010-02-18 15:16:07 -0800187 return rv
Dan Talayco3087a462010-02-13 14:01:47 -0800188
189 def flush(self):
190 """
191 Clear the packet queue
192 """
Dan Talaycoe226eb12010-02-18 23:06:30 -0800193 self.pkt_sync.acquire()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800194 self.packets_discarded += len(self.packets)
Dan Talaycoe226eb12010-02-18 23:06:30 -0800195 self.parent.packets_pending -= len(self.packets)
Dan Talayco3087a462010-02-13 14:01:47 -0800196 self.packets = []
197 self.packet_times = []
Dan Talaycoe226eb12010-02-18 23:06:30 -0800198 self.pkt_sync.release()
Dan Talayco3087a462010-02-13 14:01:47 -0800199
200
201 def send(self, packet):
202 """
203 Send a packet to the dataplane port
204 @param packet The packet data to send to the port
205 @retval The number of bytes sent
206 """
Dan Talayco710438c2010-02-18 15:16:07 -0800207 self.dbg(DEBUG_VERBOSE,
Dan Talayco1b3f6902010-02-15 14:14:19 -0800208 "port sending " + str(len(packet)) + " bytes")
Dan Talayco3087a462010-02-13 14:01:47 -0800209 return self.socket.send(packet)
210
211
212 def register(self, handler):
213 """
214 Register a callback function to receive packets from this
Dan Talaycoe226eb12010-02-18 23:06:30 -0800215 port. The callback will be passed the packet, the
216 interface name and the port number (if set) on which the
Dan Talayco3087a462010-02-13 14:01:47 -0800217 packet was received.
218
219 To be implemented
220 """
221 pass
222
Dan Talayco1b3f6902010-02-15 14:14:19 -0800223 def show(self, prefix=''):
224 print prefix + "Name: " + self.interface_name
Dan Talayco710438c2010-02-18 15:16:07 -0800225 print prefix + "Pkts pending: " + str(len(self.packets))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800226 print prefix + "Pkts total: " + str(self.packets_total)
227 print prefix + "socket: " + str(self.socket)
Dan Talaycoe226eb12010-02-18 23:06:30 -0800228
Dan Talayco34089522010-02-07 23:07:41 -0800229
230class DataPlane:
231 """
232 Class defining access primitives to the data plane
233 Controls a list of DataPlanePort objects
234 """
235 def __init__(self):
236 self.port_list = {}
Dan Talayco710438c2010-02-18 15:16:07 -0800237 self.debug_level = debug_level_default
Dan Talaycoe226eb12010-02-18 23:06:30 -0800238 # pkt_sync serves double duty as a regular top level lock and
239 # as a condition variable
240 self.pkt_sync = Condition()
241
242 # These are used to signal async pkt arrival for polling
243 self.want_pkt = False
244 self.want_pkt_port = None # What port required (or None)
245 self.got_pkt_port = None # On what port received?
246 self.packets_pending = 0 # Total pkts in all port queues
Dan Talayco1b3f6902010-02-15 14:14:19 -0800247
248 def dbg(self, level, string):
Dan Talayco710438c2010-02-18 15:16:07 -0800249 debug_log("DPORT", self.debug_level, level, string)
Dan Talayco34089522010-02-07 23:07:41 -0800250
251 def port_add(self, interface_name, port_number):
252 """
253 Add a port to the dataplane
254 TBD: Max packets for queue?
255 @param interface_name The name of the physical interface like eth1
256 @param port_number The port number used to refer to the port
257 """
Dan Talaycoe226eb12010-02-18 23:06:30 -0800258
259 self.port_list[port_number] = DataPlanePort(interface_name,
260 port_number, self)
Dan Talayco34089522010-02-07 23:07:41 -0800261 self.port_list[port_number].start()
262
263 def send(self, port_number, packet):
264 """
265 Send a packet to the given port
266 @param port_number The port to send the data to
267 @param packet Raw packet data to send to port
268 """
Dan Talayco710438c2010-02-18 15:16:07 -0800269 self.dbg(DEBUG_VERBOSE,
Dan Talayco1b3f6902010-02-15 14:14:19 -0800270 "Sending %d bytes to port %d" % (len(packet), port_number))
Dan Talayco34089522010-02-07 23:07:41 -0800271 bytes = self.port_list[port_number].send(packet)
272 if bytes != len(packet):
Dan Talaycoe226eb12010-02-18 23:06:30 -0800273 self.dbg(DEBUG_ERROR,"Unhandled send error, " +
Dan Talayco1b3f6902010-02-15 14:14:19 -0800274 "length mismatch %d != %d" %
275 (bytes, len(packet)))
Dan Talayco34089522010-02-07 23:07:41 -0800276 return bytes
277
278 def flood(self, packet):
279 """
280 Send a packet to all ports
281 @param packet Raw packet data to send to port
282 """
283 for port_number in self.port_list.keys():
284 bytes = self.port_list[port_number].send(packet)
285 if bytes != len(packet):
Dan Talayco1b3f6902010-02-15 14:14:19 -0800286 self.dbg(DEBUG_ERROR, "Unhandled send error" +
287 ", port %d, length mismatch %d != %d" %
288 (port_number, bytes, len(packet)))
Dan Talayco34089522010-02-07 23:07:41 -0800289
Dan Talaycoe226eb12010-02-18 23:06:30 -0800290 def _oldest_packet_find(self):
Dan Talayco34089522010-02-07 23:07:41 -0800291 # Find port with oldest packet
292 min_time = 0
293 min_port = -1
294 for port_number in self.port_list.keys():
295 ptime = self.port_list[port_number].timestamp_head()
296 if ptime:
297 if (min_port == -1) or (ptime < min_time):
Dan Talaycoe226eb12010-02-18 23:06:30 -0800298 min_time = ptime
Dan Talayco34089522010-02-07 23:07:41 -0800299 min_port = port_number
Dan Talaycoe226eb12010-02-18 23:06:30 -0800300 oft_assert(min_port != -1, "Could not find port when pkts pending")
Dan Talayco34089522010-02-07 23:07:41 -0800301
Dan Talaycoe226eb12010-02-18 23:06:30 -0800302 return min_port
303
304 def poll(self, port_number=None, timeout=None):
305 """
306 Poll one or all dataplane ports for a packet
307
308 If port_number is given, get the oldest packet from that port.
309 Otherwise, find the port with the oldest packet and return
310 that packet.
311 @param port_number If set, get packet from this port
312 @param timeout If positive and no packet is available, block
313 until a packet is received or for this many seconds
314 @return The triple port_number, packet, pkt_time where packet
315 is received from port_number at time pkt_time. If a timeout
316 occurs, return None, None, None
317 """
318
319 self.pkt_sync.acquire()
320
321 # Check if requested specific port and it has a packet
322 if port_number and len(self.port_list[port_number].packets) != 0:
323 pkt, time = self.port_list[port_number].dequeue(use_lock=False)
324 self.pkt_sync.release()
325 oft_assert(pkt, "Poll: packet not found on port " +
326 str(port_number))
327 return port_number, pkt, time
328
329 # Check if requested any port and some packet pending
330 if not port_number and self.packets_pending != 0:
331 port = self._oldest_packet_find()
332 pkt, time = self.port_list[port].dequeue(use_lock=False)
333 self.pkt_sync.release()
334 oft_assert(pkt, "Poll: oldest packet not found")
335 return port, pkt, time
336
337 # No packet pending; blocking call requested?
338 if not timeout:
339 self.pkt_sync.release()
Dan Talayco34089522010-02-07 23:07:41 -0800340 return None, None, None
341
Dan Talaycoe226eb12010-02-18 23:06:30 -0800342 # Desired packet isn't available and timeout is specified
343 # Already holding pkt_sync; wait on pkt_sync variable
344 self.want_pkt = True
345 self.want_pkt_port = port_number
346 self.got_pkt_port = None
347 self.pkt_sync.wait(timeout)
348 self.want_pkt = False
349 if self.got_pkt_port:
350 pkt, time = \
351 self.port_list[self.got_pkt_port].dequeue(use_lock=False)
352 self.pkt_sync.release()
353 oft_assert(pkt, "Poll: pkt reported, but not found at " +
354 self.got_pkt_port)
355 return self.got_pkt_port, pkt, time
356
357 self.pkt_sync.release()
358 self.dbg(DEBUG_VERBOSE, "Poll time out, no packet")
359
360 return None, None, None
Dan Talayco34089522010-02-07 23:07:41 -0800361
Dan Talayco710438c2010-02-18 15:16:07 -0800362 def kill(self, join_threads=False):
Dan Talayco1b3f6902010-02-15 14:14:19 -0800363 """
364 Close all sockets for dataplane
Dan Talayco710438c2010-02-18 15:16:07 -0800365 @param join_threads If True call join on each thread
Dan Talayco1b3f6902010-02-15 14:14:19 -0800366 """
Dan Talayco34089522010-02-07 23:07:41 -0800367 for port_number in self.port_list.keys():
368 self.port_list[port_number].kill()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800369 if join_threads:
Dan Talayco710438c2010-02-18 15:16:07 -0800370 self.dbg(DEBUG_INFO, "Joining " + str(port_number))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800371 self.port_list[port_number].join()
Dan Talayco34089522010-02-07 23:07:41 -0800372
Dan Talayco710438c2010-02-18 15:16:07 -0800373 self.dbg(DEBUG_INFO, "DataPlane shutdown")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800374
375 def show(self, prefix=''):
376 print prefix + "Dataplane Controller"
Dan Talaycoe226eb12010-02-18 23:06:30 -0800377 print prefix + "Packets pending" + str(self.packets_pending)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800378 for pnum, port in self.port_list.items():
379 print prefix + "OpenFlow Port Number " + str(pnum)
380 port.show(prefix + ' ')
381