blob: 804a68cc2586db23b4a5577a7c0cd83a258bcee2 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001package lz4
2
3import (
4 "encoding/binary"
5 "fmt"
6 "io"
7
8 "github.com/pierrec/lz4/internal/xxh32"
9)
10
11// Writer implements the LZ4 frame encoder.
12type Writer struct {
13 Header
14 // Handler called when a block has been successfully written out.
15 // It provides the number of bytes written.
16 OnBlockDone func(size int)
17
18 buf [19]byte // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
19 dst io.Writer // Destination.
20 checksum xxh32.XXHZero // Frame checksum.
21 zdata []byte // Compressed data.
22 data []byte // Data to be compressed.
23 idx int // Index into data.
24 hashtable [winSize]int // Hash table used in CompressBlock().
25}
26
27// NewWriter returns a new LZ4 frame encoder.
28// No access to the underlying io.Writer is performed.
29// The supplied Header is checked at the first Write.
30// It is ok to change it before the first Write but then not until a Reset() is performed.
31func NewWriter(dst io.Writer) *Writer {
32 return &Writer{dst: dst}
33}
34
35// writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
36func (z *Writer) writeHeader() error {
37 // Default to 4Mb if BlockMaxSize is not set.
38 if z.Header.BlockMaxSize == 0 {
39 z.Header.BlockMaxSize = bsMapID[7]
40 }
41 // The only option that needs to be validated.
42 bSize := z.Header.BlockMaxSize
43 bSizeID, ok := bsMapValue[bSize]
44 if !ok {
45 return fmt.Errorf("lz4: invalid block max size: %d", bSize)
46 }
47 // Allocate the compressed/uncompressed buffers.
48 // The compressed buffer cannot exceed the uncompressed one.
49 if n := 2 * bSize; cap(z.zdata) < n {
50 z.zdata = make([]byte, n, n)
51 }
52 z.data = z.zdata[:bSize]
53 z.zdata = z.zdata[:cap(z.zdata)][bSize:]
54 z.idx = 0
55
56 // Size is optional.
57 buf := z.buf[:]
58
59 // Set the fixed size data: magic number, block max size and flags.
60 binary.LittleEndian.PutUint32(buf[0:], frameMagic)
61 flg := byte(Version << 6)
62 flg |= 1 << 5 // No block dependency.
63 if z.Header.BlockChecksum {
64 flg |= 1 << 4
65 }
66 if z.Header.Size > 0 {
67 flg |= 1 << 3
68 }
69 if !z.Header.NoChecksum {
70 flg |= 1 << 2
71 }
72 buf[4] = flg
73 buf[5] = bSizeID << 4
74
75 // Current buffer size: magic(4) + flags(1) + block max size (1).
76 n := 6
77 // Optional items.
78 if z.Header.Size > 0 {
79 binary.LittleEndian.PutUint64(buf[n:], z.Header.Size)
80 n += 8
81 }
82
83 // The header checksum includes the flags, block max size and optional Size.
84 buf[n] = byte(xxh32.ChecksumZero(buf[4:n]) >> 8 & 0xFF)
85 z.checksum.Reset()
86
87 // Header ready, write it out.
88 if _, err := z.dst.Write(buf[0 : n+1]); err != nil {
89 return err
90 }
91 z.Header.done = true
92 if debugFlag {
93 debug("wrote header %v", z.Header)
94 }
95
96 return nil
97}
98
99// Write compresses data from the supplied buffer into the underlying io.Writer.
100// Write does not return until the data has been written.
101func (z *Writer) Write(buf []byte) (int, error) {
102 if !z.Header.done {
103 if err := z.writeHeader(); err != nil {
104 return 0, err
105 }
106 }
107 if debugFlag {
108 debug("input buffer len=%d index=%d", len(buf), z.idx)
109 }
110
111 zn := len(z.data)
112 var n int
113 for len(buf) > 0 {
114 if z.idx == 0 && len(buf) >= zn {
115 // Avoid a copy as there is enough data for a block.
116 if err := z.compressBlock(buf[:zn]); err != nil {
117 return n, err
118 }
119 n += zn
120 buf = buf[zn:]
121 continue
122 }
123 // Accumulate the data to be compressed.
124 m := copy(z.data[z.idx:], buf)
125 n += m
126 z.idx += m
127 buf = buf[m:]
128 if debugFlag {
129 debug("%d bytes copied to buf, current index %d", n, z.idx)
130 }
131
132 if z.idx < len(z.data) {
133 // Buffer not filled.
134 if debugFlag {
135 debug("need more data for compression")
136 }
137 return n, nil
138 }
139
140 // Buffer full.
141 if err := z.compressBlock(z.data); err != nil {
142 return n, err
143 }
144 z.idx = 0
145 }
146
147 return n, nil
148}
149
150// compressBlock compresses a block.
151func (z *Writer) compressBlock(data []byte) error {
152 if !z.NoChecksum {
153 z.checksum.Write(data)
154 }
155
156 // The compressed block size cannot exceed the input's.
157 var zn int
158 var err error
159
160 if level := z.Header.CompressionLevel; level != 0 {
161 zn, err = CompressBlockHC(data, z.zdata, level)
162 } else {
163 zn, err = CompressBlock(data, z.zdata, z.hashtable[:])
164 }
165
166 var zdata []byte
167 var bLen uint32
168 if debugFlag {
169 debug("block compression %d => %d", len(data), zn)
170 }
171 if err == nil && zn > 0 && zn < len(data) {
172 // Compressible and compressed size smaller than uncompressed: ok!
173 bLen = uint32(zn)
174 zdata = z.zdata[:zn]
175 } else {
176 // Uncompressed block.
177 bLen = uint32(len(data)) | compressedBlockFlag
178 zdata = data
179 }
180 if debugFlag {
181 debug("block compression to be written len=%d data len=%d", bLen, len(zdata))
182 }
183
184 // Write the block.
185 if err := z.writeUint32(bLen); err != nil {
186 return err
187 }
188 written, err := z.dst.Write(zdata)
189 if err != nil {
190 return err
191 }
192 if h := z.OnBlockDone; h != nil {
193 h(written)
194 }
195
196 if !z.BlockChecksum {
197 if debugFlag {
198 debug("current frame checksum %x", z.checksum.Sum32())
199 }
200 return nil
201 }
202 checksum := xxh32.ChecksumZero(zdata)
203 if debugFlag {
204 debug("block checksum %x", checksum)
205 defer func() { debug("current frame checksum %x", z.checksum.Sum32()) }()
206 }
207 return z.writeUint32(checksum)
208}
209
210// Flush flushes any pending compressed data to the underlying writer.
211// Flush does not return until the data has been written.
212// If the underlying writer returns an error, Flush returns that error.
213func (z *Writer) Flush() error {
214 if debugFlag {
215 debug("flush with index %d", z.idx)
216 }
217 if z.idx == 0 {
218 return nil
219 }
220
221 if err := z.compressBlock(z.data[:z.idx]); err != nil {
222 return err
223 }
224 z.idx = 0
225 return nil
226}
227
228// Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
229func (z *Writer) Close() error {
230 if !z.Header.done {
231 if err := z.writeHeader(); err != nil {
232 return err
233 }
234 }
235 if err := z.Flush(); err != nil {
236 return err
237 }
238
239 if debugFlag {
240 debug("writing last empty block")
241 }
242 if err := z.writeUint32(0); err != nil {
243 return err
244 }
245 if z.NoChecksum {
246 return nil
247 }
248 checksum := z.checksum.Sum32()
249 if debugFlag {
250 debug("stream checksum %x", checksum)
251 }
252 return z.writeUint32(checksum)
253}
254
255// Reset clears the state of the Writer z such that it is equivalent to its
256// initial state from NewWriter, but instead writing to w.
257// No access to the underlying io.Writer is performed.
258func (z *Writer) Reset(w io.Writer) {
259 z.Header = Header{}
260 z.dst = w
261 z.checksum.Reset()
262 z.zdata = z.zdata[:0]
263 z.data = z.data[:0]
264 z.idx = 0
265}
266
267// writeUint32 writes a uint32 to the underlying writer.
268func (z *Writer) writeUint32(x uint32) error {
269 buf := z.buf[:4]
270 binary.LittleEndian.PutUint32(buf, x)
271 _, err := z.dst.Write(buf)
272 return err
273}