blob: df79d4765dea6ea2bbc8b3e9f349b66c6387cbe9 [file] [log] [blame]
Sreeju Sreedhare3fefd92019-04-02 15:57:15 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
17"""
18OpenFlow Test Framework
19
20DataPlane and DataPlanePort classes
21
22Provide the interface to control the set of ports being used
23to stimulate the switch under test.
24
25See the class dataplaneport for more details. This class wraps
26a set of those objects allowing general calls and parsing
27configuration.
28
29@todo Add "filters" for matching packets. Actions supported
30for filters should include a callback or a counter
31"""
32
33import sys
34import os
35import socket
36import time
37import select
38import logging
39from threading import Thread
40from threading import Lock
41from threading import Condition
42import ofutils
43import netutils
44from pcap_writer import PcapWriter
45
46if "linux" in sys.platform:
47 import afpacket
48else:
49 import pcap
50
def match_exp_pkt(self, exp_pkt, pkt):
    """
    Check whether the expected packet appears inside a received packet.

    Both arguments are converted with str() and the expected string is
    searched for anywhere in the received string. A substring search is
    used instead of strict equality because some NICs capture more bytes
    than were sent; as a side effect, padding appended to short frames is
    ignored as well.

    @param self Object providing a "config" dict and a "logger" (the
    DataPlane instance passes itself)
    @param exp_pkt The packet that is expected
    @param pkt The packet that was received
    @retval True if str(exp_pkt) is contained in str(pkt), False otherwise.
    On a mismatch both packets are hex-dumped at debug level when the
    "dump_packet" config entry is set.
    """
    expected = str(exp_pkt)
    received = str(pkt)

    # Search the string forms, not the raw arguments: packet objects are
    # not guaranteed to offer a find() method of their own.
    if expected in received:
        return True

    # "dump_packet" is optional; a missing key must not turn an ordinary
    # mismatch into a KeyError.
    if self.config.get("dump_packet"):
        self.logger.debug(
            "rx pkt ->" + " ".join("{:02x}".format(ord(c)) for c in received))
        self.logger.debug(
            "expect pkt->" + " ".join("{:02x}".format(ord(c)) for c in expected))

    return False
73
74
class DataPlanePortLinux:
    """
    Uses raw sockets to capture and send packets on a network interface.
    """

    # Size argument handed to afpacket.recv() for each receive call
    RCV_SIZE_DEFAULT = 4096
    # Protocol value used when binding the raw socket to the interface
    ETH_P_ALL = 0x03
    # Value handed to socket.settimeout() (which takes seconds)
    RCV_TIMEOUT = 10000

    def __init__(self, interface_name, port_number):
        """
        Open a raw AF_PACKET socket bound to the given interface and put
        the interface in promiscuous mode.

        @param interface_name The name of the physical interface like eth1
        @param port_number Not used by this implementation
        """
        self.interface_name = interface_name
        self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
        afpacket.enable_auxdata(self.socket)
        self.socket.bind((interface_name, self.ETH_P_ALL))
        netutils.set_promisc(self.socket, interface_name)
        self.socket.settimeout(self.RCV_TIMEOUT)

    def __del__(self):
        """
        Close the socket, if one was ever created.
        """
        # The finalizer also runs for objects whose __init__ raised before
        # self.socket was assigned, so the attribute may be missing.
        sock = getattr(self, "socket", None)
        if sock is not None:
            sock.close()
            self.socket = None

    def fileno(self):
        """
        Return an integer file descriptor that can be passed to select(2).
        """
        return self.socket.fileno()

    def recv(self):
        """
        Receive a packet from this port.
        @retval (packet data, timestamp) where timestamp is time.time()
        taken right after the packet was read
        """
        pkt = afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
        return (pkt, time.time())

    def send(self, packet):
        """
        Send a packet out this port.
        @param packet The packet data to send to the port
        @retval The number of bytes sent
        """
        return self.socket.send(packet)

    def down(self):
        """
        Bring the physical link down.
        """
        # NOTE(review): interface_name is interpolated into a shell command;
        # it must only ever come from trusted configuration.
        os.system("ifconfig %s down" % self.interface_name)

    def up(self):
        """
        Bring the physical link up.
        """
        # NOTE(review): interface_name is interpolated into a shell command;
        # it must only ever come from trusted configuration.
        os.system("ifconfig %s up" % self.interface_name)
134
135
class DataPlanePortPcap:
    """
    Port implementation backed by libpcap instead of raw sockets; this is
    the one selected on operating systems other than Linux.
    """

    def __init__(self, interface_name, port_number):
        """
        Open a pcap handle on the interface and switch it to non-blocking.

        @param interface_name The name of the interface to open
        @param port_number Not used by this implementation
        """
        capture = pcap.pcap(interface_name)
        self.pcap = capture
        capture.setnonblock()

    def fileno(self):
        """
        Return the file descriptor of the underlying pcap handle.
        """
        handle = self.pcap
        return handle.fileno()

    def recv(self):
        """
        Fetch the next captured packet.
        @retval (packet data, timestamp); the data is a slice copy of what
        the pcap handle returned
        """
        captured = next(self.pcap)
        stamp, data = captured
        return (data[:], stamp)

    def send(self, packet):
        """
        Transmit a packet through the pcap handle.

        Uses the handle's inject() method when it has one and falls back
        to sendpacket() otherwise.
        @param packet The packet data to send
        @retval Whatever the underlying pcap call returns
        """
        if not hasattr(self.pcap, "inject"):
            return self.pcap.sendpacket(packet)
        return self.pcap.inject(packet, len(packet))

    def down(self):
        """
        No-op; link state is not controlled by this implementation.
        """
        return None

    def up(self):
        """
        No-op; link state is not controlled by this implementation.
        """
        return None
164
class DataPlane(Thread):
    """
    This class provides methods to send and receive packets on the dataplane.
    It uses the DataPlanePort class, or an alternative implementation of that
    interface, to do IO on a particular port. A background thread is used to
    read packets from the dataplane ports and enqueue them to be read by the
    test. The kill() method must be called to shutdown this thread.
    """

    # Per-port queue limit; the oldest packet is dropped once it is reached
    MAX_QUEUE_LEN = 100

    def __init__(self, config=None):
        """
        Set up the dataplane state and start the background reader thread.

        @param config Optional configuration dict. If it has a "dataplane"
        entry containing "portclass", that class is used to create ports;
        otherwise the class is picked from sys.platform.
        """
        Thread.__init__(self)

        # dict from port number to port object
        self.ports = {}

        # dict from port number to list of (packet, timestamp)
        self.packet_queues = {}

        # cvar serves double duty as a regular top level lock and
        # as a condition variable
        self.cvar = Condition()

        # Used to wake up the event loop from another thread
        self.waker = ofutils.EventDescriptor()
        self.killed = False

        self.logger = logging.getLogger("dataplane")
        self.pcap_writer = None

        self.config = {} if config is None else config

        ############################################################
        #
        # The platform/config can provide a custom DataPlanePort class
        # here if you have a custom implementation with different
        # behavior.
        #
        # Set config.dataplane.portclass = MyDataPlanePortClass
        # where MyDataPlanePortClass has the same interface as the class
        # DataPlanePort defined here.
        #
        if "dataplane" in self.config and "portclass" in self.config["dataplane"]:
            self.dppclass = self.config["dataplane"]["portclass"]
        elif "linux" in sys.platform:
            self.dppclass = DataPlanePortLinux
        else:
            self.dppclass = DataPlanePortPcap

        self.start()

    def run(self):
        """
        Activity function for class

        Loops until killed: selects on the waker and all ports (with a
        one second timeout), then enqueues every packet that is ready and
        notifies anyone waiting on cvar.
        """
        while not self.killed:
            # Rebuilt on every pass so ports added meanwhile are picked up
            sockets = [self.waker] + list(self.ports.values())
            try:
                sel_in, _, _ = select.select(sockets, [], [], 1)
            except Exception:
                # Logs the message together with the traceback
                self.logger.exception("Select error, exiting")
                break

            with self.cvar:
                for port in sel_in:
                    if port is self.waker:
                        self.waker.wait()
                        continue

                    # Enqueue packet
                    pkt, timestamp = port.recv()
                    port_number = port._port_number
                    self.logger.debug("Pkt len %d in on port %d",
                                      len(pkt), port_number)
                    if self.pcap_writer:
                        self.pcap_writer.write(pkt, timestamp, port_number)
                    queue = self.packet_queues[port_number]
                    if len(queue) >= self.MAX_QUEUE_LEN:
                        # Queue full, throw away oldest
                        queue.pop(0)
                        self.logger.debug("Discarding oldest packet to make room")
                    queue.append((pkt, timestamp))
                self.cvar.notify_all()

        self.logger.info("Thread exit")

    def port_add(self, interface_name, port_number):
        """
        Add a port to the dataplane
        @param interface_name The name of the physical interface like eth1
        @param port_number The port number used to refer to the port
        Stashes the port number on the created port object.
        """
        port = self.dppclass(interface_name, port_number)
        port._port_number = port_number
        self.packet_queues[port_number] = []
        # Publish the port last: the reader thread looks at self.ports on
        # every loop pass and needs _port_number and the queue to exist
        # for any port it finds there.
        self.ports[port_number] = port
        # Need to wake up event loop to change the sockets being selected on.
        self.waker.notify()

    def send(self, port_number, packet):
        """
        Send a packet to the given port
        @param port_number The port to send the data to
        @param packet Raw packet data to send to port
        @retval The number of bytes reported as sent by the port
        """
        self.logger.debug("Sending %d bytes to port %d",
                          len(packet), port_number)
        if self.pcap_writer:
            self.pcap_writer.write(packet, time.time(), port_number)
        sent = self.ports[port_number].send(packet)
        if sent != len(packet):
            self.logger.error("Unhandled send error, length mismatch %d != %d",
                              sent, len(packet))
        return sent

    def oldest_port_number(self):
        """
        Returns the port number with the oldest packet, or
        None if no packets are queued.
        """
        min_port_number = None
        min_time = float('inf')
        for (port_number, queue) in self.packet_queues.items():
            # queue[0] is the head of the queue; [1] is its timestamp
            if queue and queue[0][1] < min_time:
                min_time = queue[0][1]
                min_port_number = port_number
        return min_port_number

    def packets(self, port_number=None):
        """
        Dequeues and yields packets in the order they were received.
        Yields (port number, packet, received time).
        If port_number is not specified yields packets from all ports.
        """
        while True:
            # Compare against None explicitly: 0 is a valid port number.
            if port_number is None:
                rcv_port_number = self.oldest_port_number()
            else:
                rcv_port_number = port_number

            if rcv_port_number is None:
                self.logger.debug("Out of packets on all ports")
                break

            queue = self.packet_queues[rcv_port_number]

            if len(queue) == 0:
                self.logger.debug("Out of packets on port %d", rcv_port_number)
                break

            pkt, timestamp = queue.pop(0)
            yield (rcv_port_number, pkt, timestamp)

    def poll(self, port_number=None, timeout=-1, exp_pkt=None):
        """
        Poll one or all dataplane ports for a packet

        If port_number is given, get the oldest packet from that port.
        Otherwise, find the port with the oldest packet and return
        that packet.

        If exp_pkt is true, discard all packets until that one is found

        @param port_number If set, get packet from this port
        @param timeout If positive and no packet is available, block
        until a packet is received or for this many seconds
        @param exp_pkt If not None, look for this packet and ignore any
        others received. Note that if port_number is None, all packets
        from all ports will be discarded until the exp_pkt is found
        @return The triple port_number, packet, pkt_time where packet
        is received from port_number at time pkt_time. If a timeout
        occurs, return None, None, None
        """

        # Compare against None explicitly: 0 is a valid port number.
        if exp_pkt and port_number is None:
            self.logger.warning("Dataplane poll with exp_pkt but no port number")

        # Retrieve the packet. Returns (port number, packet, time).
        def grab():
            self.logger.debug("Grabbing packet")
            for (rcv_port_number, pkt, pkt_time) in self.packets(port_number):
                self.logger.debug("Checking packet from port %d", rcv_port_number)
                if not exp_pkt or match_exp_pkt(self, exp_pkt, pkt):
                    return (rcv_port_number, pkt, pkt_time)
            self.logger.debug("Did not find packet")
            return None

        # grab() is handed to ofutils.timed_wait while cvar is held
        with self.cvar:
            ret = ofutils.timed_wait(self.cvar, grab, timeout=timeout)

        if ret is not None:
            return ret

        self.logger.debug("Poll time out, no packet from %s", port_number)
        return (None, None, None)

    def kill(self):
        """
        Stop the dataplane thread.
        """
        self.killed = True
        self.waker.notify()
        self.join()
        # Explicitly release ports to ensure we don't run out of sockets
        # even if someone keeps holding a reference to the dataplane.
        del self.ports

    def port_down(self, port_number):
        """Brings the specified port down"""
        self.ports[port_number].down()

    def port_up(self, port_number):
        """Brings the specified port up"""
        self.ports[port_number].up()

    def flush(self):
        """
        Drop any queued packets.
        """
        # Iterate over a snapshot of the keys while rebinding the values
        for port_number in list(self.packet_queues):
            self.packet_queues[port_number] = []

    def start_pcap(self, filename):
        """
        Start writing sent and received packets to a pcap file.
        Must not be called while a capture is already active.
        @param filename Passed to PcapWriter
        """
        assert self.pcap_writer is None
        self.pcap_writer = PcapWriter(filename)

    def stop_pcap(self):
        """
        Close the active pcap writer, if any.
        """
        if self.pcap_writer:
            self.pcap_writer.close()
            self.pcap_writer = None