blob: bc36d562980f0a0ef9167face718bc21135899c5 [file] [log] [blame]
Sreeju Sreedhare3fefd92019-04-02 15:57:15 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
17"""
18OpenFlow Test Framework
19
20DataPlane and DataPlanePort classes
21
22Provide the interface to the control the set of ports being used
23to stimulate the switch under test.
24
25See the class dataplaneport for more details. This class wraps
26a set of those objects allowing general calls and parsing
27configuration.
28
29@todo Add "filters" for matching packets. Actions supported
30for filters should include a callback or a counter
31"""
32
33import sys
34import os
35import socket
36import time
37import select
38import logging
39from threading import Thread
40from threading import Lock
41from threading import Condition
42import ofutils
43import netutils
44from pcap_writer import PcapWriter
45
46if "linux" in sys.platform:
47 import afpacket
48else:
49 import pcap
50
51def match_exp_pkt(self, exp_pkt, pkt):
52 """
53 Compare the string value of pkt with the string value of exp_pkt,
54 and return True iff they are identical. If the length of exp_pkt is
55 less than the minimum Ethernet frame size (60 bytes), then padding
56 bytes in pkt are ignored.
57 """
58 e = str(exp_pkt)
59 p = str(pkt)
60 if len(e) < 60:
61 p = p[:len(e)]
62
63 #return e == p
64 #some nic card have capature problem, will have more bytes capatured.
65 if pkt.find(exp_pkt) >=0:
Roman Bubyrd4b87412019-08-05 12:52:40 +030066 if self.config["dump_packet"]:
67 self.logger.debug("found pkt->"+(" ".join("{:02x}".format(ord(c)) for c in exp_pkt)))
68
Sreeju Sreedhare3fefd92019-04-02 15:57:15 -070069 return True
70 else:
71 if self.config["dump_packet"]:
Roman Bubyrd4b87412019-08-05 12:52:40 +030072 pass
73 # self.logger.debug("rx pkt ->"+(" ".join("{:02x}".format(ord(c)) for c in pkt)))
74 # self.logger.debug("expect pkt->"+(" ".join("{:02x}".format(ord(c)) for c in exp_pkt)))
Sreeju Sreedhare3fefd92019-04-02 15:57:15 -070075
76 return False
77
78
79class DataPlanePortLinux:
80 """
81 Uses raw sockets to capture and send packets on a network interface.
82 """
83
84 RCV_SIZE_DEFAULT = 4096
85 ETH_P_ALL = 0x03
86 RCV_TIMEOUT = 10000
87
88 def __init__(self, interface_name, port_number):
89 """
90 @param interface_name The name of the physical interface like eth1
91 """
92 self.interface_name = interface_name
93 self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
94 afpacket.enable_auxdata(self.socket)
95 self.socket.bind((interface_name, self.ETH_P_ALL))
96 netutils.set_promisc(self.socket, interface_name)
97 self.socket.settimeout(self.RCV_TIMEOUT)
98
99 def __del__(self):
100 if self.socket:
101 self.socket.close()
102
103 def fileno(self):
104 """
105 Return an integer file descriptor that can be passed to select(2).
106 """
107 return self.socket.fileno()
108
109 def recv(self):
110 """
111 Receive a packet from this port.
112 @retval (packet data, timestamp)
113 """
114 pkt = afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
115 return (pkt, time.time())
116
117 def send(self, packet):
118 """
119 Send a packet out this port.
120 @param packet The packet data to send to the port
121 @retval The number of bytes sent
122 """
123 return self.socket.send(packet)
124
125 def down(self):
126 """
127 Bring the physical link down.
128 """
129 #os.system("ifconfig down %s" % self.interface_name)
130 os.system("ifconfig %s down" % self.interface_name)
131
132 def up(self):
133 """
134 Bring the physical link up.
135 """
136 #os.system("ifconfig up %s" % self.interface_name)
137 os.system("ifconfig %s up" % self.interface_name)
138
Roman Bubyr8c385572019-04-25 11:45:13 +0300139 def mtu(self, mtu):
140 """
141 Change MTU value of port.
142 """
143 # os.system("ifconfig up %s" % self.interface_name)
144 os.system("ifconfig %s mtu %d" % (self.interface_name, mtu))
145
Sreeju Sreedhare3fefd92019-04-02 15:57:15 -0700146
147class DataPlanePortPcap:
148 """
149 Alternate port implementation using libpcap. This is used by non-Linux
150 operating systems.
151 """
152
153 def __init__(self, interface_name, port_number):
154 self.pcap = pcap.pcap(interface_name)
155 self.pcap.setnonblock()
156
157 def fileno(self):
158 return self.pcap.fileno()
159
160 def recv(self):
161 (timestamp, pkt) = next(self.pcap)
162 return (pkt[:], timestamp)
163
164 def send(self, packet):
165 if hasattr(self.pcap, "inject"):
166 return self.pcap.inject(packet, len(packet))
167 else:
168 return self.pcap.sendpacket(packet)
169
170 def down(self):
171 pass
172
173 def up(self):
174 pass
175
176class DataPlane(Thread):
177 """
178 This class provides methods to send and receive packets on the dataplane.
179 It uses the DataPlanePort class, or an alternative implementation of that
180 interface, to do IO on a particular port. A background thread is used to
181 read packets from the dataplane ports and enqueue them to be read by the
182 test. The kill() method must be called to shutdown this thread.
183 """
184
185 MAX_QUEUE_LEN = 100
186
187 def __init__(self, config=None):
188 Thread.__init__(self)
189
190 # dict from port number to port object
191 self.ports = {}
192
193 # dict from port number to list of (timestamp, packet)
194 self.packet_queues = {}
195
196 # cvar serves double duty as a regular top level lock and
197 # as a condition variable
198 self.cvar = Condition()
199
200 # Used to wake up the event loop from another thread
201 self.waker = ofutils.EventDescriptor()
202 self.killed = False
203
204 self.logger = logging.getLogger("dataplane")
205 self.pcap_writer = None
206
207 if config is None:
208 self.config = {}
209 else:
210 self.config = config;
211
212 ############################################################
213 #
214 # The platform/config can provide a custom DataPlanePort class
215 # here if you have a custom implementation with different
216 # behavior.
217 #
218 # Set config.dataplane.portclass = MyDataPlanePortClass
219 # where MyDataPlanePortClass has the same interface as the class
220 # DataPlanePort defined here.
221 #
222 if "dataplane" in self.config and "portclass" in self.config["dataplane"]:
223 self.dppclass = self.config["dataplane"]["portclass"]
224 elif "linux" in sys.platform:
225 self.dppclass = DataPlanePortLinux
226 else:
227 self.dppclass = DataPlanePortPcap
228
229 self.start()
230
231 def run(self):
232 """
233 Activity function for class
234 """
235 while not self.killed:
236 sockets = [self.waker] + self.ports.values()
237 try:
238 sel_in, sel_out, sel_err = select.select(sockets, [], [], 1)
239 except:
240 print sys.exc_info()
241 self.logger.error("Select error, exiting")
242 break
243
244 with self.cvar:
245 for port in sel_in:
246 if port == self.waker:
247 self.waker.wait()
248 continue
249 else:
250 # Enqueue packet
251 pkt, timestamp = port.recv()
252 port_number = port._port_number
Roman Bubyrd4b87412019-08-05 12:52:40 +0300253 # self.logger.debug("Pkt len %d in on port %d",
254 # len(pkt), port_number)
Sreeju Sreedhare3fefd92019-04-02 15:57:15 -0700255 if self.pcap_writer:
256 self.pcap_writer.write(pkt, timestamp, port_number)
257 queue = self.packet_queues[port_number]
258 if len(queue) >= self.MAX_QUEUE_LEN:
259 # Queue full, throw away oldest
260 queue.pop(0)
Roman Bubyrd4b87412019-08-05 12:52:40 +0300261 # self.logger.debug("Discarding oldest packet to make room")
Sreeju Sreedhare3fefd92019-04-02 15:57:15 -0700262 queue.append((pkt, timestamp))
263 self.cvar.notify_all()
264
265 self.logger.info("Thread exit")
266
267 def port_add(self, interface_name, port_number):
268 """
269 Add a port to the dataplane
270 @param interface_name The name of the physical interface like eth1
271 @param port_number The port number used to refer to the port
272 Stashes the port number on the created port object.
273 """
274 self.ports[port_number] = self.dppclass(interface_name, port_number)
275 self.ports[port_number]._port_number = port_number
276 self.packet_queues[port_number] = []
277 # Need to wake up event loop to change the sockets being selected on.
278 self.waker.notify()
279
280 def send(self, port_number, packet):
281 """
282 Send a packet to the given port
283 @param port_number The port to send the data to
284 @param packet Raw packet data to send to port
285 """
286 self.logger.debug("Sending %d bytes to port %d" %
287 (len(packet), port_number))
288 if self.pcap_writer:
289 self.pcap_writer.write(packet, time.time(), port_number)
290 bytes = self.ports[port_number].send(packet)
291 if bytes != len(packet):
292 self.logger.error("Unhandled send error, length mismatch %d != %d" %
293 (bytes, len(packet)))
294 return bytes
295
296 def oldest_port_number(self):
297 """
298 Returns the port number with the oldest packet, or
299 None if no packets are queued.
300 """
301 min_port_number = None
302 min_time = float('inf')
303 for (port_number, queue) in self.packet_queues.items():
304 if queue and queue[0][1] < min_time:
305 min_time = queue[0][1]
306 min_port_number = port_number
307 return min_port_number
308
309 # Dequeues and yields packets in the order they were received.
310 # Yields (port number, packet, received time).
311 # If port_number is not specified yields packets from all ports.
312 def packets(self, port_number=None):
313 while True:
314 rcv_port_number = port_number or self.oldest_port_number()
315
316 if rcv_port_number == None:
317 self.logger.debug("Out of packets on all ports")
318 break
319
320 queue = self.packet_queues[rcv_port_number]
321
322 if len(queue) == 0:
323 self.logger.debug("Out of packets on port %d", rcv_port_number)
324 break
325
326 pkt, time = queue.pop(0)
327 yield (rcv_port_number, pkt, time)
328
329 def poll(self, port_number=None, timeout=-1, exp_pkt=None):
330 """
331 Poll one or all dataplane ports for a packet
332
333 If port_number is given, get the oldest packet from that port.
334 Otherwise, find the port with the oldest packet and return
335 that packet.
336
337 If exp_pkt is true, discard all packets until that one is found
338
339 @param port_number If set, get packet from this port
340 @param timeout If positive and no packet is available, block
341 until a packet is received or for this many seconds
342 @param exp_pkt If not None, look for this packet and ignore any
343 others received. Note that if port_number is None, all packets
344 from all ports will be discarded until the exp_pkt is found
345 @return The triple port_number, packet, pkt_time where packet
346 is received from port_number at time pkt_time. If a timeout
347 occurs, return None, None, None
348 """
349
350 if exp_pkt and not port_number:
351 self.logger.warn("Dataplane poll with exp_pkt but no port number")
352
353 # Retrieve the packet. Returns (port number, packet, time).
354 def grab():
355 self.logger.debug("Grabbing packet")
356 for (rcv_port_number, pkt, time) in self.packets(port_number):
357 self.logger.debug("Checking packet from port %d", rcv_port_number)
358 if not exp_pkt or match_exp_pkt(self, exp_pkt, pkt):
359 return (rcv_port_number, pkt, time)
360 self.logger.debug("Did not find packet")
361 return None
362
363 with self.cvar:
364 ret = ofutils.timed_wait(self.cvar, grab, timeout=timeout)
365
366 if ret != None:
367 return ret
368 else:
369 self.logger.debug("Poll time out, no packet from " + str(port_number))
370 return (None, None, None)
371
372 def kill(self):
373 """
374 Stop the dataplane thread.
375 """
376 self.killed = True
377 self.waker.notify()
378 self.join()
379 # Explicitly release ports to ensure we don't run out of sockets
380 # even if someone keeps holding a reference to the dataplane.
381 del self.ports
382
383 def port_down(self, port_number):
384 """Brings the specified port down"""
385 self.ports[port_number].down()
386
387 def port_up(self, port_number):
388 """Brings the specified port up"""
389 self.ports[port_number].up()
390
391 def flush(self):
392 """
393 Drop any queued packets.
394 """
395 for port_number in self.packet_queues.keys():
396 self.packet_queues[port_number] = []
397
398 def start_pcap(self, filename):
399 assert(self.pcap_writer == None)
400 self.pcap_writer = PcapWriter(filename)
401
402 def stop_pcap(self):
403 if self.pcap_writer:
404 self.pcap_writer.close()
405 self.pcap_writer = None
Roman Bubyr8c385572019-04-25 11:45:13 +0300406
407 def port_mtu(self, port_number, mtu):
408 """Change MTU on controller's port"""
409 self.ports[port_number].mtu(mtu)