blob: f066d56305e6fa7ac9e83ee9ce008be438edc50e [file] [log] [blame]
kesavand2cde6582020-06-22 04:56:23 -04001package lz4
2
3import (
4 "encoding/binary"
5 "fmt"
6 "io"
kesavandc71914f2022-03-25 11:19:03 +05307 "runtime"
kesavand2cde6582020-06-22 04:56:23 -04008
9 "github.com/pierrec/lz4/internal/xxh32"
10)
11
// zResult is the outcome of compressing a single block. It is what a block
// compression goroutine hands over to the writer goroutine.
type zResult struct {
	// size is the value written as the block header.
	size uint32
	// data is the block payload written after the header.
	data []byte
	// checksum is the checksum of data; it is only written out when
	// block checksums are enabled.
	checksum uint32
}
18
kesavand2cde6582020-06-22 04:56:23 -040019// Writer implements the LZ4 frame encoder.
20type Writer struct {
21 Header
kesavandc71914f2022-03-25 11:19:03 +053022 // Handler called when a block has been successfully written out.
23 // It provides the number of bytes written.
24 OnBlockDone func(size int)
kesavand2cde6582020-06-22 04:56:23 -040025
26 buf [19]byte // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
27 dst io.Writer // Destination.
28 checksum xxh32.XXHZero // Frame checksum.
kesavandc71914f2022-03-25 11:19:03 +053029 data []byte // Data to be compressed + buffer for compressed data.
kesavand2cde6582020-06-22 04:56:23 -040030 idx int // Index into data.
31 hashtable [winSize]int // Hash table used in CompressBlock().
kesavandc71914f2022-03-25 11:19:03 +053032
33 // For concurrency.
34 c chan chan zResult // Channel for block compression goroutines and writer goroutine.
35 err error // Any error encountered while writing to the underlying destination.
kesavand2cde6582020-06-22 04:56:23 -040036}
37
38// NewWriter returns a new LZ4 frame encoder.
39// No access to the underlying io.Writer is performed.
40// The supplied Header is checked at the first Write.
41// It is ok to change it before the first Write but then not until a Reset() is performed.
42func NewWriter(dst io.Writer) *Writer {
kesavandc71914f2022-03-25 11:19:03 +053043 z := new(Writer)
44 z.Reset(dst)
45 return z
46}
47
48// WithConcurrency sets the number of concurrent go routines used for compression.
49// A negative value sets the concurrency to GOMAXPROCS.
50func (z *Writer) WithConcurrency(n int) *Writer {
51 switch {
52 case n == 0 || n == 1:
53 z.c = nil
54 return z
55 case n < 0:
56 n = runtime.GOMAXPROCS(0)
57 }
58 z.c = make(chan chan zResult, n)
59 // Writer goroutine managing concurrent block compression goroutines.
60 go func() {
61 // Process next block compression item.
62 for c := range z.c {
63 // Read the next compressed block result.
64 // Waiting here ensures that the blocks are output in the order they were sent.
65 // The incoming channel is always closed as it indicates to the caller that
66 // the block has been processed.
67 res := <-c
68 n := len(res.data)
69 if n == 0 {
70 // Notify the block compression routine that we are done with its result.
71 // This is used when a sentinel block is sent to terminate the compression.
72 close(c)
73 return
74 }
75 // Write the block.
76 if err := z.writeUint32(res.size); err != nil && z.err == nil {
77 z.err = err
78 }
79 if _, err := z.dst.Write(res.data); err != nil && z.err == nil {
80 z.err = err
81 }
82 if z.BlockChecksum {
83 if err := z.writeUint32(res.checksum); err != nil && z.err == nil {
84 z.err = err
85 }
86 }
87 // It is now safe to release the buffer as no longer in use by any goroutine.
88 putBuffer(cap(res.data), res.data)
89 if h := z.OnBlockDone; h != nil {
90 h(n)
91 }
92 close(c)
93 }
94 }()
95 return z
96}
97
98// newBuffers instantiates new buffers which size matches the one in Header.
99// The returned buffers are for decompression and compression respectively.
100func (z *Writer) newBuffers() {
101 bSize := z.Header.BlockMaxSize
102 buf := getBuffer(bSize)
103 z.data = buf[:bSize] // Uncompressed buffer is the first half.
104}
105
106// freeBuffers puts the writer's buffers back to the pool.
107func (z *Writer) freeBuffers() {
108 // Put the buffer back into the pool, if any.
109 putBuffer(z.Header.BlockMaxSize, z.data)
110 z.data = nil
kesavand2cde6582020-06-22 04:56:23 -0400111}
112
113// writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
114func (z *Writer) writeHeader() error {
115 // Default to 4Mb if BlockMaxSize is not set.
116 if z.Header.BlockMaxSize == 0 {
kesavandc71914f2022-03-25 11:19:03 +0530117 z.Header.BlockMaxSize = blockSize4M
kesavand2cde6582020-06-22 04:56:23 -0400118 }
119 // The only option that needs to be validated.
120 bSize := z.Header.BlockMaxSize
kesavandc71914f2022-03-25 11:19:03 +0530121 if !isValidBlockSize(z.Header.BlockMaxSize) {
kesavand2cde6582020-06-22 04:56:23 -0400122 return fmt.Errorf("lz4: invalid block max size: %d", bSize)
123 }
124 // Allocate the compressed/uncompressed buffers.
125 // The compressed buffer cannot exceed the uncompressed one.
kesavandc71914f2022-03-25 11:19:03 +0530126 z.newBuffers()
kesavand2cde6582020-06-22 04:56:23 -0400127 z.idx = 0
128
129 // Size is optional.
130 buf := z.buf[:]
131
132 // Set the fixed size data: magic number, block max size and flags.
133 binary.LittleEndian.PutUint32(buf[0:], frameMagic)
134 flg := byte(Version << 6)
135 flg |= 1 << 5 // No block dependency.
136 if z.Header.BlockChecksum {
137 flg |= 1 << 4
138 }
139 if z.Header.Size > 0 {
140 flg |= 1 << 3
141 }
142 if !z.Header.NoChecksum {
143 flg |= 1 << 2
144 }
145 buf[4] = flg
kesavandc71914f2022-03-25 11:19:03 +0530146 buf[5] = blockSizeValueToIndex(z.Header.BlockMaxSize) << 4
kesavand2cde6582020-06-22 04:56:23 -0400147
148 // Current buffer size: magic(4) + flags(1) + block max size (1).
149 n := 6
150 // Optional items.
151 if z.Header.Size > 0 {
152 binary.LittleEndian.PutUint64(buf[n:], z.Header.Size)
153 n += 8
154 }
155
156 // The header checksum includes the flags, block max size and optional Size.
157 buf[n] = byte(xxh32.ChecksumZero(buf[4:n]) >> 8 & 0xFF)
158 z.checksum.Reset()
159
160 // Header ready, write it out.
161 if _, err := z.dst.Write(buf[0 : n+1]); err != nil {
162 return err
163 }
164 z.Header.done = true
165 if debugFlag {
166 debug("wrote header %v", z.Header)
167 }
168
169 return nil
170}
171
172// Write compresses data from the supplied buffer into the underlying io.Writer.
173// Write does not return until the data has been written.
174func (z *Writer) Write(buf []byte) (int, error) {
175 if !z.Header.done {
176 if err := z.writeHeader(); err != nil {
177 return 0, err
178 }
179 }
180 if debugFlag {
181 debug("input buffer len=%d index=%d", len(buf), z.idx)
182 }
183
184 zn := len(z.data)
185 var n int
186 for len(buf) > 0 {
187 if z.idx == 0 && len(buf) >= zn {
188 // Avoid a copy as there is enough data for a block.
189 if err := z.compressBlock(buf[:zn]); err != nil {
190 return n, err
191 }
192 n += zn
193 buf = buf[zn:]
194 continue
195 }
196 // Accumulate the data to be compressed.
197 m := copy(z.data[z.idx:], buf)
198 n += m
199 z.idx += m
200 buf = buf[m:]
201 if debugFlag {
202 debug("%d bytes copied to buf, current index %d", n, z.idx)
203 }
204
205 if z.idx < len(z.data) {
206 // Buffer not filled.
207 if debugFlag {
208 debug("need more data for compression")
209 }
210 return n, nil
211 }
212
213 // Buffer full.
214 if err := z.compressBlock(z.data); err != nil {
215 return n, err
216 }
217 z.idx = 0
218 }
219
220 return n, nil
221}
222
223// compressBlock compresses a block.
224func (z *Writer) compressBlock(data []byte) error {
225 if !z.NoChecksum {
kesavandc71914f2022-03-25 11:19:03 +0530226 _, _ = z.checksum.Write(data)
kesavand2cde6582020-06-22 04:56:23 -0400227 }
228
kesavandc71914f2022-03-25 11:19:03 +0530229 if z.c != nil {
230 c := make(chan zResult)
231 z.c <- c // Send now to guarantee order
232
233 // get a buffer from the pool and copy the data over
234 block := getBuffer(z.Header.BlockMaxSize)[:len(data)]
235 copy(block, data)
236
237 go writerCompressBlock(c, z.Header, block)
238 return nil
239 }
240
241 zdata := z.data[z.Header.BlockMaxSize:cap(z.data)]
kesavand2cde6582020-06-22 04:56:23 -0400242 // The compressed block size cannot exceed the input's.
243 var zn int
kesavand2cde6582020-06-22 04:56:23 -0400244
245 if level := z.Header.CompressionLevel; level != 0 {
kesavandc71914f2022-03-25 11:19:03 +0530246 zn, _ = CompressBlockHC(data, zdata, level)
kesavand2cde6582020-06-22 04:56:23 -0400247 } else {
kesavandc71914f2022-03-25 11:19:03 +0530248 zn, _ = CompressBlock(data, zdata, z.hashtable[:])
kesavand2cde6582020-06-22 04:56:23 -0400249 }
250
kesavand2cde6582020-06-22 04:56:23 -0400251 var bLen uint32
252 if debugFlag {
253 debug("block compression %d => %d", len(data), zn)
254 }
kesavandc71914f2022-03-25 11:19:03 +0530255 if zn > 0 && zn < len(data) {
kesavand2cde6582020-06-22 04:56:23 -0400256 // Compressible and compressed size smaller than uncompressed: ok!
257 bLen = uint32(zn)
kesavandc71914f2022-03-25 11:19:03 +0530258 zdata = zdata[:zn]
kesavand2cde6582020-06-22 04:56:23 -0400259 } else {
260 // Uncompressed block.
261 bLen = uint32(len(data)) | compressedBlockFlag
262 zdata = data
263 }
264 if debugFlag {
265 debug("block compression to be written len=%d data len=%d", bLen, len(zdata))
266 }
267
268 // Write the block.
269 if err := z.writeUint32(bLen); err != nil {
270 return err
271 }
kesavandc71914f2022-03-25 11:19:03 +0530272 written, err := z.dst.Write(zdata)
273 if err != nil {
kesavand2cde6582020-06-22 04:56:23 -0400274 return err
275 }
kesavandc71914f2022-03-25 11:19:03 +0530276 if h := z.OnBlockDone; h != nil {
277 h(written)
278 }
kesavand2cde6582020-06-22 04:56:23 -0400279
kesavandc71914f2022-03-25 11:19:03 +0530280 if !z.BlockChecksum {
kesavand2cde6582020-06-22 04:56:23 -0400281 if debugFlag {
kesavandc71914f2022-03-25 11:19:03 +0530282 debug("current frame checksum %x", z.checksum.Sum32())
kesavand2cde6582020-06-22 04:56:23 -0400283 }
kesavandc71914f2022-03-25 11:19:03 +0530284 return nil
kesavand2cde6582020-06-22 04:56:23 -0400285 }
kesavandc71914f2022-03-25 11:19:03 +0530286 checksum := xxh32.ChecksumZero(zdata)
kesavand2cde6582020-06-22 04:56:23 -0400287 if debugFlag {
kesavandc71914f2022-03-25 11:19:03 +0530288 debug("block checksum %x", checksum)
289 defer func() { debug("current frame checksum %x", z.checksum.Sum32()) }()
kesavand2cde6582020-06-22 04:56:23 -0400290 }
kesavandc71914f2022-03-25 11:19:03 +0530291 return z.writeUint32(checksum)
kesavand2cde6582020-06-22 04:56:23 -0400292}
293
294// Flush flushes any pending compressed data to the underlying writer.
295// Flush does not return until the data has been written.
296// If the underlying writer returns an error, Flush returns that error.
297func (z *Writer) Flush() error {
298 if debugFlag {
299 debug("flush with index %d", z.idx)
300 }
301 if z.idx == 0 {
302 return nil
303 }
304
kesavandc71914f2022-03-25 11:19:03 +0530305 data := getBuffer(z.Header.BlockMaxSize)[:len(z.data[:z.idx])]
306 copy(data, z.data[:z.idx])
307
308 z.idx = 0
309 if z.c == nil {
310 return z.compressBlock(data)
311 }
312 if !z.NoChecksum {
313 _, _ = z.checksum.Write(data)
314 }
315 c := make(chan zResult)
316 z.c <- c
317 writerCompressBlock(c, z.Header, data)
318 return nil
319}
320
321func (z *Writer) close() error {
322 if z.c == nil {
323 return nil
324 }
325 // Send a sentinel block (no data to compress) to terminate the writer main goroutine.
326 c := make(chan zResult)
327 z.c <- c
328 c <- zResult{}
329 // Wait for the main goroutine to complete.
330 <-c
331 // At this point the main goroutine has shut down or is about to return.
332 z.c = nil
333 return z.err
kesavand2cde6582020-06-22 04:56:23 -0400334}
335
336// Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
337func (z *Writer) Close() error {
338 if !z.Header.done {
339 if err := z.writeHeader(); err != nil {
340 return err
341 }
342 }
kesavand2cde6582020-06-22 04:56:23 -0400343 if err := z.Flush(); err != nil {
344 return err
345 }
kesavandc71914f2022-03-25 11:19:03 +0530346 if err := z.close(); err != nil {
347 return err
348 }
349 z.freeBuffers()
kesavand2cde6582020-06-22 04:56:23 -0400350
351 if debugFlag {
352 debug("writing last empty block")
353 }
354 if err := z.writeUint32(0); err != nil {
355 return err
356 }
kesavandc71914f2022-03-25 11:19:03 +0530357 if z.NoChecksum {
358 return nil
kesavand2cde6582020-06-22 04:56:23 -0400359 }
kesavandc71914f2022-03-25 11:19:03 +0530360 checksum := z.checksum.Sum32()
361 if debugFlag {
362 debug("stream checksum %x", checksum)
363 }
364 return z.writeUint32(checksum)
kesavand2cde6582020-06-22 04:56:23 -0400365}
366
367// Reset clears the state of the Writer z such that it is equivalent to its
368// initial state from NewWriter, but instead writing to w.
369// No access to the underlying io.Writer is performed.
370func (z *Writer) Reset(w io.Writer) {
kesavandc71914f2022-03-25 11:19:03 +0530371 n := cap(z.c)
372 _ = z.close()
373 z.freeBuffers()
374 z.Header.Reset()
kesavand2cde6582020-06-22 04:56:23 -0400375 z.dst = w
376 z.checksum.Reset()
kesavand2cde6582020-06-22 04:56:23 -0400377 z.idx = 0
kesavandc71914f2022-03-25 11:19:03 +0530378 z.err = nil
379 // reset hashtable to ensure deterministic output.
380 for i := range z.hashtable {
381 z.hashtable[i] = 0
382 }
383 z.WithConcurrency(n)
kesavand2cde6582020-06-22 04:56:23 -0400384}
385
386// writeUint32 writes a uint32 to the underlying writer.
387func (z *Writer) writeUint32(x uint32) error {
388 buf := z.buf[:4]
389 binary.LittleEndian.PutUint32(buf, x)
390 _, err := z.dst.Write(buf)
391 return err
392}
kesavandc71914f2022-03-25 11:19:03 +0530393
394// writerCompressBlock compresses data into a pooled buffer and writes its result
395// out to the input channel.
396func writerCompressBlock(c chan zResult, header Header, data []byte) {
397 zdata := getBuffer(header.BlockMaxSize)
398 // The compressed block size cannot exceed the input's.
399 var zn int
400 if level := header.CompressionLevel; level != 0 {
401 zn, _ = CompressBlockHC(data, zdata, level)
402 } else {
403 var hashTable [winSize]int
404 zn, _ = CompressBlock(data, zdata, hashTable[:])
405 }
406 var res zResult
407 if zn > 0 && zn < len(data) {
408 res.size = uint32(zn)
409 res.data = zdata[:zn]
410 // release the uncompressed block since it is not used anymore
411 putBuffer(header.BlockMaxSize, data)
412 } else {
413 res.size = uint32(len(data)) | compressedBlockFlag
414 res.data = data
415 // release the compressed block since it was not used
416 putBuffer(header.BlockMaxSize, zdata)
417 }
418 if header.BlockChecksum {
419 res.checksum = xxh32.ChecksumZero(res.data)
420 }
421 c <- res
422}