blob: 9389d517a4674a585f6f77de4b9d8fed03f18881 [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
Dan Talayco3087a462010-02-13 14:01:47 -08004DataPlane and DataPlanePort classes
Dan Talayco34089522010-02-07 23:07:41 -08005
6Provide the interface to the control the set of ports being used
7to stimulate the switch under test.
8
9See the class dataplaneport for more details. This class wraps
Dan Talaycoe226eb12010-02-18 23:06:30 -080010a set of those objects allowing general calls and parsing
Dan Talayco34089522010-02-07 23:07:41 -080011configuration.
12
Dan Talayco3087a462010-02-13 14:01:47 -080013@todo Add "filters" for matching packets. Actions supported
14for filters should include a callback or a counter
Dan Talayco34089522010-02-07 23:07:41 -080015"""
16
Dan Talayco3087a462010-02-13 14:01:47 -080017import sys
18import os
19import socket
20import time
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080021import netutils
Dan Talayco3087a462010-02-13 14:01:47 -080022from threading import Thread
23from threading import Lock
Dan Talaycoe226eb12010-02-18 23:06:30 -080024from threading import Condition
Dan Talayco710438c2010-02-18 15:16:07 -080025import select
Dan Talayco48370102010-03-03 15:17:33 -080026import logging
27from oft_assert import oft_assert
Dan Talayco3087a462010-02-13 14:01:47 -080028
Dan Talayco48370102010-03-03 15:17:33 -080029##@todo Find a better home for these identifiers (dataplane)
30RCV_SIZE_DEFAULT = 4096
Dan Talayco3087a462010-02-13 14:01:47 -080031ETH_P_ALL = 0x03
32RCV_TIMEOUT = 10000
Dan Talayco3087a462010-02-13 14:01:47 -080033
34class DataPlanePort(Thread):
35 """
36 Class defining a port monitoring object.
37
38 Control a dataplane port connected to the switch under test.
39 Creates a promiscuous socket on a physical interface.
40 Queues the packets received on that interface with time stamps.
41 Inherits from Thread class as meant to run in background. Also
42 supports polling.
43 Use accessors to dequeue packets for proper synchronization.
Dan Talaycoe226eb12010-02-18 23:06:30 -080044
45 Currently assumes a controlling 'parent' which maintains a
46 common Lock object and a total packet-pending count. May want
47 to decouple that some day.
Dan Talayco3087a462010-02-13 14:01:47 -080048 """
49
Dan Talaycoe226eb12010-02-18 23:06:30 -080050 def __init__(self, interface_name, port_number, parent, max_pkts=1024):
Dan Talayco3087a462010-02-13 14:01:47 -080051 """
52 Set up a port monitor object
53 @param interface_name The name of the physical interface like eth1
Dan Talayco4d065972010-02-18 23:11:32 -080054 @param port_number The port number associated with this port
Dan Talaycoe226eb12010-02-18 23:06:30 -080055 @param parent The controlling dataplane object; for pkt wait CV
Dan Talayco3087a462010-02-13 14:01:47 -080056 @param max_pkts Maximum number of pkts to keep in queue
57 """
58 Thread.__init__(self)
59 self.interface_name = interface_name
60 self.max_pkts = max_pkts
Dan Talayco3087a462010-02-13 14:01:47 -080061 self.packets_total = 0
62 self.packets = []
Dan Talayco1b3f6902010-02-15 14:14:19 -080063 self.packets_discarded = 0
Dan Talaycoe226eb12010-02-18 23:06:30 -080064 self.port_number = port_number
Dan Talayco3087a462010-02-13 14:01:47 -080065 self.socket = self.interface_open(interface_name)
Dan Talayco48370102010-03-03 15:17:33 -080066 logname = "dp-" + interface_name
67 self.logger = logging.getLogger(logname)
68 self.logger.info("Openned port monitor socket")
Dan Talaycoe226eb12010-02-18 23:06:30 -080069 self.parent = parent
70 self.pkt_sync = self.parent.pkt_sync
Dan Talayco1b3f6902010-02-15 14:14:19 -080071
Dan Talayco3087a462010-02-13 14:01:47 -080072 def interface_open(self, interface_name):
73 """
74 Open a socket in a promiscuous mode for a data connection.
75 @param interface_name port name as a string such as 'eth1'
76 @retval s socket
77 """
Dan Talaycoe226eb12010-02-18 23:06:30 -080078 s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
Dan Talayco3087a462010-02-13 14:01:47 -080079 socket.htons(ETH_P_ALL))
80 s.bind((interface_name, 0))
Dan Talayco1b3f6902010-02-15 14:14:19 -080081 netutils.set_promisc(s, interface_name)
Dan Talayco3087a462010-02-13 14:01:47 -080082 s.settimeout(RCV_TIMEOUT)
83 return s
84
Dan Talayco3087a462010-02-13 14:01:47 -080085 def run(self):
86 """
87 Activity function for class
88 """
89 self.running = True
Dan Talayco710438c2010-02-18 15:16:07 -080090 self.socs = [self.socket]
91 error_warned = False # Have we warned about error?
Dan Talayco3087a462010-02-13 14:01:47 -080092 while self.running:
93 try:
Dan Talayco710438c2010-02-18 15:16:07 -080094 sel_in, sel_out, sel_err = \
95 select.select(self.socs, [], [], 1)
96 except:
97 print sys.exc_info()
Dan Talayco48370102010-03-03 15:17:33 -080098 self.logger.error("Select error, exiting")
99 break
Dan Talayco710438c2010-02-18 15:16:07 -0800100
101 if not self.running:
102 break
103
Dan Talayco48370102010-03-03 15:17:33 -0800104 if (sel_in is None) or (len(sel_in) == 0):
Dan Talayco710438c2010-02-18 15:16:07 -0800105 continue
106
107 try:
Dan Talayco48370102010-03-03 15:17:33 -0800108 rcvmsg = self.socket.recv(RCV_SIZE_DEFAULT)
Dan Talayco3087a462010-02-13 14:01:47 -0800109 except socket.error:
Dan Talayco710438c2010-02-18 15:16:07 -0800110 if not error_warned:
Dan Talayco48370102010-03-03 15:17:33 -0800111 self.logger.info("Socket error on recv")
Dan Talayco710438c2010-02-18 15:16:07 -0800112 error_warned = True
Dan Talayco1b3f6902010-02-15 14:14:19 -0800113 continue
Dan Talayco710438c2010-02-18 15:16:07 -0800114
Dan Talayco1b3f6902010-02-15 14:14:19 -0800115 if len(rcvmsg) == 0:
Dan Talayco48370102010-03-03 15:17:33 -0800116 self.logger.info("Zero len pkt rcvd")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800117 self.kill()
Dan Talayco3087a462010-02-13 14:01:47 -0800118 break
119
Dan Talayco1b3f6902010-02-15 14:14:19 -0800120 rcvtime = time.clock()
Dan Talayco48370102010-03-03 15:17:33 -0800121 self.logger.debug("Pkt len " + str(len(rcvmsg)) +
Dan Talayco710438c2010-02-18 15:16:07 -0800122 " in at " + str(rcvtime))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800123
Dan Talaycoe226eb12010-02-18 23:06:30 -0800124 # Enqueue packet
125 self.pkt_sync.acquire()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800126 if len(self.packets) >= self.max_pkts:
Dan Talaycoe226eb12010-02-18 23:06:30 -0800127 # Queue full, throw away oldest
Dan Talayco1b3f6902010-02-15 14:14:19 -0800128 self.packets.pop(0)
129 self.packets_discarded += 1
Dan Talaycoe226eb12010-02-18 23:06:30 -0800130 else:
131 self.parent.packets_pending += 1
Dan Talayco48370102010-03-03 15:17:33 -0800132 # Check if parent is waiting on this (or any) port
133 if self.parent.want_pkt:
134 if (not self.parent.want_pkt_port or
Dan Talaycoe226eb12010-02-18 23:06:30 -0800135 self.parent.want_pkt_port == self.port_number):
Dan Talayco48370102010-03-03 15:17:33 -0800136 self.parent.got_pkt_port = self.port_number
137 self.parent.want_pkt = False
138 self.parent.pkt_sync.notify()
Dan Talayco710438c2010-02-18 15:16:07 -0800139 self.packets.append((rcvmsg, rcvtime))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800140 self.packets_total += 1
Dan Talaycoe226eb12010-02-18 23:06:30 -0800141 self.pkt_sync.release()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800142
Dan Talayco48370102010-03-03 15:17:33 -0800143 self.logger.info("Thread exit ")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800144
145 def kill(self):
146 """
147 Terminate the running thread
148 """
Dan Talayco48370102010-03-03 15:17:33 -0800149 self.logger.debug("Port monitor kill")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800150 self.running = False
151 try:
152 self.socket.close()
153 except:
Dan Talayco48370102010-03-03 15:17:33 -0800154 self.logger.info("Ignoring dataplane soc shutdown error")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800155
Dan Talaycoe226eb12010-02-18 23:06:30 -0800156 def dequeue(self, use_lock=True):
Dan Talayco3087a462010-02-13 14:01:47 -0800157 """
158 Get the oldest packet in the queue
Dan Talaycoe226eb12010-02-18 23:06:30 -0800159 @param use_lock If True, acquires the packet sync lock (which is
160 really the parent's lock)
161 @return The pair packet, packet time-stamp
Dan Talayco3087a462010-02-13 14:01:47 -0800162 """
Dan Talaycoe226eb12010-02-18 23:06:30 -0800163 if use_lock:
164 self.pkt_sync.acquire()
165 if len(self.packets) > 0:
166 pkt, pkt_time = self.packets.pop(0)
167 self.parent.packets_pending -= 1
168 else:
169 pkt = pkt_time = None
170 if use_lock:
171 self.pkt_sync.release()
Dan Talayco3087a462010-02-13 14:01:47 -0800172 return pkt, pkt_time
173
174 def timestamp_head(self):
175 """
176 Return the timestamp of the head of queue or None if empty
177 """
Dan Talayco710438c2010-02-18 15:16:07 -0800178 rv = None
Dan Talaycoe226eb12010-02-18 23:06:30 -0800179 try:
Dan Talayco710438c2010-02-18 15:16:07 -0800180 rv = self.packets[0][1]
Dan Talaycoe226eb12010-02-18 23:06:30 -0800181 except:
182 rv = None
Dan Talayco710438c2010-02-18 15:16:07 -0800183 return rv
Dan Talayco3087a462010-02-13 14:01:47 -0800184
185 def flush(self):
186 """
187 Clear the packet queue
188 """
Dan Talaycoe226eb12010-02-18 23:06:30 -0800189 self.pkt_sync.acquire()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800190 self.packets_discarded += len(self.packets)
Dan Talaycoe226eb12010-02-18 23:06:30 -0800191 self.parent.packets_pending -= len(self.packets)
Dan Talayco3087a462010-02-13 14:01:47 -0800192 self.packets = []
193 self.packet_times = []
Dan Talaycoe226eb12010-02-18 23:06:30 -0800194 self.pkt_sync.release()
Dan Talayco3087a462010-02-13 14:01:47 -0800195
196
197 def send(self, packet):
198 """
199 Send a packet to the dataplane port
200 @param packet The packet data to send to the port
201 @retval The number of bytes sent
202 """
203 return self.socket.send(packet)
204
205
206 def register(self, handler):
207 """
208 Register a callback function to receive packets from this
Dan Talaycoe226eb12010-02-18 23:06:30 -0800209 port. The callback will be passed the packet, the
210 interface name and the port number (if set) on which the
Dan Talayco3087a462010-02-13 14:01:47 -0800211 packet was received.
212
213 To be implemented
214 """
215 pass
216
Dan Talayco1b3f6902010-02-15 14:14:19 -0800217 def show(self, prefix=''):
218 print prefix + "Name: " + self.interface_name
Dan Talayco710438c2010-02-18 15:16:07 -0800219 print prefix + "Pkts pending: " + str(len(self.packets))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800220 print prefix + "Pkts total: " + str(self.packets_total)
221 print prefix + "socket: " + str(self.socket)
Dan Talaycoe226eb12010-02-18 23:06:30 -0800222
Dan Talayco34089522010-02-07 23:07:41 -0800223
224class DataPlane:
225 """
226 Class defining access primitives to the data plane
227 Controls a list of DataPlanePort objects
228 """
229 def __init__(self):
230 self.port_list = {}
Dan Talaycoe226eb12010-02-18 23:06:30 -0800231 # pkt_sync serves double duty as a regular top level lock and
232 # as a condition variable
233 self.pkt_sync = Condition()
234
235 # These are used to signal async pkt arrival for polling
236 self.want_pkt = False
237 self.want_pkt_port = None # What port required (or None)
238 self.got_pkt_port = None # On what port received?
239 self.packets_pending = 0 # Total pkts in all port queues
Dan Talayco48370102010-03-03 15:17:33 -0800240 self.logger = logging.getLogger("dataplane")
Dan Talayco34089522010-02-07 23:07:41 -0800241
242 def port_add(self, interface_name, port_number):
243 """
244 Add a port to the dataplane
245 TBD: Max packets for queue?
246 @param interface_name The name of the physical interface like eth1
247 @param port_number The port number used to refer to the port
248 """
Dan Talaycoe226eb12010-02-18 23:06:30 -0800249
250 self.port_list[port_number] = DataPlanePort(interface_name,
251 port_number, self)
Dan Talayco34089522010-02-07 23:07:41 -0800252 self.port_list[port_number].start()
253
254 def send(self, port_number, packet):
255 """
256 Send a packet to the given port
257 @param port_number The port to send the data to
258 @param packet Raw packet data to send to port
259 """
Dan Talayco48370102010-03-03 15:17:33 -0800260 self.logger.debug("Sending %d bytes to port %d" %
261 (len(packet), port_number))
Dan Talayco34089522010-02-07 23:07:41 -0800262 bytes = self.port_list[port_number].send(packet)
263 if bytes != len(packet):
Dan Talayco48370102010-03-03 15:17:33 -0800264 self.logger.error("Unhandled send error, length mismatch %d != %d" %
Dan Talayco1b3f6902010-02-15 14:14:19 -0800265 (bytes, len(packet)))
Dan Talayco34089522010-02-07 23:07:41 -0800266 return bytes
267
268 def flood(self, packet):
269 """
270 Send a packet to all ports
271 @param packet Raw packet data to send to port
272 """
273 for port_number in self.port_list.keys():
274 bytes = self.port_list[port_number].send(packet)
275 if bytes != len(packet):
Dan Talayco48370102010-03-03 15:17:33 -0800276 self.logger.error("Unhandled send error" +
Dan Talayco1b3f6902010-02-15 14:14:19 -0800277 ", port %d, length mismatch %d != %d" %
278 (port_number, bytes, len(packet)))
Dan Talayco34089522010-02-07 23:07:41 -0800279
Dan Talaycoe226eb12010-02-18 23:06:30 -0800280 def _oldest_packet_find(self):
Dan Talayco34089522010-02-07 23:07:41 -0800281 # Find port with oldest packet
282 min_time = 0
283 min_port = -1
284 for port_number in self.port_list.keys():
285 ptime = self.port_list[port_number].timestamp_head()
286 if ptime:
287 if (min_port == -1) or (ptime < min_time):
Dan Talaycoe226eb12010-02-18 23:06:30 -0800288 min_time = ptime
Dan Talayco34089522010-02-07 23:07:41 -0800289 min_port = port_number
Dan Talaycoe226eb12010-02-18 23:06:30 -0800290 oft_assert(min_port != -1, "Could not find port when pkts pending")
Dan Talayco34089522010-02-07 23:07:41 -0800291
Dan Talaycoe226eb12010-02-18 23:06:30 -0800292 return min_port
293
294 def poll(self, port_number=None, timeout=None):
295 """
296 Poll one or all dataplane ports for a packet
297
298 If port_number is given, get the oldest packet from that port.
299 Otherwise, find the port with the oldest packet and return
300 that packet.
301 @param port_number If set, get packet from this port
302 @param timeout If positive and no packet is available, block
303 until a packet is received or for this many seconds
304 @return The triple port_number, packet, pkt_time where packet
305 is received from port_number at time pkt_time. If a timeout
306 occurs, return None, None, None
307 """
308
309 self.pkt_sync.acquire()
310
311 # Check if requested specific port and it has a packet
312 if port_number and len(self.port_list[port_number].packets) != 0:
313 pkt, time = self.port_list[port_number].dequeue(use_lock=False)
314 self.pkt_sync.release()
315 oft_assert(pkt, "Poll: packet not found on port " +
316 str(port_number))
317 return port_number, pkt, time
318
319 # Check if requested any port and some packet pending
320 if not port_number and self.packets_pending != 0:
321 port = self._oldest_packet_find()
322 pkt, time = self.port_list[port].dequeue(use_lock=False)
323 self.pkt_sync.release()
324 oft_assert(pkt, "Poll: oldest packet not found")
325 return port, pkt, time
326
327 # No packet pending; blocking call requested?
328 if not timeout:
329 self.pkt_sync.release()
Dan Talayco34089522010-02-07 23:07:41 -0800330 return None, None, None
331
Dan Talaycoe226eb12010-02-18 23:06:30 -0800332 # Desired packet isn't available and timeout is specified
333 # Already holding pkt_sync; wait on pkt_sync variable
334 self.want_pkt = True
335 self.want_pkt_port = port_number
336 self.got_pkt_port = None
337 self.pkt_sync.wait(timeout)
338 self.want_pkt = False
339 if self.got_pkt_port:
340 pkt, time = \
341 self.port_list[self.got_pkt_port].dequeue(use_lock=False)
342 self.pkt_sync.release()
343 oft_assert(pkt, "Poll: pkt reported, but not found at " +
Dan Talayco48370102010-03-03 15:17:33 -0800344 str(self.got_pkt_port))
Dan Talaycoe226eb12010-02-18 23:06:30 -0800345 return self.got_pkt_port, pkt, time
346
347 self.pkt_sync.release()
Dan Talayco48370102010-03-03 15:17:33 -0800348 self.logger.debug("Poll time out, no packet")
Dan Talaycoe226eb12010-02-18 23:06:30 -0800349
350 return None, None, None
Dan Talayco34089522010-02-07 23:07:41 -0800351
Dan Talayco48370102010-03-03 15:17:33 -0800352 def kill(self, join_threads=True):
Dan Talayco1b3f6902010-02-15 14:14:19 -0800353 """
354 Close all sockets for dataplane
Dan Talayco710438c2010-02-18 15:16:07 -0800355 @param join_threads If True call join on each thread
Dan Talayco1b3f6902010-02-15 14:14:19 -0800356 """
Dan Talayco34089522010-02-07 23:07:41 -0800357 for port_number in self.port_list.keys():
358 self.port_list[port_number].kill()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800359 if join_threads:
Dan Talayco48370102010-03-03 15:17:33 -0800360 self.logger.debug("Joining " + str(port_number))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800361 self.port_list[port_number].join()
Dan Talayco34089522010-02-07 23:07:41 -0800362
Dan Talayco48370102010-03-03 15:17:33 -0800363 self.logger.info("DataPlane shutdown")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800364
365 def show(self, prefix=''):
366 print prefix + "Dataplane Controller"
Dan Talaycoe226eb12010-02-18 23:06:30 -0800367 print prefix + "Packets pending" + str(self.packets_pending)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800368 for pnum, port in self.port_list.items():
369 print prefix + "OpenFlow Port Number " + str(pnum)
370 port.show(prefix + ' ')
371