blob: 6a60a9a6a57c84cbf05f05dac2f145db1f3141c2 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001package lz4
2
3import (
4 "encoding/binary"
5 "fmt"
6 "io"
khenaidoo7d3c5582021-08-11 18:09:44 -04007 "runtime"
Holger Hildebrandtfa074992020-03-27 15:42:06 +00008
9 "github.com/pierrec/lz4/internal/xxh32"
10)
11
// zResult carries the outcome of compressing one block from a compression
// goroutine to the writer goroutine.
type zResult struct {
	size     uint32 // Block header value: data length, with compressedBlockFlag set for a raw block.
	data     []byte // Bytes to write out: the compressed block, or the input itself when stored raw.
	checksum uint32 // xxh32 checksum of data, only computed when block checksums are enabled.
}
18
Holger Hildebrandtfa074992020-03-27 15:42:06 +000019// Writer implements the LZ4 frame encoder.
20type Writer struct {
21 Header
22 // Handler called when a block has been successfully written out.
23 // It provides the number of bytes written.
24 OnBlockDone func(size int)
25
26 buf [19]byte // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
27 dst io.Writer // Destination.
28 checksum xxh32.XXHZero // Frame checksum.
khenaidoo7d3c5582021-08-11 18:09:44 -040029 data []byte // Data to be compressed + buffer for compressed data.
Holger Hildebrandtfa074992020-03-27 15:42:06 +000030 idx int // Index into data.
31 hashtable [winSize]int // Hash table used in CompressBlock().
khenaidoo7d3c5582021-08-11 18:09:44 -040032
33 // For concurrency.
34 c chan chan zResult // Channel for block compression goroutines and writer goroutine.
35 err error // Any error encountered while writing to the underlying destination.
Holger Hildebrandtfa074992020-03-27 15:42:06 +000036}
37
38// NewWriter returns a new LZ4 frame encoder.
39// No access to the underlying io.Writer is performed.
40// The supplied Header is checked at the first Write.
41// It is ok to change it before the first Write but then not until a Reset() is performed.
42func NewWriter(dst io.Writer) *Writer {
khenaidoo7d3c5582021-08-11 18:09:44 -040043 z := new(Writer)
44 z.Reset(dst)
45 return z
46}
47
48// WithConcurrency sets the number of concurrent go routines used for compression.
49// A negative value sets the concurrency to GOMAXPROCS.
50func (z *Writer) WithConcurrency(n int) *Writer {
51 switch {
52 case n == 0 || n == 1:
53 z.c = nil
54 return z
55 case n < 0:
56 n = runtime.GOMAXPROCS(0)
57 }
58 z.c = make(chan chan zResult, n)
59 // Writer goroutine managing concurrent block compression goroutines.
60 go func() {
61 // Process next block compression item.
62 for c := range z.c {
63 // Read the next compressed block result.
64 // Waiting here ensures that the blocks are output in the order they were sent.
65 // The incoming channel is always closed as it indicates to the caller that
66 // the block has been processed.
67 res := <-c
68 n := len(res.data)
69 if n == 0 {
70 // Notify the block compression routine that we are done with its result.
71 // This is used when a sentinel block is sent to terminate the compression.
72 close(c)
73 return
74 }
75 // Write the block.
76 if err := z.writeUint32(res.size); err != nil && z.err == nil {
77 z.err = err
78 }
79 if _, err := z.dst.Write(res.data); err != nil && z.err == nil {
80 z.err = err
81 }
82 if z.BlockChecksum {
83 if err := z.writeUint32(res.checksum); err != nil && z.err == nil {
84 z.err = err
85 }
86 }
87 if isCompressed := res.size&compressedBlockFlag == 0; isCompressed {
88 // It is now safe to release the buffer as no longer in use by any goroutine.
89 putBuffer(cap(res.data), res.data)
90 }
91 if h := z.OnBlockDone; h != nil {
92 h(n)
93 }
94 close(c)
95 }
96 }()
97 return z
98}
99
100// newBuffers instantiates new buffers which size matches the one in Header.
101// The returned buffers are for decompression and compression respectively.
102func (z *Writer) newBuffers() {
103 bSize := z.Header.BlockMaxSize
104 buf := getBuffer(bSize)
105 z.data = buf[:bSize] // Uncompressed buffer is the first half.
106}
107
108// freeBuffers puts the writer's buffers back to the pool.
109func (z *Writer) freeBuffers() {
110 // Put the buffer back into the pool, if any.
111 putBuffer(z.Header.BlockMaxSize, z.data)
112 z.data = nil
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000113}
114
115// writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
116func (z *Writer) writeHeader() error {
117 // Default to 4Mb if BlockMaxSize is not set.
118 if z.Header.BlockMaxSize == 0 {
khenaidoo7d3c5582021-08-11 18:09:44 -0400119 z.Header.BlockMaxSize = blockSize4M
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000120 }
121 // The only option that needs to be validated.
122 bSize := z.Header.BlockMaxSize
khenaidoo7d3c5582021-08-11 18:09:44 -0400123 if !isValidBlockSize(z.Header.BlockMaxSize) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000124 return fmt.Errorf("lz4: invalid block max size: %d", bSize)
125 }
126 // Allocate the compressed/uncompressed buffers.
127 // The compressed buffer cannot exceed the uncompressed one.
khenaidoo7d3c5582021-08-11 18:09:44 -0400128 z.newBuffers()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000129 z.idx = 0
130
131 // Size is optional.
132 buf := z.buf[:]
133
134 // Set the fixed size data: magic number, block max size and flags.
135 binary.LittleEndian.PutUint32(buf[0:], frameMagic)
136 flg := byte(Version << 6)
137 flg |= 1 << 5 // No block dependency.
138 if z.Header.BlockChecksum {
139 flg |= 1 << 4
140 }
141 if z.Header.Size > 0 {
142 flg |= 1 << 3
143 }
144 if !z.Header.NoChecksum {
145 flg |= 1 << 2
146 }
147 buf[4] = flg
khenaidoo7d3c5582021-08-11 18:09:44 -0400148 buf[5] = blockSizeValueToIndex(z.Header.BlockMaxSize) << 4
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000149
150 // Current buffer size: magic(4) + flags(1) + block max size (1).
151 n := 6
152 // Optional items.
153 if z.Header.Size > 0 {
154 binary.LittleEndian.PutUint64(buf[n:], z.Header.Size)
155 n += 8
156 }
157
158 // The header checksum includes the flags, block max size and optional Size.
159 buf[n] = byte(xxh32.ChecksumZero(buf[4:n]) >> 8 & 0xFF)
160 z.checksum.Reset()
161
162 // Header ready, write it out.
163 if _, err := z.dst.Write(buf[0 : n+1]); err != nil {
164 return err
165 }
166 z.Header.done = true
167 if debugFlag {
168 debug("wrote header %v", z.Header)
169 }
170
171 return nil
172}
173
174// Write compresses data from the supplied buffer into the underlying io.Writer.
175// Write does not return until the data has been written.
176func (z *Writer) Write(buf []byte) (int, error) {
177 if !z.Header.done {
178 if err := z.writeHeader(); err != nil {
179 return 0, err
180 }
181 }
182 if debugFlag {
183 debug("input buffer len=%d index=%d", len(buf), z.idx)
184 }
185
186 zn := len(z.data)
187 var n int
188 for len(buf) > 0 {
189 if z.idx == 0 && len(buf) >= zn {
190 // Avoid a copy as there is enough data for a block.
191 if err := z.compressBlock(buf[:zn]); err != nil {
192 return n, err
193 }
194 n += zn
195 buf = buf[zn:]
196 continue
197 }
198 // Accumulate the data to be compressed.
199 m := copy(z.data[z.idx:], buf)
200 n += m
201 z.idx += m
202 buf = buf[m:]
203 if debugFlag {
204 debug("%d bytes copied to buf, current index %d", n, z.idx)
205 }
206
207 if z.idx < len(z.data) {
208 // Buffer not filled.
209 if debugFlag {
210 debug("need more data for compression")
211 }
212 return n, nil
213 }
214
215 // Buffer full.
216 if err := z.compressBlock(z.data); err != nil {
217 return n, err
218 }
219 z.idx = 0
220 }
221
222 return n, nil
223}
224
225// compressBlock compresses a block.
226func (z *Writer) compressBlock(data []byte) error {
227 if !z.NoChecksum {
khenaidoo7d3c5582021-08-11 18:09:44 -0400228 _, _ = z.checksum.Write(data)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000229 }
230
khenaidoo7d3c5582021-08-11 18:09:44 -0400231 if z.c != nil {
232 c := make(chan zResult)
233 z.c <- c // Send now to guarantee order
234 go writerCompressBlock(c, z.Header, data)
235 return nil
236 }
237
238 zdata := z.data[z.Header.BlockMaxSize:cap(z.data)]
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000239 // The compressed block size cannot exceed the input's.
240 var zn int
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000241
242 if level := z.Header.CompressionLevel; level != 0 {
khenaidoo7d3c5582021-08-11 18:09:44 -0400243 zn, _ = CompressBlockHC(data, zdata, level)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000244 } else {
khenaidoo7d3c5582021-08-11 18:09:44 -0400245 zn, _ = CompressBlock(data, zdata, z.hashtable[:])
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000246 }
247
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000248 var bLen uint32
249 if debugFlag {
250 debug("block compression %d => %d", len(data), zn)
251 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400252 if zn > 0 && zn < len(data) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000253 // Compressible and compressed size smaller than uncompressed: ok!
254 bLen = uint32(zn)
khenaidoo7d3c5582021-08-11 18:09:44 -0400255 zdata = zdata[:zn]
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000256 } else {
257 // Uncompressed block.
258 bLen = uint32(len(data)) | compressedBlockFlag
259 zdata = data
260 }
261 if debugFlag {
262 debug("block compression to be written len=%d data len=%d", bLen, len(zdata))
263 }
264
265 // Write the block.
266 if err := z.writeUint32(bLen); err != nil {
267 return err
268 }
269 written, err := z.dst.Write(zdata)
270 if err != nil {
271 return err
272 }
273 if h := z.OnBlockDone; h != nil {
274 h(written)
275 }
276
277 if !z.BlockChecksum {
278 if debugFlag {
279 debug("current frame checksum %x", z.checksum.Sum32())
280 }
281 return nil
282 }
283 checksum := xxh32.ChecksumZero(zdata)
284 if debugFlag {
285 debug("block checksum %x", checksum)
286 defer func() { debug("current frame checksum %x", z.checksum.Sum32()) }()
287 }
288 return z.writeUint32(checksum)
289}
290
291// Flush flushes any pending compressed data to the underlying writer.
292// Flush does not return until the data has been written.
293// If the underlying writer returns an error, Flush returns that error.
294func (z *Writer) Flush() error {
295 if debugFlag {
296 debug("flush with index %d", z.idx)
297 }
298 if z.idx == 0 {
299 return nil
300 }
301
khenaidoo7d3c5582021-08-11 18:09:44 -0400302 data := z.data[:z.idx]
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000303 z.idx = 0
khenaidoo7d3c5582021-08-11 18:09:44 -0400304 if z.c == nil {
305 return z.compressBlock(data)
306 }
307 if !z.NoChecksum {
308 _, _ = z.checksum.Write(data)
309 }
310 c := make(chan zResult)
311 z.c <- c
312 writerCompressBlock(c, z.Header, data)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000313 return nil
314}
315
khenaidoo7d3c5582021-08-11 18:09:44 -0400316func (z *Writer) close() error {
317 if z.c == nil {
318 return nil
319 }
320 // Send a sentinel block (no data to compress) to terminate the writer main goroutine.
321 c := make(chan zResult)
322 z.c <- c
323 c <- zResult{}
324 // Wait for the main goroutine to complete.
325 <-c
326 // At this point the main goroutine has shut down or is about to return.
327 z.c = nil
328 return z.err
329}
330
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000331// Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
332func (z *Writer) Close() error {
333 if !z.Header.done {
334 if err := z.writeHeader(); err != nil {
335 return err
336 }
337 }
338 if err := z.Flush(); err != nil {
339 return err
340 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400341 if err := z.close(); err != nil {
342 return err
343 }
344 z.freeBuffers()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000345
346 if debugFlag {
347 debug("writing last empty block")
348 }
349 if err := z.writeUint32(0); err != nil {
350 return err
351 }
352 if z.NoChecksum {
353 return nil
354 }
355 checksum := z.checksum.Sum32()
356 if debugFlag {
357 debug("stream checksum %x", checksum)
358 }
359 return z.writeUint32(checksum)
360}
361
362// Reset clears the state of the Writer z such that it is equivalent to its
363// initial state from NewWriter, but instead writing to w.
364// No access to the underlying io.Writer is performed.
365func (z *Writer) Reset(w io.Writer) {
khenaidoo7d3c5582021-08-11 18:09:44 -0400366 n := cap(z.c)
367 _ = z.close()
368 z.freeBuffers()
369 z.Header.Reset()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000370 z.dst = w
371 z.checksum.Reset()
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000372 z.idx = 0
khenaidoo7d3c5582021-08-11 18:09:44 -0400373 z.err = nil
374 // reset hashtable to ensure deterministic output.
375 for i := range z.hashtable {
376 z.hashtable[i] = 0
377 }
378 z.WithConcurrency(n)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000379}
380
381// writeUint32 writes a uint32 to the underlying writer.
382func (z *Writer) writeUint32(x uint32) error {
383 buf := z.buf[:4]
384 binary.LittleEndian.PutUint32(buf, x)
385 _, err := z.dst.Write(buf)
386 return err
387}
khenaidoo7d3c5582021-08-11 18:09:44 -0400388
389// writerCompressBlock compresses data into a pooled buffer and writes its result
390// out to the input channel.
391func writerCompressBlock(c chan zResult, header Header, data []byte) {
392 zdata := getBuffer(header.BlockMaxSize)
393 // The compressed block size cannot exceed the input's.
394 var zn int
395 if level := header.CompressionLevel; level != 0 {
396 zn, _ = CompressBlockHC(data, zdata, level)
397 } else {
398 var hashTable [winSize]int
399 zn, _ = CompressBlock(data, zdata, hashTable[:])
400 }
401 var res zResult
402 if zn > 0 && zn < len(data) {
403 res.size = uint32(zn)
404 res.data = zdata[:zn]
405 } else {
406 res.size = uint32(len(data)) | compressedBlockFlag
407 res.data = data
408 }
409 if header.BlockChecksum {
410 res.checksum = xxh32.ChecksumZero(res.data)
411 }
412 c <- res
413}