Takahiro Suzuki | 241c10e | 2020-12-17 20:17:57 +0900 | [diff] [blame^] | 1 | #!/usr/bin/python |
| 2 | # Copyright 2012 Google, Inc. All rights reserved. |
| 3 | |
| 4 | """TestCreator creates test templates from pcap files.""" |
| 5 | |
| 6 | import argparse |
| 7 | import base64 |
| 8 | import glob |
| 9 | import re |
| 10 | import string |
| 11 | import subprocess |
| 12 | import sys |
| 13 | |
| 14 | |
class Packet(object):
  """Helper class encapsulating one packet from `tcpdump -XX` text output.

  Attributes:
    packet_lines: the text lines tcpdump printed for this packet. The first
      line is the summary (timestamp etc.), the rest are hex-dump lines.
    data: the packet contents decoded from the hex dump, as a byte string.
  """

  # Matches one hex-dump line and captures its run of hex groups, e.g.
  #   0x0010:  0000 0020 3aff 3ffe 0000 0000 0000 0000  ....:.?.........
  # Each group must be followed by whitespace or the end of the line, so the
  # capture stops at the double space that precedes the ASCII column and a
  # final group is still picked up when the line has no trailing text.
  _HEX_LINE_RE = re.compile(
      r'\s+0x[a-f\d]+:\s+((?:[\da-f]{2,4}(?:\s|$))*)', re.IGNORECASE)

  # Number of packet bytes emitted per line of the generated Go literal.
  _BYTES_PER_ROW = 16

  def __init__(self, packet_lines):
    """Stores the packet's text lines and decodes its bytes from them.

    Args:
      packet_lines: list of tcpdump output lines belonging to one packet.
    """
    self.packet_lines = packet_lines
    self.data = self._DecodeText(packet_lines)

  @classmethod
  def _DecodeText(cls, packet_lines):
    """Decodes the hex-dump lines of a packet into its raw bytes.

    Args:
      packet_lines: list of tcpdump output lines belonging to one packet.

    Returns:
      The decoded packet contents as a byte string (`str` on Python 2,
      `bytes` on Python 3). Lines that are not hex-dump lines are ignored.
    """
    packet_bytes = bytearray()
    # First line is timestamp and stuff, skip it.
    for line in packet_lines[1:]:
      m = cls._HEX_LINE_RE.match(line)
      if m is None:
        continue
      for hexpart in m.group(1).split():
        # casefold=True accepts the lowercase digits tcpdump prints.
        packet_bytes += base64.b16decode(hexpart, casefold=True)
    return bytes(packet_bytes)

  def Test(self, name, link_type):
    """Yields a Go test and benchmark using this packet, as a set of lines.

    Args:
      name: suffix used for the generated Go identifiers
        (testPacket<name>, TestPacket<name>, BenchmarkDecodePacket<name>).
      link_type: gopacket link type suffix, e.g. 'Ethernet'.

    Yields:
      The lines of the generated Go source, without trailing newlines.
    """
    yield '// testPacket%s is the packet:' % name
    for line in self.packet_lines:
      yield '// ' + line
    yield 'var testPacket%s = []byte{' % name
    # A bytearray iterates as integers on both Python 2 and Python 3.
    data = bytearray(self.data)
    for offset in range(0, len(data), self._BYTES_PER_ROW):
      row = data[offset:offset + self._BYTES_PER_ROW]
      yield '\t' + ''.join('0x%02x, ' % byte for byte in row)
    yield '}'
    yield 'func TestPacket%s(t *testing.T) {' % name
    yield '\tp := gopacket.NewPacket(testPacket%s, LinkType%s, gopacket.Default)' % (name, link_type)
    yield '\tif p.ErrorLayer() != nil {'
    yield '\t\tt.Error("Failed to decode packet:", p.ErrorLayer().Error())'
    yield '\t}'
    yield '\tcheckLayers(p, []gopacket.LayerType{LayerType%s, FILL_ME_IN_WITH_ACTUAL_LAYERS}, t)' % link_type
    yield '}'
    yield 'func BenchmarkDecodePacket%s(b *testing.B) {' % name
    yield '\tfor i := 0; i < b.N; i++ {'
    yield '\t\tgopacket.NewPacket(testPacket%s, LinkType%s, gopacket.NoCopy)' % (name, link_type)
    yield '\t}'
    yield '}'
| 58 | |
| 59 | |
| 60 | |
def GetTcpdumpOutput(filename):
  """Runs tcpdump on the given file, returning its output as a text string.

  Args:
    filename: path of the pcap file to read.

  Returns:
    Everything tcpdump wrote to standard output, as `str`.

  Raises:
    subprocess.CalledProcessError: if tcpdump exits with a non-zero status.
    OSError: if the tcpdump binary cannot be executed.
  """
  # -XX: hex and ASCII dump including the link-level header, -s 0: default
  # (maximum) snapshot length, -n: no name resolution, -r: read from a file.
  output = subprocess.check_output(
      ['tcpdump', '-XX', '-s', '0', '-n', '-r', filename])
  # check_output returns bytes on Python 3; callers work on text lines.
  if not isinstance(output, str):
    output = output.decode('utf-8', 'replace')
  return output
| 65 | |
| 66 | |
def TcpdumpOutputToPackets(output):
  """Splits tcpdump text output into packets, yielding Packet objects.

  A line that does not start with whitespace begins a new packet; the
  indented lines that follow it belong to the same packet. Blank lines are
  ignored.

  Args:
    output: the complete text printed by tcpdump. A byte string is decoded
      to text first.

  Yields:
    One Packet per group of lines.
  """
  # On Python 3 a bytes value would iterate as integers; decode it to text.
  if isinstance(output, bytes) and not isinstance(output, str):
    output = output.decode('utf-8', 'replace')
  pdata = []
  for line in output.splitlines():
    if not line:
      # Nothing to classify; indexing line[0] would raise IndexError.
      continue
    if line[0] not in string.whitespace and pdata:
      yield Packet(pdata)
      pdata = []
    pdata.append(line)
  if pdata:
    yield Packet(pdata)
| 77 | |
| 78 | |
def main():
  """Parses the command line and prints generated Go tests to standard out.

  Each positional argument is expanded as a glob pattern; every matching
  pcap file is run through tcpdump and one test is emitted per packet, named
  by substituting the packet's index within its file into --name.
  """
  class CustomHelpFormatter(argparse.ArgumentDefaultsHelpFormatter):
    """Help formatter that prints a description ahead of the usage line."""

    def _format_usage(self, usage, actions, groups, prefix=None):
      header = ('TestCreator creates gopacket tests using a pcap file.\n\n'
                'Tests are written to standard out... they can then be \n'
                'copied into the file of your choice and modified as \n'
                'you see fit.\n\n')
      return header + argparse.ArgumentDefaultsHelpFormatter._format_usage(
          self, usage, actions, groups, prefix)

  parser = argparse.ArgumentParser(formatter_class=CustomHelpFormatter)
  parser.add_argument('--link_type', default='Ethernet', help='the link type (default: %(default)s)')
  # argparse %-formats help strings, so a literal percent sign must be
  # doubled; a bare "%d" makes --help fail with a TypeError.
  parser.add_argument('--name', default='Packet%d', help='the test name template, must have "%%d" inside it')
  parser.add_argument('files', metavar='file.pcap', type=str, nargs='+', help='the files to process')

  args = parser.parse_args()

  # Reject an unusable template up front instead of failing on the first
  # packet with a raw formatting error.
  try:
    _ = args.name % 0
  except (TypeError, ValueError):
    parser.error('--name must contain exactly one "%d" placeholder')

  for arg in args.files:
    for path in glob.glob(arg):
      packets = TcpdumpOutputToPackets(GetTcpdumpOutput(path))
      for i, packet in enumerate(packets):
        test_lines = packet.Test(args.name % i, args.link_type)
        sys.stdout.write('\n'.join(test_lines) + '\n')
| 101 | |
# Run the command-line tool only when executed as a script, not on import.
if __name__ == "__main__":
  main()