blob: 6dce53036e67ccbf57771fc27f4c290fdb4e0217 [file] [log] [blame]
"""
Pcap file writer
"""
import struct
PcapHeader = struct.Struct("<LHHLLLL")
PcapPktHeader = struct.Struct("<LLLL")
PPIPktHeader = struct.Struct("<BBHL")
PPIAggregateField = struct.Struct("<HHL")
class PcapWriter(object):
def __init__(self, filename):
"""
Open a pcap file
"""
self.stream = file(filename, 'w')
self.stream.write(PcapHeader.pack(
0xa1b2c3d4, # magic
2, # major
4, # minor
0, # timezone offset
0, # timezone accuracy
65535, # snapshot length
192 # PPI linktype
))
def write(self, data, timestamp, port):
"""
Write a packet to a pcap file
'data' should be a string containing the packet data.
'timestamp' should be a float.
'port' should be an integer port number.
"""
ppi_len = PPIPktHeader.size + PPIAggregateField.size
self.stream.write(PcapPktHeader.pack(
int(timestamp), # timestamp seconds
int((timestamp - int(timestamp)) * 10**6), # timestamp microseconds
len(data) + ppi_len, # truncated length
len(data) + ppi_len # un-truncated length
))
self.stream.write(PPIPktHeader.pack(
0, # version
0, # flags
ppi_len, # length
1, # ethernet dlt
))
self.stream.write(PPIAggregateField.pack(8, PPIAggregateField.size - 4, port))
self.stream.write(data)
def close(self):
self.stream.close()
if __name__ == "__main__":
import time
print("Writing test pcap to test.pcap")
pcap_writer = PcapWriter("test.pcap")
pcap_writer.write("\x00\x01\x02\x03\x04\x05\x00\x0a\x0b\x0c\x0d\x0e\x08\x00", time.time(), 42)
pcap_writer.close()