blob: 839fe9ec37de1048b29c59f19fe9158d9f5f76f6 [file] [log] [blame]
Sreeju Sreedhare3fefd92019-04-02 15:57:15 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
17"""
18OpenFlow Test Framework
19
20DataPlane and DataPlanePort classes
21
22Provide the interface to the control the set of ports being used
23to stimulate the switch under test.
24
25See the class dataplaneport for more details. This class wraps
26a set of those objects allowing general calls and parsing
27configuration.
28
29@todo Add "filters" for matching packets. Actions supported
30for filters should include a callback or a counter
31"""
32
33import sys
34import os
35import socket
36import time
37import select
38import logging
39from threading import Thread
40from threading import Lock
41from threading import Condition
42import ofutils
43import netutils
44from pcap_writer import PcapWriter
45
46if "linux" in sys.platform:
47 import afpacket
48else:
49 import pcap
50
51def match_exp_pkt(self, exp_pkt, pkt):
52 """
53 Compare the string value of pkt with the string value of exp_pkt,
54 and return True iff they are identical. If the length of exp_pkt is
55 less than the minimum Ethernet frame size (60 bytes), then padding
56 bytes in pkt are ignored.
57 """
58 e = str(exp_pkt)
59 p = str(pkt)
60 if len(e) < 60:
61 p = p[:len(e)]
62
63 #return e == p
64 #some nic card have capature problem, will have more bytes capatured.
65 if pkt.find(exp_pkt) >=0:
66 return True
67 else:
68 if self.config["dump_packet"]:
69 self.logger.debug("rx pkt ->"+(" ".join("{:02x}".format(ord(c)) for c in pkt)))
70 self.logger.debug("expect pkt->"+(" ".join("{:02x}".format(ord(c)) for c in exp_pkt)))
71
72 return False
73
74
75class DataPlanePortLinux:
76 """
77 Uses raw sockets to capture and send packets on a network interface.
78 """
79
80 RCV_SIZE_DEFAULT = 4096
81 ETH_P_ALL = 0x03
82 RCV_TIMEOUT = 10000
83
84 def __init__(self, interface_name, port_number):
85 """
86 @param interface_name The name of the physical interface like eth1
87 """
88 self.interface_name = interface_name
89 self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
90 afpacket.enable_auxdata(self.socket)
91 self.socket.bind((interface_name, self.ETH_P_ALL))
92 netutils.set_promisc(self.socket, interface_name)
93 self.socket.settimeout(self.RCV_TIMEOUT)
94
95 def __del__(self):
96 if self.socket:
97 self.socket.close()
98
99 def fileno(self):
100 """
101 Return an integer file descriptor that can be passed to select(2).
102 """
103 return self.socket.fileno()
104
105 def recv(self):
106 """
107 Receive a packet from this port.
108 @retval (packet data, timestamp)
109 """
110 pkt = afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
111 return (pkt, time.time())
112
113 def send(self, packet):
114 """
115 Send a packet out this port.
116 @param packet The packet data to send to the port
117 @retval The number of bytes sent
118 """
119 return self.socket.send(packet)
120
121 def down(self):
122 """
123 Bring the physical link down.
124 """
125 #os.system("ifconfig down %s" % self.interface_name)
126 os.system("ifconfig %s down" % self.interface_name)
127
128 def up(self):
129 """
130 Bring the physical link up.
131 """
132 #os.system("ifconfig up %s" % self.interface_name)
133 os.system("ifconfig %s up" % self.interface_name)
134
Roman Bubyr8c385572019-04-25 11:45:13 +0300135 def mtu(self, mtu):
136 """
137 Change MTU value of port.
138 """
139 # os.system("ifconfig up %s" % self.interface_name)
140 os.system("ifconfig %s mtu %d" % (self.interface_name, mtu))
141
Sreeju Sreedhare3fefd92019-04-02 15:57:15 -0700142
143class DataPlanePortPcap:
144 """
145 Alternate port implementation using libpcap. This is used by non-Linux
146 operating systems.
147 """
148
149 def __init__(self, interface_name, port_number):
150 self.pcap = pcap.pcap(interface_name)
151 self.pcap.setnonblock()
152
153 def fileno(self):
154 return self.pcap.fileno()
155
156 def recv(self):
157 (timestamp, pkt) = next(self.pcap)
158 return (pkt[:], timestamp)
159
160 def send(self, packet):
161 if hasattr(self.pcap, "inject"):
162 return self.pcap.inject(packet, len(packet))
163 else:
164 return self.pcap.sendpacket(packet)
165
166 def down(self):
167 pass
168
169 def up(self):
170 pass
171
172class DataPlane(Thread):
173 """
174 This class provides methods to send and receive packets on the dataplane.
175 It uses the DataPlanePort class, or an alternative implementation of that
176 interface, to do IO on a particular port. A background thread is used to
177 read packets from the dataplane ports and enqueue them to be read by the
178 test. The kill() method must be called to shutdown this thread.
179 """
180
181 MAX_QUEUE_LEN = 100
182
183 def __init__(self, config=None):
184 Thread.__init__(self)
185
186 # dict from port number to port object
187 self.ports = {}
188
189 # dict from port number to list of (timestamp, packet)
190 self.packet_queues = {}
191
192 # cvar serves double duty as a regular top level lock and
193 # as a condition variable
194 self.cvar = Condition()
195
196 # Used to wake up the event loop from another thread
197 self.waker = ofutils.EventDescriptor()
198 self.killed = False
199
200 self.logger = logging.getLogger("dataplane")
201 self.pcap_writer = None
202
203 if config is None:
204 self.config = {}
205 else:
206 self.config = config;
207
208 ############################################################
209 #
210 # The platform/config can provide a custom DataPlanePort class
211 # here if you have a custom implementation with different
212 # behavior.
213 #
214 # Set config.dataplane.portclass = MyDataPlanePortClass
215 # where MyDataPlanePortClass has the same interface as the class
216 # DataPlanePort defined here.
217 #
218 if "dataplane" in self.config and "portclass" in self.config["dataplane"]:
219 self.dppclass = self.config["dataplane"]["portclass"]
220 elif "linux" in sys.platform:
221 self.dppclass = DataPlanePortLinux
222 else:
223 self.dppclass = DataPlanePortPcap
224
225 self.start()
226
227 def run(self):
228 """
229 Activity function for class
230 """
231 while not self.killed:
232 sockets = [self.waker] + self.ports.values()
233 try:
234 sel_in, sel_out, sel_err = select.select(sockets, [], [], 1)
235 except:
236 print sys.exc_info()
237 self.logger.error("Select error, exiting")
238 break
239
240 with self.cvar:
241 for port in sel_in:
242 if port == self.waker:
243 self.waker.wait()
244 continue
245 else:
246 # Enqueue packet
247 pkt, timestamp = port.recv()
248 port_number = port._port_number
249 self.logger.debug("Pkt len %d in on port %d",
250 len(pkt), port_number)
251 if self.pcap_writer:
252 self.pcap_writer.write(pkt, timestamp, port_number)
253 queue = self.packet_queues[port_number]
254 if len(queue) >= self.MAX_QUEUE_LEN:
255 # Queue full, throw away oldest
256 queue.pop(0)
257 self.logger.debug("Discarding oldest packet to make room")
258 queue.append((pkt, timestamp))
259 self.cvar.notify_all()
260
261 self.logger.info("Thread exit")
262
263 def port_add(self, interface_name, port_number):
264 """
265 Add a port to the dataplane
266 @param interface_name The name of the physical interface like eth1
267 @param port_number The port number used to refer to the port
268 Stashes the port number on the created port object.
269 """
270 self.ports[port_number] = self.dppclass(interface_name, port_number)
271 self.ports[port_number]._port_number = port_number
272 self.packet_queues[port_number] = []
273 # Need to wake up event loop to change the sockets being selected on.
274 self.waker.notify()
275
276 def send(self, port_number, packet):
277 """
278 Send a packet to the given port
279 @param port_number The port to send the data to
280 @param packet Raw packet data to send to port
281 """
282 self.logger.debug("Sending %d bytes to port %d" %
283 (len(packet), port_number))
284 if self.pcap_writer:
285 self.pcap_writer.write(packet, time.time(), port_number)
286 bytes = self.ports[port_number].send(packet)
287 if bytes != len(packet):
288 self.logger.error("Unhandled send error, length mismatch %d != %d" %
289 (bytes, len(packet)))
290 return bytes
291
292 def oldest_port_number(self):
293 """
294 Returns the port number with the oldest packet, or
295 None if no packets are queued.
296 """
297 min_port_number = None
298 min_time = float('inf')
299 for (port_number, queue) in self.packet_queues.items():
300 if queue and queue[0][1] < min_time:
301 min_time = queue[0][1]
302 min_port_number = port_number
303 return min_port_number
304
305 # Dequeues and yields packets in the order they were received.
306 # Yields (port number, packet, received time).
307 # If port_number is not specified yields packets from all ports.
308 def packets(self, port_number=None):
309 while True:
310 rcv_port_number = port_number or self.oldest_port_number()
311
312 if rcv_port_number == None:
313 self.logger.debug("Out of packets on all ports")
314 break
315
316 queue = self.packet_queues[rcv_port_number]
317
318 if len(queue) == 0:
319 self.logger.debug("Out of packets on port %d", rcv_port_number)
320 break
321
322 pkt, time = queue.pop(0)
323 yield (rcv_port_number, pkt, time)
324
325 def poll(self, port_number=None, timeout=-1, exp_pkt=None):
326 """
327 Poll one or all dataplane ports for a packet
328
329 If port_number is given, get the oldest packet from that port.
330 Otherwise, find the port with the oldest packet and return
331 that packet.
332
333 If exp_pkt is true, discard all packets until that one is found
334
335 @param port_number If set, get packet from this port
336 @param timeout If positive and no packet is available, block
337 until a packet is received or for this many seconds
338 @param exp_pkt If not None, look for this packet and ignore any
339 others received. Note that if port_number is None, all packets
340 from all ports will be discarded until the exp_pkt is found
341 @return The triple port_number, packet, pkt_time where packet
342 is received from port_number at time pkt_time. If a timeout
343 occurs, return None, None, None
344 """
345
346 if exp_pkt and not port_number:
347 self.logger.warn("Dataplane poll with exp_pkt but no port number")
348
349 # Retrieve the packet. Returns (port number, packet, time).
350 def grab():
351 self.logger.debug("Grabbing packet")
352 for (rcv_port_number, pkt, time) in self.packets(port_number):
353 self.logger.debug("Checking packet from port %d", rcv_port_number)
354 if not exp_pkt or match_exp_pkt(self, exp_pkt, pkt):
355 return (rcv_port_number, pkt, time)
356 self.logger.debug("Did not find packet")
357 return None
358
359 with self.cvar:
360 ret = ofutils.timed_wait(self.cvar, grab, timeout=timeout)
361
362 if ret != None:
363 return ret
364 else:
365 self.logger.debug("Poll time out, no packet from " + str(port_number))
366 return (None, None, None)
367
368 def kill(self):
369 """
370 Stop the dataplane thread.
371 """
372 self.killed = True
373 self.waker.notify()
374 self.join()
375 # Explicitly release ports to ensure we don't run out of sockets
376 # even if someone keeps holding a reference to the dataplane.
377 del self.ports
378
379 def port_down(self, port_number):
380 """Brings the specified port down"""
381 self.ports[port_number].down()
382
383 def port_up(self, port_number):
384 """Brings the specified port up"""
385 self.ports[port_number].up()
386
387 def flush(self):
388 """
389 Drop any queued packets.
390 """
391 for port_number in self.packet_queues.keys():
392 self.packet_queues[port_number] = []
393
394 def start_pcap(self, filename):
395 assert(self.pcap_writer == None)
396 self.pcap_writer = PcapWriter(filename)
397
398 def stop_pcap(self):
399 if self.pcap_writer:
400 self.pcap_writer.close()
401 self.pcap_writer = None
Roman Bubyr8c385572019-04-25 11:45:13 +0300402
403 def port_mtu(self, port_number, mtu):
404 """Change MTU on controller's port"""
405 self.ports[port_number].mtu(mtu)