blob: 965bd62baf7c3b4de73a385960ed242d4e1eb63f [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
Dan Talayco3087a462010-02-13 14:01:47 -08004DataPlane and DataPlanePort classes
Dan Talayco34089522010-02-07 23:07:41 -08005
6Provide the interface to the control the set of ports being used
7to stimulate the switch under test.
8
9See the class dataplaneport for more details. This class wraps
Dan Talaycoe226eb12010-02-18 23:06:30 -080010a set of those objects allowing general calls and parsing
Dan Talayco34089522010-02-07 23:07:41 -080011configuration.
12
Dan Talayco3087a462010-02-13 14:01:47 -080013@todo Add "filters" for matching packets. Actions supported
14for filters should include a callback or a counter
Dan Talayco34089522010-02-07 23:07:41 -080015"""
16
Dan Talayco3087a462010-02-13 14:01:47 -080017import sys
18import os
19import socket
20import time
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080021import netutils
Dan Talayco3087a462010-02-13 14:01:47 -080022from threading import Thread
23from threading import Lock
Dan Talaycoe226eb12010-02-18 23:06:30 -080024from threading import Condition
Dan Talayco710438c2010-02-18 15:16:07 -080025import select
Dan Talayco48370102010-03-03 15:17:33 -080026import logging
27from oft_assert import oft_assert
Dan Talayco3087a462010-02-13 14:01:47 -080028
Dan Talayco48370102010-03-03 15:17:33 -080029##@todo Find a better home for these identifiers (dataplane)
30RCV_SIZE_DEFAULT = 4096
Dan Talayco3087a462010-02-13 14:01:47 -080031ETH_P_ALL = 0x03
32RCV_TIMEOUT = 10000
Dan Talayco3087a462010-02-13 14:01:47 -080033
34class DataPlanePort(Thread):
35 """
36 Class defining a port monitoring object.
37
38 Control a dataplane port connected to the switch under test.
39 Creates a promiscuous socket on a physical interface.
40 Queues the packets received on that interface with time stamps.
41 Inherits from Thread class as meant to run in background. Also
42 supports polling.
43 Use accessors to dequeue packets for proper synchronization.
Dan Talaycoe226eb12010-02-18 23:06:30 -080044
45 Currently assumes a controlling 'parent' which maintains a
46 common Lock object and a total packet-pending count. May want
47 to decouple that some day.
Dan Talayco3087a462010-02-13 14:01:47 -080048 """
49
Dan Talaycoe226eb12010-02-18 23:06:30 -080050 def __init__(self, interface_name, port_number, parent, max_pkts=1024):
Dan Talayco3087a462010-02-13 14:01:47 -080051 """
52 Set up a port monitor object
53 @param interface_name The name of the physical interface like eth1
Dan Talayco4d065972010-02-18 23:11:32 -080054 @param port_number The port number associated with this port
Dan Talaycoe226eb12010-02-18 23:06:30 -080055 @param parent The controlling dataplane object; for pkt wait CV
Dan Talayco3087a462010-02-13 14:01:47 -080056 @param max_pkts Maximum number of pkts to keep in queue
57 """
58 Thread.__init__(self)
59 self.interface_name = interface_name
60 self.max_pkts = max_pkts
Dan Talayco3087a462010-02-13 14:01:47 -080061 self.packets_total = 0
62 self.packets = []
Dan Talayco1b3f6902010-02-15 14:14:19 -080063 self.packets_discarded = 0
Dan Talaycoe226eb12010-02-18 23:06:30 -080064 self.port_number = port_number
Dan Talayco48370102010-03-03 15:17:33 -080065 logname = "dp-" + interface_name
66 self.logger = logging.getLogger(logname)
Dan Talayco0db53eb2010-03-10 14:00:02 -080067 try:
68 self.socket = self.interface_open(interface_name)
69 except:
70 self.logger.info("Could not open socket")
71 sys.exit(1)
Dan Talayco48370102010-03-03 15:17:33 -080072 self.logger.info("Openned port monitor socket")
Dan Talaycoe226eb12010-02-18 23:06:30 -080073 self.parent = parent
74 self.pkt_sync = self.parent.pkt_sync
Dan Talayco1b3f6902010-02-15 14:14:19 -080075
Dan Talayco3087a462010-02-13 14:01:47 -080076 def interface_open(self, interface_name):
77 """
78 Open a socket in a promiscuous mode for a data connection.
79 @param interface_name port name as a string such as 'eth1'
80 @retval s socket
81 """
Dan Talaycoe226eb12010-02-18 23:06:30 -080082 s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
Dan Talayco3087a462010-02-13 14:01:47 -080083 socket.htons(ETH_P_ALL))
84 s.bind((interface_name, 0))
Dan Talayco1b3f6902010-02-15 14:14:19 -080085 netutils.set_promisc(s, interface_name)
Dan Talayco3087a462010-02-13 14:01:47 -080086 s.settimeout(RCV_TIMEOUT)
87 return s
88
Dan Talayco3087a462010-02-13 14:01:47 -080089 def run(self):
90 """
91 Activity function for class
92 """
93 self.running = True
Dan Talayco710438c2010-02-18 15:16:07 -080094 self.socs = [self.socket]
95 error_warned = False # Have we warned about error?
Dan Talayco3087a462010-02-13 14:01:47 -080096 while self.running:
97 try:
Dan Talayco710438c2010-02-18 15:16:07 -080098 sel_in, sel_out, sel_err = \
99 select.select(self.socs, [], [], 1)
100 except:
101 print sys.exc_info()
Dan Talayco48370102010-03-03 15:17:33 -0800102 self.logger.error("Select error, exiting")
103 break
Dan Talayco710438c2010-02-18 15:16:07 -0800104
105 if not self.running:
106 break
107
Dan Talayco48370102010-03-03 15:17:33 -0800108 if (sel_in is None) or (len(sel_in) == 0):
Dan Talayco710438c2010-02-18 15:16:07 -0800109 continue
110
111 try:
Dan Talayco48370102010-03-03 15:17:33 -0800112 rcvmsg = self.socket.recv(RCV_SIZE_DEFAULT)
Dan Talayco3087a462010-02-13 14:01:47 -0800113 except socket.error:
Dan Talayco710438c2010-02-18 15:16:07 -0800114 if not error_warned:
Dan Talayco48370102010-03-03 15:17:33 -0800115 self.logger.info("Socket error on recv")
Dan Talayco710438c2010-02-18 15:16:07 -0800116 error_warned = True
Dan Talayco1b3f6902010-02-15 14:14:19 -0800117 continue
Dan Talayco710438c2010-02-18 15:16:07 -0800118
Dan Talayco1b3f6902010-02-15 14:14:19 -0800119 if len(rcvmsg) == 0:
Dan Talayco48370102010-03-03 15:17:33 -0800120 self.logger.info("Zero len pkt rcvd")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800121 self.kill()
Dan Talayco3087a462010-02-13 14:01:47 -0800122 break
123
Dan Talayco1b3f6902010-02-15 14:14:19 -0800124 rcvtime = time.clock()
Dan Talayco48370102010-03-03 15:17:33 -0800125 self.logger.debug("Pkt len " + str(len(rcvmsg)) +
Dan Talayco710438c2010-02-18 15:16:07 -0800126 " in at " + str(rcvtime))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800127
Dan Talaycoe226eb12010-02-18 23:06:30 -0800128 # Enqueue packet
129 self.pkt_sync.acquire()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800130 if len(self.packets) >= self.max_pkts:
Dan Talaycoe226eb12010-02-18 23:06:30 -0800131 # Queue full, throw away oldest
Dan Talayco1b3f6902010-02-15 14:14:19 -0800132 self.packets.pop(0)
133 self.packets_discarded += 1
Dan Talaycoe226eb12010-02-18 23:06:30 -0800134 else:
135 self.parent.packets_pending += 1
Dan Talayco48370102010-03-03 15:17:33 -0800136 # Check if parent is waiting on this (or any) port
137 if self.parent.want_pkt:
138 if (not self.parent.want_pkt_port or
Dan Talaycoe226eb12010-02-18 23:06:30 -0800139 self.parent.want_pkt_port == self.port_number):
Dan Talayco48370102010-03-03 15:17:33 -0800140 self.parent.got_pkt_port = self.port_number
141 self.parent.want_pkt = False
142 self.parent.pkt_sync.notify()
Dan Talayco710438c2010-02-18 15:16:07 -0800143 self.packets.append((rcvmsg, rcvtime))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800144 self.packets_total += 1
Dan Talaycoe226eb12010-02-18 23:06:30 -0800145 self.pkt_sync.release()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800146
Dan Talayco48370102010-03-03 15:17:33 -0800147 self.logger.info("Thread exit ")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800148
149 def kill(self):
150 """
151 Terminate the running thread
152 """
Dan Talayco48370102010-03-03 15:17:33 -0800153 self.logger.debug("Port monitor kill")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800154 self.running = False
155 try:
156 self.socket.close()
157 except:
Dan Talayco48370102010-03-03 15:17:33 -0800158 self.logger.info("Ignoring dataplane soc shutdown error")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800159
Dan Talaycoe226eb12010-02-18 23:06:30 -0800160 def dequeue(self, use_lock=True):
Dan Talayco3087a462010-02-13 14:01:47 -0800161 """
162 Get the oldest packet in the queue
Dan Talaycoe226eb12010-02-18 23:06:30 -0800163 @param use_lock If True, acquires the packet sync lock (which is
164 really the parent's lock)
165 @return The pair packet, packet time-stamp
Dan Talayco3087a462010-02-13 14:01:47 -0800166 """
Dan Talaycoe226eb12010-02-18 23:06:30 -0800167 if use_lock:
168 self.pkt_sync.acquire()
169 if len(self.packets) > 0:
170 pkt, pkt_time = self.packets.pop(0)
171 self.parent.packets_pending -= 1
172 else:
173 pkt = pkt_time = None
174 if use_lock:
175 self.pkt_sync.release()
Dan Talayco3087a462010-02-13 14:01:47 -0800176 return pkt, pkt_time
177
178 def timestamp_head(self):
179 """
180 Return the timestamp of the head of queue or None if empty
181 """
Dan Talayco710438c2010-02-18 15:16:07 -0800182 rv = None
Dan Talaycoe226eb12010-02-18 23:06:30 -0800183 try:
Dan Talayco710438c2010-02-18 15:16:07 -0800184 rv = self.packets[0][1]
Dan Talaycoe226eb12010-02-18 23:06:30 -0800185 except:
186 rv = None
Dan Talayco710438c2010-02-18 15:16:07 -0800187 return rv
Dan Talayco3087a462010-02-13 14:01:47 -0800188
189 def flush(self):
190 """
191 Clear the packet queue
192 """
Dan Talaycoe226eb12010-02-18 23:06:30 -0800193 self.pkt_sync.acquire()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800194 self.packets_discarded += len(self.packets)
Dan Talaycoe226eb12010-02-18 23:06:30 -0800195 self.parent.packets_pending -= len(self.packets)
Dan Talayco3087a462010-02-13 14:01:47 -0800196 self.packets = []
197 self.packet_times = []
Dan Talaycoe226eb12010-02-18 23:06:30 -0800198 self.pkt_sync.release()
Dan Talayco3087a462010-02-13 14:01:47 -0800199
200
201 def send(self, packet):
202 """
203 Send a packet to the dataplane port
204 @param packet The packet data to send to the port
205 @retval The number of bytes sent
206 """
207 return self.socket.send(packet)
208
209
210 def register(self, handler):
211 """
212 Register a callback function to receive packets from this
Dan Talaycoe226eb12010-02-18 23:06:30 -0800213 port. The callback will be passed the packet, the
214 interface name and the port number (if set) on which the
Dan Talayco3087a462010-02-13 14:01:47 -0800215 packet was received.
216
217 To be implemented
218 """
219 pass
220
Dan Talayco1b3f6902010-02-15 14:14:19 -0800221 def show(self, prefix=''):
222 print prefix + "Name: " + self.interface_name
Dan Talayco710438c2010-02-18 15:16:07 -0800223 print prefix + "Pkts pending: " + str(len(self.packets))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800224 print prefix + "Pkts total: " + str(self.packets_total)
225 print prefix + "socket: " + str(self.socket)
Dan Talaycoe226eb12010-02-18 23:06:30 -0800226
Dan Talayco34089522010-02-07 23:07:41 -0800227
228class DataPlane:
229 """
230 Class defining access primitives to the data plane
231 Controls a list of DataPlanePort objects
232 """
233 def __init__(self):
234 self.port_list = {}
Dan Talaycoe226eb12010-02-18 23:06:30 -0800235 # pkt_sync serves double duty as a regular top level lock and
236 # as a condition variable
237 self.pkt_sync = Condition()
238
239 # These are used to signal async pkt arrival for polling
240 self.want_pkt = False
241 self.want_pkt_port = None # What port required (or None)
242 self.got_pkt_port = None # On what port received?
243 self.packets_pending = 0 # Total pkts in all port queues
Dan Talayco48370102010-03-03 15:17:33 -0800244 self.logger = logging.getLogger("dataplane")
Dan Talayco34089522010-02-07 23:07:41 -0800245
246 def port_add(self, interface_name, port_number):
247 """
248 Add a port to the dataplane
249 TBD: Max packets for queue?
250 @param interface_name The name of the physical interface like eth1
251 @param port_number The port number used to refer to the port
252 """
Dan Talaycoe226eb12010-02-18 23:06:30 -0800253
Dan Talayco11c26e72010-03-07 22:03:57 -0800254 self.port_list[port_number] = DataPlanePort(interface_name,
Dan Talaycoe226eb12010-02-18 23:06:30 -0800255 port_number, self)
Dan Talayco34089522010-02-07 23:07:41 -0800256 self.port_list[port_number].start()
257
258 def send(self, port_number, packet):
259 """
260 Send a packet to the given port
261 @param port_number The port to send the data to
262 @param packet Raw packet data to send to port
263 """
Dan Talayco11c26e72010-03-07 22:03:57 -0800264 self.logger.debug("Sending %d bytes to port %d" %
265 (len(packet), port_number))
Dan Talayco34089522010-02-07 23:07:41 -0800266 bytes = self.port_list[port_number].send(packet)
267 if bytes != len(packet):
Dan Talayco48370102010-03-03 15:17:33 -0800268 self.logger.error("Unhandled send error, length mismatch %d != %d" %
Dan Talayco1b3f6902010-02-15 14:14:19 -0800269 (bytes, len(packet)))
Dan Talayco34089522010-02-07 23:07:41 -0800270 return bytes
271
272 def flood(self, packet):
273 """
274 Send a packet to all ports
275 @param packet Raw packet data to send to port
276 """
277 for port_number in self.port_list.keys():
278 bytes = self.port_list[port_number].send(packet)
279 if bytes != len(packet):
Dan Talayco48370102010-03-03 15:17:33 -0800280 self.logger.error("Unhandled send error" +
Dan Talayco1b3f6902010-02-15 14:14:19 -0800281 ", port %d, length mismatch %d != %d" %
282 (port_number, bytes, len(packet)))
Dan Talayco34089522010-02-07 23:07:41 -0800283
Dan Talaycoe226eb12010-02-18 23:06:30 -0800284 def _oldest_packet_find(self):
Dan Talayco34089522010-02-07 23:07:41 -0800285 # Find port with oldest packet
286 min_time = 0
287 min_port = -1
288 for port_number in self.port_list.keys():
289 ptime = self.port_list[port_number].timestamp_head()
290 if ptime:
291 if (min_port == -1) or (ptime < min_time):
Dan Talaycoe226eb12010-02-18 23:06:30 -0800292 min_time = ptime
Dan Talayco34089522010-02-07 23:07:41 -0800293 min_port = port_number
Dan Talaycoe226eb12010-02-18 23:06:30 -0800294 oft_assert(min_port != -1, "Could not find port when pkts pending")
Dan Talayco34089522010-02-07 23:07:41 -0800295
Dan Talaycoe226eb12010-02-18 23:06:30 -0800296 return min_port
297
298 def poll(self, port_number=None, timeout=None):
299 """
300 Poll one or all dataplane ports for a packet
301
302 If port_number is given, get the oldest packet from that port.
303 Otherwise, find the port with the oldest packet and return
304 that packet.
305 @param port_number If set, get packet from this port
Dan Talayco11c26e72010-03-07 22:03:57 -0800306 @param timeout If positive and no packet is available, block
Dan Talaycoe226eb12010-02-18 23:06:30 -0800307 until a packet is received or for this many seconds
308 @return The triple port_number, packet, pkt_time where packet
309 is received from port_number at time pkt_time. If a timeout
310 occurs, return None, None, None
311 """
312
313 self.pkt_sync.acquire()
314
315 # Check if requested specific port and it has a packet
316 if port_number and len(self.port_list[port_number].packets) != 0:
317 pkt, time = self.port_list[port_number].dequeue(use_lock=False)
318 self.pkt_sync.release()
Dan Talayco11c26e72010-03-07 22:03:57 -0800319 oft_assert(pkt, "Poll: packet not found on port " +
Dan Talaycoe226eb12010-02-18 23:06:30 -0800320 str(port_number))
321 return port_number, pkt, time
322
323 # Check if requested any port and some packet pending
324 if not port_number and self.packets_pending != 0:
325 port = self._oldest_packet_find()
326 pkt, time = self.port_list[port].dequeue(use_lock=False)
327 self.pkt_sync.release()
328 oft_assert(pkt, "Poll: oldest packet not found")
329 return port, pkt, time
330
331 # No packet pending; blocking call requested?
332 if not timeout:
333 self.pkt_sync.release()
Dan Talayco34089522010-02-07 23:07:41 -0800334 return None, None, None
335
Dan Talaycoe226eb12010-02-18 23:06:30 -0800336 # Desired packet isn't available and timeout is specified
337 # Already holding pkt_sync; wait on pkt_sync variable
338 self.want_pkt = True
339 self.want_pkt_port = port_number
340 self.got_pkt_port = None
341 self.pkt_sync.wait(timeout)
342 self.want_pkt = False
343 if self.got_pkt_port:
344 pkt, time = \
345 self.port_list[self.got_pkt_port].dequeue(use_lock=False)
346 self.pkt_sync.release()
347 oft_assert(pkt, "Poll: pkt reported, but not found at " +
Dan Talayco48370102010-03-03 15:17:33 -0800348 str(self.got_pkt_port))
Dan Talaycoe226eb12010-02-18 23:06:30 -0800349 return self.got_pkt_port, pkt, time
350
351 self.pkt_sync.release()
Dan Talayco48370102010-03-03 15:17:33 -0800352 self.logger.debug("Poll time out, no packet")
Dan Talaycoe226eb12010-02-18 23:06:30 -0800353
354 return None, None, None
Dan Talayco34089522010-02-07 23:07:41 -0800355
Dan Talayco48370102010-03-03 15:17:33 -0800356 def kill(self, join_threads=True):
Dan Talayco1b3f6902010-02-15 14:14:19 -0800357 """
358 Close all sockets for dataplane
Dan Talayco710438c2010-02-18 15:16:07 -0800359 @param join_threads If True call join on each thread
Dan Talayco1b3f6902010-02-15 14:14:19 -0800360 """
Dan Talayco34089522010-02-07 23:07:41 -0800361 for port_number in self.port_list.keys():
362 self.port_list[port_number].kill()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800363 if join_threads:
Dan Talayco48370102010-03-03 15:17:33 -0800364 self.logger.debug("Joining " + str(port_number))
Dan Talayco1b3f6902010-02-15 14:14:19 -0800365 self.port_list[port_number].join()
Dan Talayco34089522010-02-07 23:07:41 -0800366
Dan Talayco48370102010-03-03 15:17:33 -0800367 self.logger.info("DataPlane shutdown")
Dan Talayco1b3f6902010-02-15 14:14:19 -0800368
369 def show(self, prefix=''):
370 print prefix + "Dataplane Controller"
Dan Talaycoe226eb12010-02-18 23:06:30 -0800371 print prefix + "Packets pending" + str(self.packets_pending)
Dan Talayco1b3f6902010-02-15 14:14:19 -0800372 for pnum, port in self.port_list.items():
373 print prefix + "OpenFlow Port Number " + str(pnum)
374 port.show(prefix + ' ')
375