Sreeju Sreedhar | e3fefd9 | 2019-04-02 15:57:15 -0700 | [diff] [blame] | 1 | |
| 2 | # Copyright 2017-present Open Networking Foundation |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | |
| 16 | |
| 17 | """ |
| 18 | OpenFlow Test Framework |
| 19 | |
| 20 | DataPlane and DataPlanePort classes |
| 21 | |
| 22 | Provide the interface to control the set of ports being used |
| 23 | to stimulate the switch under test. |
| 24 | |
| 25 | See the class dataplaneport for more details. This class wraps |
| 26 | a set of those objects allowing general calls and parsing |
| 27 | configuration. |
| 28 | |
| 29 | @todo Add "filters" for matching packets. Actions supported |
| 30 | for filters should include a callback or a counter |
| 31 | """ |
| 32 | |
| 33 | import sys |
| 34 | import os |
| 35 | import socket |
| 36 | import time |
| 37 | import select |
| 38 | import logging |
| 39 | from threading import Thread |
| 40 | from threading import Lock |
| 41 | from threading import Condition |
| 42 | import ofutils |
| 43 | import netutils |
| 44 | from pcap_writer import PcapWriter |
| 45 | |
| 46 | if "linux" in sys.platform: |
| 47 | import afpacket |
| 48 | else: |
| 49 | import pcap |
| 50 | |
def match_exp_pkt(self, exp_pkt, pkt):
    """
    Report whether exp_pkt occurs anywhere inside pkt.

    A substring search is used rather than strict equality because some
    NICs capture more bytes than were sent, so the received data may be
    longer than the expected packet (this also tolerates trailing padding).

    @param self Object providing a `config` dict and a `logger`. This is a
    module-level function; DataPlane.poll() passes the DataPlane instance.
    @param exp_pkt The expected packet data
    @param pkt The received packet data to search in
    @retval True if exp_pkt is contained in pkt, False otherwise
    """
    if pkt.find(exp_pkt) < 0:
        return False

    # .get() so that a config without a "dump_packet" entry (for example the
    # empty default config) simply means "do not dump" instead of KeyError.
    if self.config.get("dump_packet"):
        # Elements are ints when iterating bytes and 1-char strings when
        # iterating str; accept both when formatting the hex dump.
        self.logger.debug("found pkt->" + " ".join(
            "{:02x}".format(c if isinstance(c, int) else ord(c))
            for c in exp_pkt))

    return True
| 77 | |
| 78 | |
class DataPlanePortLinux:
    """
    Uses raw sockets to capture and send packets on a network interface.
    """

    # Maximum number of bytes requested for each received packet
    RCV_SIZE_DEFAULT = 4096
    # Protocol value used when binding the packet socket to the interface
    ETH_P_ALL = 0x03
    # Value handed to socket.settimeout(), which takes seconds
    RCV_TIMEOUT = 10000

    def __init__(self, interface_name, port_number):
        """
        Open a raw packet socket bound to the interface and put the
        interface into promiscuous mode.
        @param interface_name The name of the physical interface like eth1
        @param port_number Not used by this class; part of the constructor
        signature that DataPlane.port_add() calls
        """
        self.interface_name = interface_name
        self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0)
        afpacket.enable_auxdata(self.socket)
        self.socket.bind((interface_name, self.ETH_P_ALL))
        netutils.set_promisc(self.socket, interface_name)
        self.socket.settimeout(self.RCV_TIMEOUT)

    def __del__(self):
        # __init__ may have failed before self.socket was assigned (for
        # example when creating the raw socket is not permitted), so do not
        # assume the attribute exists.
        sock = getattr(self, "socket", None)
        if sock:
            sock.close()

    def fileno(self):
        """
        Return an integer file descriptor that can be passed to select(2).
        """
        return self.socket.fileno()

    def recv(self):
        """
        Receive a packet from this port.
        @retval (packet data, timestamp) where timestamp is time.time()
        taken right after the packet was read
        """
        pkt = afpacket.recv(self.socket, self.RCV_SIZE_DEFAULT)
        return (pkt, time.time())

    def send(self, packet):
        """
        Send a packet out this port.
        @param packet The packet data to send to the port
        @retval The number of bytes sent
        """
        return self.socket.send(packet)

    def down(self):
        """
        Bring the physical link down.
        """
        os.system("ifconfig %s down" % self.interface_name)

    def up(self):
        """
        Bring the physical link up.
        """
        os.system("ifconfig %s up" % self.interface_name)

    def mtu(self, mtu):
        """
        Change MTU value of port.
        @param mtu The new MTU as an integer
        """
        os.system("ifconfig %s mtu %d" % (self.interface_name, mtu))
| 145 | |
Sreeju Sreedhar | e3fefd9 | 2019-04-02 15:57:15 -0700 | [diff] [blame] | 146 | |
class DataPlanePortPcap:
    """
    Alternate port implementation using libpcap. This is used by non-Linux
    operating systems.
    """

    def __init__(self, interface_name, port_number):
        """
        Open a non-blocking pcap handle on the interface.
        @param interface_name The name of the physical interface like eth1
        @param port_number Not used by this class; part of the constructor
        signature that DataPlane.port_add() calls
        """
        self.pcap = pcap.pcap(interface_name)
        self.pcap.setnonblock()

    def fileno(self):
        """
        Return the file descriptor of the pcap handle, for use with select.
        """
        return self.pcap.fileno()

    def recv(self):
        """
        Receive a packet from this port.
        @retval (packet data, timestamp)
        """
        (timestamp, pkt) = next(self.pcap)
        # pkt[:] takes a slice copy of the data returned by the pcap handle
        return (pkt[:], timestamp)

    def send(self, packet):
        """
        Send a packet out this port.
        @param packet The packet data to send to the port
        @retval Whatever the underlying pcap send call returns
        """
        # Use inject() when the pcap object offers it, else sendpacket()
        if hasattr(self.pcap, "inject"):
            return self.pcap.inject(packet, len(packet))
        else:
            return self.pcap.sendpacket(packet)

    def down(self):
        """
        Link control is not implemented for pcap ports; does nothing.
        """
        pass

    def up(self):
        """
        Link control is not implemented for pcap ports; does nothing.
        """
        pass

    def mtu(self, mtu):
        """
        MTU control is not implemented for pcap ports; does nothing.
        Present so that DataPlane.port_mtu() does not fail with an
        AttributeError when this port class is in use.
        @param mtu The requested MTU (ignored)
        """
        pass
| 175 | |
class DataPlane(Thread):
    """
    This class provides methods to send and receive packets on the dataplane.
    It uses the DataPlanePort class, or an alternative implementation of that
    interface, to do IO on a particular port. A background thread is used to
    read packets from the dataplane ports and enqueue them to be read by the
    test. The kill() method must be called to shutdown this thread.
    """

    # Maximum number of packets kept per port; the oldest is dropped first
    MAX_QUEUE_LEN = 100

    def __init__(self, config=None):
        """
        Set up the dataplane state and start the background reader thread.
        @param config Optional configuration dict. If it contains
        config["dataplane"]["portclass"], that class is used to create ports.
        """
        Thread.__init__(self)

        # dict from port number to port object
        self.ports = {}

        # dict from port number to list of (packet, timestamp)
        self.packet_queues = {}

        # cvar serves double duty as a regular top level lock and
        # as a condition variable
        self.cvar = Condition()

        # Used to wake up the event loop from another thread
        self.waker = ofutils.EventDescriptor()
        self.killed = False

        self.logger = logging.getLogger("dataplane")
        self.pcap_writer = None

        if config is None:
            self.config = {}
        else:
            self.config = config

        ############################################################
        #
        # The platform/config can provide a custom DataPlanePort class
        # here if you have a custom implementation with different
        # behavior.
        #
        # Set config.dataplane.portclass = MyDataPlanePortClass
        # where MyDataPlanePortClass has the same interface as the class
        # DataPlanePort defined here.
        #
        if "dataplane" in self.config and "portclass" in self.config["dataplane"]:
            self.dppclass = self.config["dataplane"]["portclass"]
        elif "linux" in sys.platform:
            self.dppclass = DataPlanePortLinux
        else:
            self.dppclass = DataPlanePortPcap

        self.start()

    def run(self):
        """
        Activity function for class. Waits for readable ports, reads one
        packet from each and appends it to that port's queue until kill()
        is called or select fails.
        """
        while not self.killed:
            # list() so the concatenation also works where dict.values()
            # returns a view instead of a list.
            sockets = [self.waker] + list(self.ports.values())
            try:
                # One second timeout so self.killed is re-checked regularly
                sel_in, _, _ = select.select(sockets, [], [], 1)
            except Exception:
                # Log the failure with its traceback and stop the thread
                self.logger.exception("Select error, exiting")
                break

            with self.cvar:
                for port in sel_in:
                    if port is self.waker:
                        # Consume the wake-up event; nothing to enqueue
                        self.waker.wait()
                        continue

                    # Enqueue packet
                    pkt, timestamp = port.recv()
                    port_number = port._port_number
                    if self.pcap_writer:
                        self.pcap_writer.write(pkt, timestamp, port_number)
                    queue = self.packet_queues[port_number]
                    if len(queue) >= self.MAX_QUEUE_LEN:
                        # Queue full, throw away oldest
                        queue.pop(0)
                    queue.append((pkt, timestamp))
                # Wake up anybody blocked in poll()
                self.cvar.notify_all()

        self.logger.info("Thread exit")

    def port_add(self, interface_name, port_number):
        """
        Add a port to the dataplane
        @param interface_name The name of the physical interface like eth1
        @param port_number The port number used to refer to the port
        Stashes the port number on the created port object.
        """
        port = self.dppclass(interface_name, port_number)
        port._port_number = port_number
        self.packet_queues[port_number] = []
        # Publish the port last: the reader thread picks ports up from
        # self.ports and needs _port_number and the queue to exist already.
        self.ports[port_number] = port
        # Need to wake up event loop to change the sockets being selected on.
        self.waker.notify()

    def send(self, port_number, packet):
        """
        Send a packet to the given port
        @param port_number The port to send the data to
        @param packet Raw packet data to send to port
        @retval The number of bytes reported as sent by the port
        """
        self.logger.debug("Sending %d bytes to port %d" %
                          (len(packet), port_number))
        if self.pcap_writer:
            self.pcap_writer.write(packet, time.time(), port_number)
        sent = self.ports[port_number].send(packet)
        if sent != len(packet):
            self.logger.error("Unhandled send error, length mismatch %d != %d" %
                              (sent, len(packet)))
        return sent

    def oldest_port_number(self):
        """
        Returns the port number with the oldest packet, or
        None if no packets are queued.
        """
        min_port_number = None
        min_time = float('inf')
        for (port_number, queue) in self.packet_queues.items():
            # queue[0] is the oldest entry; index 1 is its timestamp
            if queue and queue[0][1] < min_time:
                min_time = queue[0][1]
                min_port_number = port_number
        return min_port_number

    def packets(self, port_number=None):
        """
        Dequeues and yields packets in the order they were received.
        Yields (port number, packet, received time).
        If port_number is not specified yields packets from all ports.
        @param port_number If not None, only take packets from this port
        """
        while True:
            # Compare against None explicitly so that port number 0 is
            # treated as a real port and not as "any port".
            if port_number is None:
                rcv_port_number = self.oldest_port_number()
            else:
                rcv_port_number = port_number

            if rcv_port_number is None:
                self.logger.debug("Out of packets on all ports")
                break

            queue = self.packet_queues[rcv_port_number]

            if len(queue) == 0:
                self.logger.debug("Out of packets on port %d", rcv_port_number)
                break

            pkt, pkt_time = queue.pop(0)
            yield (rcv_port_number, pkt, pkt_time)

    def poll(self, port_number=None, timeout=-1, exp_pkt=None):
        """
        Poll one or all dataplane ports for a packet

        If port_number is given, get the oldest packet from that port.
        Otherwise, find the port with the oldest packet and return
        that packet.

        If exp_pkt is true, discard all packets until that one is found

        @param port_number If set, get packet from this port
        @param timeout If positive and no packet is available, block
        until a packet is received or for this many seconds
        @param exp_pkt If not None, look for this packet and ignore any
        others received.  Note that if port_number is None, all packets
        from all ports will be discarded until the exp_pkt is found
        @return The triple port_number, packet, pkt_time where packet
        is received from port_number at time pkt_time.  If a timeout
        occurs, return None, None, None
        """

        if exp_pkt and port_number is None:
            self.logger.warning("Dataplane poll with exp_pkt but no port number")

        # Retrieve the packet. Returns (port number, packet, time).
        def grab():
            self.logger.debug("Grabbing packet")
            for (rcv_port_number, pkt, pkt_time) in self.packets(port_number):
                self.logger.debug("Checking packet from port %d", rcv_port_number)
                if not exp_pkt or match_exp_pkt(self, exp_pkt, pkt):
                    return (rcv_port_number, pkt, pkt_time)
            self.logger.debug("Did not find packet")
            return None

        with self.cvar:
            ret = ofutils.timed_wait(self.cvar, grab, timeout=timeout)

        if ret is not None:
            return ret
        else:
            self.logger.debug("Poll time out, no packet from " + str(port_number))
            return (None, None, None)

    def kill(self):
        """
        Stop the dataplane thread.
        """
        self.killed = True
        self.waker.notify()
        self.join()
        # Explicitly release ports to ensure we don't run out of sockets
        # even if someone keeps holding a reference to the dataplane.
        # Rebinding (instead of deleting the attribute) keeps a repeated
        # kill() from failing on a missing attribute.
        self.ports = {}

    def port_down(self, port_number):
        """Brings the specified port down"""
        self.ports[port_number].down()

    def port_up(self, port_number):
        """Brings the specified port up"""
        self.ports[port_number].up()

    def flush(self):
        """
        Drop any queued packets.
        """
        # Hold the top level lock so the reader thread is not in the middle
        # of enqueuing a packet while the queues are replaced.
        with self.cvar:
            for port_number in self.packet_queues.keys():
                self.packet_queues[port_number] = []

    def start_pcap(self, filename):
        """
        Start writing all sent and received packets to a pcap file.
        Must not be called while a capture is already active.
        @param filename Path handed to PcapWriter
        """
        assert self.pcap_writer is None
        self.pcap_writer = PcapWriter(filename)

    def stop_pcap(self):
        """
        Close the active pcap capture, if any.
        """
        if self.pcap_writer:
            self.pcap_writer.close()
            self.pcap_writer = None

    def port_mtu(self, port_number, mtu):
        """Change MTU on controller's port"""
        self.ports[port_number].mtu(mtu)