blob: 89cf073602232de1b368b67642f9a7b889a17603 [file] [log] [blame]
Dan Talayco34089522010-02-07 23:07:41 -08001"""
2OpenFlow Test Framework
3
Dan Talayco3087a462010-02-13 14:01:47 -08004DataPlane and DataPlanePort classes
Dan Talayco34089522010-02-07 23:07:41 -08005
6Provide the interface to the control the set of ports being used
7to stimulate the switch under test.
8
9See the class dataplaneport for more details. This class wraps
10a set of those objects allowing general calls and parsing
11configuration.
12
Dan Talayco3087a462010-02-13 14:01:47 -080013@todo Add "filters" for matching packets. Actions supported
14for filters should include a callback or a counter
Dan Talayco34089522010-02-07 23:07:41 -080015"""
16
Dan Talayco3087a462010-02-13 14:01:47 -080017import sys
18import os
19import socket
20import time
Dan Talaycod7e2dbe2010-02-13 21:51:15 -080021import netutils
Dan Talayco3087a462010-02-13 14:01:47 -080022from threading import Thread
23from threading import Lock
Dan Talayco1b3f6902010-02-15 14:14:19 -080024import oft_config
Dan Talayco3087a462010-02-13 14:01:47 -080025
26#@todo Move these identifiers into config
27ETH_P_ALL = 0x03
28RCV_TIMEOUT = 10000
29RCV_SIZE = 4096
30
31class DataPlanePort(Thread):
32 """
33 Class defining a port monitoring object.
34
35 Control a dataplane port connected to the switch under test.
36 Creates a promiscuous socket on a physical interface.
37 Queues the packets received on that interface with time stamps.
38 Inherits from Thread class as meant to run in background. Also
39 supports polling.
40 Use accessors to dequeue packets for proper synchronization.
41 """
42
43 def __init__(self, interface_name, max_pkts=1024):
44 """
45 Set up a port monitor object
46 @param interface_name The name of the physical interface like eth1
47 @param max_pkts Maximum number of pkts to keep in queue
48 """
49 Thread.__init__(self)
50 self.interface_name = interface_name
Dan Talayco1b3f6902010-02-15 14:14:19 -080051 self.debug_level = oft_config.debug_level_default
Dan Talayco3087a462010-02-13 14:01:47 -080052 self.max_pkts = max_pkts
53 self.packets_pending = 0
54 self.packets_total = 0
55 self.packets = []
56 self.packet_times = []
Dan Talayco1b3f6902010-02-15 14:14:19 -080057 self.packets_discarded = 0
Dan Talayco3087a462010-02-13 14:01:47 -080058 self.sync = Lock()
59 self.socket = self.interface_open(interface_name)
Dan Talayco1b3f6902010-02-15 14:14:19 -080060 self.dbg(oft_config.DEBUG_INFO,
61 "Openned port monitor socket " + interface_name)
62
63 def dbg(self, level, string):
64 oft_config.debug_log("DPLANE", self.debug_level, level, string)
Dan Talayco3087a462010-02-13 14:01:47 -080065
66 def interface_open(self, interface_name):
67 """
68 Open a socket in a promiscuous mode for a data connection.
69 @param interface_name port name as a string such as 'eth1'
70 @retval s socket
71 """
72 s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
73 socket.htons(ETH_P_ALL))
74 s.bind((interface_name, 0))
Dan Talayco1b3f6902010-02-15 14:14:19 -080075 netutils.set_promisc(s, interface_name)
Dan Talayco3087a462010-02-13 14:01:47 -080076 s.settimeout(RCV_TIMEOUT)
77 return s
78
Dan Talayco3087a462010-02-13 14:01:47 -080079 def run(self):
80 """
81 Activity function for class
82 """
83 self.running = True
84 while self.running:
85 try:
86 rcvmsg = self.socket.recv(RCV_SIZE)
Dan Talayco3087a462010-02-13 14:01:47 -080087 except socket.error:
Dan Talayco1b3f6902010-02-15 14:14:19 -080088 self.dbg(DEBUG_INFO, "Socket error for " +
89 self.interface_name)
90 continue
91 if len(rcvmsg) == 0:
92 self.dbg(DEBUG_INFO, "Zero len pkt on " + self.interface_name)
93 self.kill()
Dan Talayco3087a462010-02-13 14:01:47 -080094 break
95
Dan Talayco1b3f6902010-02-15 14:14:19 -080096 rcvtime = time.clock()
97
98 self.sync.acquire()
99 if len(self.packets) >= self.max_pkts:
100 self.packets.pop(0)
101 self.packets_discarded += 1
102 self.packets.append(rcvmsg)
103 self.packet_times.append(rcvtime)
104 self.packets_pending += 1
105 self.packets_total += 1
106 self.sync.release()
107
108 self.dbg(DEBUG_INFO, "Thread exit for " + self.interface_name)
109
110 def kill(self):
111 """
112 Terminate the running thread
113 """
114 self.running = False
115 try:
116 self.socket.close()
117 except:
118 self.dbg(DEBUG_INFO, "Ignoring dataplane soc shutdown error")
119 self.dbg(oft_config.DEBUG_INFO,
120 "Port monitor for " + self.interface_name + " exiting")
121
Dan Talayco3087a462010-02-13 14:01:47 -0800122 def dequeue(self):
123 """
124 Get the oldest packet in the queue
125 """
126 self.sync.acquire()
127 pkt = self.packets.pop(0)
128 pkt_time = self.packet_times.pop(0)
129 self.packets_pending -= 1
130 self.sync.release()
131 return pkt, pkt_time
132
133 def timestamp_head(self):
134 """
135 Return the timestamp of the head of queue or None if empty
136 """
137 if self.packets_pending:
138 return self.packet_times[0]
139 return None
140
141 def flush(self):
142 """
143 Clear the packet queue
144 """
145 self.sync.acquire()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800146 self.packets_discarded += len(self.packets)
Dan Talayco3087a462010-02-13 14:01:47 -0800147 self.packets = []
148 self.packet_times = []
149 self.packets_pending = 0
150 self.sync.release()
151
152
153 def send(self, packet):
154 """
155 Send a packet to the dataplane port
156 @param packet The packet data to send to the port
157 @retval The number of bytes sent
158 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800159 self.dbg(oft_config.DEBUG_VERBOSE,
160 "port sending " + str(len(packet)) + " bytes")
Dan Talayco3087a462010-02-13 14:01:47 -0800161 return self.socket.send(packet)
162
163
164 def register(self, handler):
165 """
166 Register a callback function to receive packets from this
167 port. The callback will be passed the packet, the
168 interface name and the port number (if set) on which the
169 packet was received.
170
171 To be implemented
172 """
173 pass
174
Dan Talayco1b3f6902010-02-15 14:14:19 -0800175 def show(self, prefix=''):
176 print prefix + "Name: " + self.interface_name
177 print prefix + "Pkts pending: " + str(self.packets_pending)
178 print prefix + "Pkts total: " + str(self.packets_total)
179 print prefix + "socket: " + str(self.socket)
180
Dan Talayco34089522010-02-07 23:07:41 -0800181
182class DataPlane:
183 """
184 Class defining access primitives to the data plane
185 Controls a list of DataPlanePort objects
186 """
187 def __init__(self):
188 self.port_list = {}
Dan Talayco1b3f6902010-02-15 14:14:19 -0800189 self.debug_level = oft_config.debug_level_default
190
191 def dbg(self, level, string):
192 oft_config.debug_log("DPORT", self.debug_level, level, string)
Dan Talayco34089522010-02-07 23:07:41 -0800193
194 def port_add(self, interface_name, port_number):
195 """
196 Add a port to the dataplane
197 TBD: Max packets for queue?
198 @param interface_name The name of the physical interface like eth1
199 @param port_number The port number used to refer to the port
200 """
201
202 self.port_list[port_number] = DataPlanePort(interface_name)
203 self.port_list[port_number].start()
204
205 def send(self, port_number, packet):
206 """
207 Send a packet to the given port
208 @param port_number The port to send the data to
209 @param packet Raw packet data to send to port
210 """
Dan Talayco1b3f6902010-02-15 14:14:19 -0800211 self.dbg(oft_config.DEBUG_VERBOSE,
212 "Sending %d bytes to port %d" % (len(packet), port_number))
Dan Talayco34089522010-02-07 23:07:41 -0800213 bytes = self.port_list[port_number].send(packet)
214 if bytes != len(packet):
Dan Talayco1b3f6902010-02-15 14:14:19 -0800215 self.dbg(DEBUG_ERROR,"Unhandled send error, " +
216 "length mismatch %d != %d" %
217 (bytes, len(packet)))
Dan Talayco34089522010-02-07 23:07:41 -0800218 return bytes
219
220 def flood(self, packet):
221 """
222 Send a packet to all ports
223 @param packet Raw packet data to send to port
224 """
225 for port_number in self.port_list.keys():
226 bytes = self.port_list[port_number].send(packet)
227 if bytes != len(packet):
Dan Talayco1b3f6902010-02-15 14:14:19 -0800228 self.dbg(DEBUG_ERROR, "Unhandled send error" +
229 ", port %d, length mismatch %d != %d" %
230 (port_number, bytes, len(packet)))
Dan Talayco34089522010-02-07 23:07:41 -0800231
232 def packet_get(self, port_number=None):
233 """
234 Get a packet from the data plane
235 If port_number is given, get the packet from that port.
236 Otherwise, find the port with the oldest packet and return
237 that packet.
238 @param port_number If set, get packet from this port
239 @retval The triple port_number, packet, pkt_time where packet
240 is received from port_number at time pkt_time.
241 """
242
243 if port_number:
244 if self.port_list[port_number].packets_pending != 0:
245 pkt, time = self.port_list[port_number].dequeue()
246 return port_number, pkt, time
247 else:
248 return None, None, None
249
250 # Find port with oldest packet
251 min_time = 0
252 min_port = -1
253 for port_number in self.port_list.keys():
254 ptime = self.port_list[port_number].timestamp_head()
255 if ptime:
256 if (min_port == -1) or (ptime < min_time):
257 min_time = ptime
258 min_port = port_number
259
260 if min_port == -1:
261 return None, None, None
262
263 pkt, time = self.port_list[min_port].dequeue()
264 return min_port, pkt, time
265
Dan Talayco1b3f6902010-02-15 14:14:19 -0800266 def kill(self, join_threads=True):
267 """
268 Close all sockets for dataplane
269 @param join_threads If True (default) call join on each thread
270 """
Dan Talayco34089522010-02-07 23:07:41 -0800271 for port_number in self.port_list.keys():
272 self.port_list[port_number].kill()
Dan Talayco1b3f6902010-02-15 14:14:19 -0800273 if join_threads:
274 self.dbg(oft_config.DEBUG_INFO, "Joining ", port_number)
275 self.port_list[port_number].join()
Dan Talayco34089522010-02-07 23:07:41 -0800276
Dan Talayco1b3f6902010-02-15 14:14:19 -0800277 self.dbg(oft_config.DEBUG_INFO, "DataPlane shutdown")
278
279 def show(self, prefix=''):
280 print prefix + "Dataplane Controller"
281 for pnum, port in self.port_list.items():
282 print prefix + "OpenFlow Port Number " + str(pnum)
283 port.show(prefix + ' ')
284