blob: 44ec997a1068ac1e70745b5da0bed74cc582c56e [file] [log] [blame]
"""
OpenFlow Test Framework

DataPlane and DataPlanePort classes

Provide the interface to control the set of ports being used
to stimulate the switch under test.

See the class DataPlanePort for more details.  The DataPlane class wraps
a set of those objects allowing general calls and parsing
configuration.

@todo Add "filters" for matching packets.  Actions supported
for filters should include a callback or a counter
"""
16
Dan Talayco3087a462010-02-13 14:01:47 -080017import sys
18import os
19import socket
20import time
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080021import netutils
Dan Talayco3087a462010-02-13 14:01:47 -080022from threading import Thread
23from threading import Lock
Dan Talayco710438c2010-02-18 15:16:07 -080024from oft_config import *
25import select
Dan Talayco3087a462010-02-13 14:01:47 -080026
#@todo Move these identifiers into config
# Protocol selector passed through socket.htons() when the raw packet
# socket is created in DataPlanePort.interface_open().
ETH_P_ALL = 0x0003
# Handed to socket.settimeout(), which interprets its argument in seconds.
RCV_TIMEOUT = 10 * 1000
# Maximum number of bytes requested from each recv() call.
RCV_SIZE = 4 * 1024
31
# class packet_queue:
#     """
#     Class defining a packet queue across multiple ports

#     Items in the queue are stored as a triple (port number, pkt, pkt in time)
#     """

#     def __init__(self, max_pkts=1024):
#         self.sync = Lock()
#         self.debug_level = debug_level_default
#         self.packets = []
#         self.max_pkts = max_pkts
#         self.packets_total = 0
#         self.packets_discarded = 0
46
class DataPlanePort(Thread):
    """
    Class defining a port monitoring object.

    Control a dataplane port connected to the switch under test.
    Creates a promiscuous socket on a physical interface.
    Queues the packets received on that interface with time stamps.
    Inherits from Thread class as meant to run in background.  Also
    supports polling.
    Use accessors to dequeue packets for proper synchronization.

    The queue (self.packets) is a list of (packet data, receive time)
    tuples, oldest first, protected by the self.sync lock.
    """

    def __init__(self, interface_name, max_pkts=1024):
        """
        Set up a port monitor object
        @param interface_name The name of the physical interface like eth1
        @param max_pkts Maximum number of pkts to keep in queue
        """
        Thread.__init__(self)
        self.interface_name = interface_name
        self.debug_level = debug_level_default
        self.max_pkts = max_pkts
        self.packets_total = 0
        self.packets = []
        self.packets_discarded = 0
        self.sync = Lock()
        # Defined up front so the attribute exists before run() is entered.
        self.running = False
        self.socket = self.interface_open(interface_name)
        self.dbg(DEBUG_INFO, "Openned port monitor socket")

    def dbg(self, level, string):
        """
        Log a debug message prefixed with this port's interface name
        @param level The debug level of the message
        @param string The message text
        """
        debug_log("DPLANE", self.debug_level, level,
                  self.interface_name + ": " + string)

    def interface_open(self, interface_name):
        """
        Open a socket in a promiscuous mode for a data connection.
        @param interface_name port name as a string such as 'eth1'
        @retval s socket
        """
        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                          socket.htons(ETH_P_ALL))
        try:
            s.bind((interface_name, 0))
            netutils.set_promisc(s, interface_name)
            s.settimeout(RCV_TIMEOUT)
        except Exception:
            # Do not leak the descriptor when the interface cannot be set up.
            s.close()
            raise
        return s

    def run(self):
        """
        Activity function for class

        Waits (one second at a time) for the socket to become readable,
        receives one packet per iteration and appends it to the queue
        together with its receive time.  When the queue already holds
        max_pkts entries the oldest one is discarded.  The loop ends when
        kill() is called or a zero length read is seen.
        """
        self.running = True
        self.socs = [self.socket]
        error_warned = False  # Have we warned about error?
        while self.running:
            try:
                sel_in, sel_out, sel_err = \
                    select.select(self.socs, [], [], 1)
            except Exception:
                if not self.running:
                    # kill() closed the socket while we were waiting on it;
                    # this is a normal shutdown, not an error.
                    break
                print(sys.exc_info())
                self.dbg(DEBUG_ERROR, "Select error, exiting")
                sys.exit(1)

            if not self.running:
                break

            # select() reports a timeout with empty lists (never None).
            # Skipping here keeps us out of a recv() that could otherwise
            # block for RCV_TIMEOUT and ignore kill().
            if not sel_in:
                continue

            try:
                rcvmsg = self.socket.recv(RCV_SIZE)
            except socket.error:
                if not error_warned:
                    self.dbg(DEBUG_INFO, "Socket error on recv")
                    error_warned = True
                continue

            if len(rcvmsg) == 0:
                self.dbg(DEBUG_INFO, "Zero len pkt rcvd")
                self.kill()
                break

            # NOTE(review): time.clock() is kept because users of the
            # timestamps may compare against it; confirm whether wall
            # clock time (time.time()) is what is really wanted here.
            rcvtime = time.clock()
            self.dbg(DEBUG_VERBOSE, "Pkt len " + str(len(rcvmsg)) +
                     " in at " + str(rcvtime))

            with self.sync:
                if len(self.packets) >= self.max_pkts:
                    self.packets.pop(0)
                    self.packets_discarded += 1
                self.packets.append((rcvmsg, rcvtime))
                self.packets_total += 1

        self.dbg(DEBUG_INFO, "Thread exit ")

    def kill(self):
        """
        Terminate the running thread

        Clears the running flag checked by run() and closes the socket.
        Errors raised while closing the socket are logged and ignored.
        """
        self.running = False
        try:
            self.socket.close()
        except:
            self.dbg(DEBUG_INFO, "Ignoring dataplane soc shutdown error")
        self.dbg(DEBUG_INFO,
                 "Port monitor exiting")

    def dequeue(self):
        """
        Get the oldest packet in the queue
        @retval The pair (packet, pkt_time)

        Raises IndexError if the queue is empty.  The lock is released
        even in that case so the receive thread is not blocked.
        """
        with self.sync:
            pkt, pkt_time = self.packets.pop(0)
        return pkt, pkt_time

    def timestamp_head(self):
        """
        Return the timestamp of the head of queue or None if empty
        """
        with self.sync:
            if self.packets:
                return self.packets[0][1]
            return None

    def flush(self):
        """
        Clear the packet queue

        Every packet removed is counted in packets_discarded.
        """
        with self.sync:
            self.packets_discarded += len(self.packets)
            self.packets = []

    def send(self, packet):
        """
        Send a packet to the dataplane port
        @param packet The packet data to send to the port
        @retval The number of bytes sent
        """
        self.dbg(DEBUG_VERBOSE,
                 "port sending " + str(len(packet)) + " bytes")
        return self.socket.send(packet)

    def register(self, handler):
        """
        Register a callback function to receive packets from this
        port.  The callback will be passed the packet, the
        interface name and the port number (if set) on which the
        packet was received.

        To be implemented
        """
        pass

    def show(self, prefix=''):
        """
        Print the port name, queue counters and socket to stdout
        @param prefix String prepended to every line printed
        """
        print(prefix + "Name: " + self.interface_name)
        print(prefix + "Pkts pending: " + str(len(self.packets)))
        print(prefix + "Pkts total: " + str(self.packets_total))
        print(prefix + "socket: " + str(self.socket))
215
Dan Talayco34089522010-02-07 23:07:41 -0800216
class DataPlane:
    """
    Class defining access primitives to the data plane
    Controls a list of DataPlanePort objects

    The ports are held in port_list, a dictionary mapping the port
    number to the port object.
    """
    def __init__(self):
        self.port_list = {}
        self.debug_level = debug_level_default

    def dbg(self, level, string):
        """
        Log a debug message for the dataplane
        @param level The debug level of the message
        @param string The message text
        """
        debug_log("DPORT", self.debug_level, level, string)

    def port_add(self, interface_name, port_number):
        """
        Add a port to the dataplane
        TBD: Max packets for queue?
        @param interface_name The name of the physical interface like eth1
        @param port_number The port number used to refer to the port
        """
        self.port_list[port_number] = DataPlanePort(interface_name)
        self.port_list[port_number].start()

    def send(self, port_number, packet):
        """
        Send a packet to the given port
        @param port_number The port to send the data to
        @param packet Raw packet data to send to port
        @retval The number of bytes sent
        """
        self.dbg(DEBUG_VERBOSE,
                 "Sending %d bytes to port %d" % (len(packet), port_number))
        sent = self.port_list[port_number].send(packet)
        if sent != len(packet):
            self.dbg(DEBUG_ERROR, "Unhandled send error, " +
                     "length mismatch %d != %d" %
                     (sent, len(packet)))
        return sent

    def flood(self, packet):
        """
        Send a packet to all ports
        @param packet Raw packet data to send to port
        """
        for port_number, port in self.port_list.items():
            sent = port.send(packet)
            if sent != len(packet):
                self.dbg(DEBUG_ERROR, "Unhandled send error" +
                         ", port %d, length mismatch %d != %d" %
                         (port_number, sent, len(packet)))

    def packet_get(self, port_number=None):
        """
        Get a packet from the data plane

        If port_number is given, get the oldest packet from that port.
        Otherwise, find the port with the oldest packet and return
        that packet.
        @param port_number If set, get packet from this port
        @retval The triple port_number, packet, pkt_time where packet
        is received from port_number at time pkt_time.  All three are
        None when no packet is available.
        """
        # Compare against None so that port number 0 is a valid selection.
        if port_number is not None:
            port = self.port_list[port_number]
            if len(port.packets) == 0:
                return None, None, None
            pkt, pkt_time = port.dequeue()
            return port_number, pkt, pkt_time

        # Find port with oldest packet
        #@todo Consider using a single queue for all ports
        oldest_port = None
        oldest_time = None
        for number, port in self.port_list.items():
            head_time = port.timestamp_head()
            # None means the queue is empty; a timestamp of 0.0 is valid
            # and must not be mistaken for "no packet".
            if head_time is None:
                continue
            if oldest_time is None or head_time < oldest_time:
                oldest_time = head_time
                oldest_port = number

        if oldest_time is None:
            return None, None, None

        pkt, pkt_time = self.port_list[oldest_port].dequeue()
        return oldest_port, pkt, pkt_time

    def kill(self, join_threads=False):
        """
        Close all sockets for dataplane
        @param join_threads If True call join on each thread
        """
        for port_number, port in self.port_list.items():
            port.kill()
            if join_threads:
                self.dbg(DEBUG_INFO, "Joining " + str(port_number))
                port.join()

        self.dbg(DEBUG_INFO, "DataPlane shutdown")

    def show(self, prefix=''):
        """
        Print a summary of every port to stdout
        @param prefix String prepended to every line printed
        """
        print(prefix + "Dataplane Controller")
        for pnum, port in self.port_list.items():
            print(prefix + "OpenFlow Port Number " + str(pnum))
            port.show(prefix + ' ')
321