blob: 6a60a9a6a57c84cbf05f05dac2f145db1f3141c2 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package lz4
2
3import (
4 "encoding/binary"
5 "fmt"
6 "io"
khenaidoo106c61a2021-08-11 18:05:46 -04007 "runtime"
William Kurkianea869482019-04-09 15:16:11 -04008
9 "github.com/pierrec/lz4/internal/xxh32"
10)
11
// zResult carries the outcome of compressing one block from a compression
// goroutine to the writer goroutine.
type zResult struct {
	// size is the block size field as written to the frame; the
	// compressedBlockFlag bit is set when data holds the block uncompressed.
	size uint32
	// data is the block payload: the compressed bytes, or the original input.
	data []byte
	// checksum is the xxh32 checksum of data, only set when block checksums
	// are enabled.
	checksum uint32
}
18
William Kurkianea869482019-04-09 15:16:11 -040019// Writer implements the LZ4 frame encoder.
20type Writer struct {
21 Header
Abhilash S.L3b494632019-07-16 15:51:09 +053022 // Handler called when a block has been successfully written out.
23 // It provides the number of bytes written.
24 OnBlockDone func(size int)
William Kurkianea869482019-04-09 15:16:11 -040025
26 buf [19]byte // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
27 dst io.Writer // Destination.
28 checksum xxh32.XXHZero // Frame checksum.
khenaidoo106c61a2021-08-11 18:05:46 -040029 data []byte // Data to be compressed + buffer for compressed data.
William Kurkianea869482019-04-09 15:16:11 -040030 idx int // Index into data.
31 hashtable [winSize]int // Hash table used in CompressBlock().
khenaidoo106c61a2021-08-11 18:05:46 -040032
33 // For concurrency.
34 c chan chan zResult // Channel for block compression goroutines and writer goroutine.
35 err error // Any error encountered while writing to the underlying destination.
William Kurkianea869482019-04-09 15:16:11 -040036}
37
38// NewWriter returns a new LZ4 frame encoder.
39// No access to the underlying io.Writer is performed.
40// The supplied Header is checked at the first Write.
41// It is ok to change it before the first Write but then not until a Reset() is performed.
42func NewWriter(dst io.Writer) *Writer {
khenaidoo106c61a2021-08-11 18:05:46 -040043 z := new(Writer)
44 z.Reset(dst)
45 return z
46}
47
48// WithConcurrency sets the number of concurrent go routines used for compression.
49// A negative value sets the concurrency to GOMAXPROCS.
50func (z *Writer) WithConcurrency(n int) *Writer {
51 switch {
52 case n == 0 || n == 1:
53 z.c = nil
54 return z
55 case n < 0:
56 n = runtime.GOMAXPROCS(0)
57 }
58 z.c = make(chan chan zResult, n)
59 // Writer goroutine managing concurrent block compression goroutines.
60 go func() {
61 // Process next block compression item.
62 for c := range z.c {
63 // Read the next compressed block result.
64 // Waiting here ensures that the blocks are output in the order they were sent.
65 // The incoming channel is always closed as it indicates to the caller that
66 // the block has been processed.
67 res := <-c
68 n := len(res.data)
69 if n == 0 {
70 // Notify the block compression routine that we are done with its result.
71 // This is used when a sentinel block is sent to terminate the compression.
72 close(c)
73 return
74 }
75 // Write the block.
76 if err := z.writeUint32(res.size); err != nil && z.err == nil {
77 z.err = err
78 }
79 if _, err := z.dst.Write(res.data); err != nil && z.err == nil {
80 z.err = err
81 }
82 if z.BlockChecksum {
83 if err := z.writeUint32(res.checksum); err != nil && z.err == nil {
84 z.err = err
85 }
86 }
87 if isCompressed := res.size&compressedBlockFlag == 0; isCompressed {
88 // It is now safe to release the buffer as no longer in use by any goroutine.
89 putBuffer(cap(res.data), res.data)
90 }
91 if h := z.OnBlockDone; h != nil {
92 h(n)
93 }
94 close(c)
95 }
96 }()
97 return z
98}
99
100// newBuffers instantiates new buffers which size matches the one in Header.
101// The returned buffers are for decompression and compression respectively.
102func (z *Writer) newBuffers() {
103 bSize := z.Header.BlockMaxSize
104 buf := getBuffer(bSize)
105 z.data = buf[:bSize] // Uncompressed buffer is the first half.
106}
107
108// freeBuffers puts the writer's buffers back to the pool.
109func (z *Writer) freeBuffers() {
110 // Put the buffer back into the pool, if any.
111 putBuffer(z.Header.BlockMaxSize, z.data)
112 z.data = nil
William Kurkianea869482019-04-09 15:16:11 -0400113}
114
115// writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
116func (z *Writer) writeHeader() error {
117 // Default to 4Mb if BlockMaxSize is not set.
118 if z.Header.BlockMaxSize == 0 {
khenaidoo106c61a2021-08-11 18:05:46 -0400119 z.Header.BlockMaxSize = blockSize4M
William Kurkianea869482019-04-09 15:16:11 -0400120 }
121 // The only option that needs to be validated.
122 bSize := z.Header.BlockMaxSize
khenaidoo106c61a2021-08-11 18:05:46 -0400123 if !isValidBlockSize(z.Header.BlockMaxSize) {
William Kurkianea869482019-04-09 15:16:11 -0400124 return fmt.Errorf("lz4: invalid block max size: %d", bSize)
125 }
126 // Allocate the compressed/uncompressed buffers.
127 // The compressed buffer cannot exceed the uncompressed one.
khenaidoo106c61a2021-08-11 18:05:46 -0400128 z.newBuffers()
William Kurkianea869482019-04-09 15:16:11 -0400129 z.idx = 0
130
131 // Size is optional.
132 buf := z.buf[:]
133
134 // Set the fixed size data: magic number, block max size and flags.
135 binary.LittleEndian.PutUint32(buf[0:], frameMagic)
136 flg := byte(Version << 6)
137 flg |= 1 << 5 // No block dependency.
138 if z.Header.BlockChecksum {
139 flg |= 1 << 4
140 }
141 if z.Header.Size > 0 {
142 flg |= 1 << 3
143 }
144 if !z.Header.NoChecksum {
145 flg |= 1 << 2
146 }
147 buf[4] = flg
khenaidoo106c61a2021-08-11 18:05:46 -0400148 buf[5] = blockSizeValueToIndex(z.Header.BlockMaxSize) << 4
William Kurkianea869482019-04-09 15:16:11 -0400149
150 // Current buffer size: magic(4) + flags(1) + block max size (1).
151 n := 6
152 // Optional items.
153 if z.Header.Size > 0 {
154 binary.LittleEndian.PutUint64(buf[n:], z.Header.Size)
155 n += 8
156 }
157
158 // The header checksum includes the flags, block max size and optional Size.
159 buf[n] = byte(xxh32.ChecksumZero(buf[4:n]) >> 8 & 0xFF)
160 z.checksum.Reset()
161
162 // Header ready, write it out.
163 if _, err := z.dst.Write(buf[0 : n+1]); err != nil {
164 return err
165 }
166 z.Header.done = true
167 if debugFlag {
168 debug("wrote header %v", z.Header)
169 }
170
171 return nil
172}
173
174// Write compresses data from the supplied buffer into the underlying io.Writer.
175// Write does not return until the data has been written.
176func (z *Writer) Write(buf []byte) (int, error) {
177 if !z.Header.done {
178 if err := z.writeHeader(); err != nil {
179 return 0, err
180 }
181 }
182 if debugFlag {
183 debug("input buffer len=%d index=%d", len(buf), z.idx)
184 }
185
186 zn := len(z.data)
187 var n int
188 for len(buf) > 0 {
189 if z.idx == 0 && len(buf) >= zn {
190 // Avoid a copy as there is enough data for a block.
191 if err := z.compressBlock(buf[:zn]); err != nil {
192 return n, err
193 }
194 n += zn
195 buf = buf[zn:]
196 continue
197 }
198 // Accumulate the data to be compressed.
199 m := copy(z.data[z.idx:], buf)
200 n += m
201 z.idx += m
202 buf = buf[m:]
203 if debugFlag {
204 debug("%d bytes copied to buf, current index %d", n, z.idx)
205 }
206
207 if z.idx < len(z.data) {
208 // Buffer not filled.
209 if debugFlag {
210 debug("need more data for compression")
211 }
212 return n, nil
213 }
214
215 // Buffer full.
216 if err := z.compressBlock(z.data); err != nil {
217 return n, err
218 }
219 z.idx = 0
220 }
221
222 return n, nil
223}
224
225// compressBlock compresses a block.
226func (z *Writer) compressBlock(data []byte) error {
227 if !z.NoChecksum {
khenaidoo106c61a2021-08-11 18:05:46 -0400228 _, _ = z.checksum.Write(data)
William Kurkianea869482019-04-09 15:16:11 -0400229 }
230
khenaidoo106c61a2021-08-11 18:05:46 -0400231 if z.c != nil {
232 c := make(chan zResult)
233 z.c <- c // Send now to guarantee order
234 go writerCompressBlock(c, z.Header, data)
235 return nil
236 }
237
238 zdata := z.data[z.Header.BlockMaxSize:cap(z.data)]
William Kurkianea869482019-04-09 15:16:11 -0400239 // The compressed block size cannot exceed the input's.
240 var zn int
William Kurkianea869482019-04-09 15:16:11 -0400241
242 if level := z.Header.CompressionLevel; level != 0 {
khenaidoo106c61a2021-08-11 18:05:46 -0400243 zn, _ = CompressBlockHC(data, zdata, level)
William Kurkianea869482019-04-09 15:16:11 -0400244 } else {
khenaidoo106c61a2021-08-11 18:05:46 -0400245 zn, _ = CompressBlock(data, zdata, z.hashtable[:])
William Kurkianea869482019-04-09 15:16:11 -0400246 }
247
William Kurkianea869482019-04-09 15:16:11 -0400248 var bLen uint32
249 if debugFlag {
250 debug("block compression %d => %d", len(data), zn)
251 }
khenaidoo106c61a2021-08-11 18:05:46 -0400252 if zn > 0 && zn < len(data) {
William Kurkianea869482019-04-09 15:16:11 -0400253 // Compressible and compressed size smaller than uncompressed: ok!
254 bLen = uint32(zn)
khenaidoo106c61a2021-08-11 18:05:46 -0400255 zdata = zdata[:zn]
William Kurkianea869482019-04-09 15:16:11 -0400256 } else {
257 // Uncompressed block.
258 bLen = uint32(len(data)) | compressedBlockFlag
259 zdata = data
260 }
261 if debugFlag {
262 debug("block compression to be written len=%d data len=%d", bLen, len(zdata))
263 }
264
265 // Write the block.
266 if err := z.writeUint32(bLen); err != nil {
267 return err
268 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530269 written, err := z.dst.Write(zdata)
270 if err != nil {
William Kurkianea869482019-04-09 15:16:11 -0400271 return err
272 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530273 if h := z.OnBlockDone; h != nil {
274 h(written)
275 }
William Kurkianea869482019-04-09 15:16:11 -0400276
Abhilash S.L3b494632019-07-16 15:51:09 +0530277 if !z.BlockChecksum {
William Kurkianea869482019-04-09 15:16:11 -0400278 if debugFlag {
Abhilash S.L3b494632019-07-16 15:51:09 +0530279 debug("current frame checksum %x", z.checksum.Sum32())
William Kurkianea869482019-04-09 15:16:11 -0400280 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530281 return nil
William Kurkianea869482019-04-09 15:16:11 -0400282 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530283 checksum := xxh32.ChecksumZero(zdata)
William Kurkianea869482019-04-09 15:16:11 -0400284 if debugFlag {
Abhilash S.L3b494632019-07-16 15:51:09 +0530285 debug("block checksum %x", checksum)
286 defer func() { debug("current frame checksum %x", z.checksum.Sum32()) }()
William Kurkianea869482019-04-09 15:16:11 -0400287 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530288 return z.writeUint32(checksum)
William Kurkianea869482019-04-09 15:16:11 -0400289}
290
291// Flush flushes any pending compressed data to the underlying writer.
292// Flush does not return until the data has been written.
293// If the underlying writer returns an error, Flush returns that error.
294func (z *Writer) Flush() error {
295 if debugFlag {
296 debug("flush with index %d", z.idx)
297 }
298 if z.idx == 0 {
299 return nil
300 }
301
khenaidoo106c61a2021-08-11 18:05:46 -0400302 data := z.data[:z.idx]
Abhilash S.L3b494632019-07-16 15:51:09 +0530303 z.idx = 0
khenaidoo106c61a2021-08-11 18:05:46 -0400304 if z.c == nil {
305 return z.compressBlock(data)
306 }
307 if !z.NoChecksum {
308 _, _ = z.checksum.Write(data)
309 }
310 c := make(chan zResult)
311 z.c <- c
312 writerCompressBlock(c, z.Header, data)
Abhilash S.L3b494632019-07-16 15:51:09 +0530313 return nil
William Kurkianea869482019-04-09 15:16:11 -0400314}
315
khenaidoo106c61a2021-08-11 18:05:46 -0400316func (z *Writer) close() error {
317 if z.c == nil {
318 return nil
319 }
320 // Send a sentinel block (no data to compress) to terminate the writer main goroutine.
321 c := make(chan zResult)
322 z.c <- c
323 c <- zResult{}
324 // Wait for the main goroutine to complete.
325 <-c
326 // At this point the main goroutine has shut down or is about to return.
327 z.c = nil
328 return z.err
329}
330
William Kurkianea869482019-04-09 15:16:11 -0400331// Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
332func (z *Writer) Close() error {
333 if !z.Header.done {
334 if err := z.writeHeader(); err != nil {
335 return err
336 }
337 }
William Kurkianea869482019-04-09 15:16:11 -0400338 if err := z.Flush(); err != nil {
339 return err
340 }
khenaidoo106c61a2021-08-11 18:05:46 -0400341 if err := z.close(); err != nil {
342 return err
343 }
344 z.freeBuffers()
William Kurkianea869482019-04-09 15:16:11 -0400345
346 if debugFlag {
347 debug("writing last empty block")
348 }
349 if err := z.writeUint32(0); err != nil {
350 return err
351 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530352 if z.NoChecksum {
353 return nil
William Kurkianea869482019-04-09 15:16:11 -0400354 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530355 checksum := z.checksum.Sum32()
356 if debugFlag {
357 debug("stream checksum %x", checksum)
358 }
359 return z.writeUint32(checksum)
William Kurkianea869482019-04-09 15:16:11 -0400360}
361
362// Reset clears the state of the Writer z such that it is equivalent to its
363// initial state from NewWriter, but instead writing to w.
364// No access to the underlying io.Writer is performed.
365func (z *Writer) Reset(w io.Writer) {
khenaidoo106c61a2021-08-11 18:05:46 -0400366 n := cap(z.c)
367 _ = z.close()
368 z.freeBuffers()
369 z.Header.Reset()
William Kurkianea869482019-04-09 15:16:11 -0400370 z.dst = w
371 z.checksum.Reset()
William Kurkianea869482019-04-09 15:16:11 -0400372 z.idx = 0
khenaidoo106c61a2021-08-11 18:05:46 -0400373 z.err = nil
374 // reset hashtable to ensure deterministic output.
375 for i := range z.hashtable {
376 z.hashtable[i] = 0
377 }
378 z.WithConcurrency(n)
William Kurkianea869482019-04-09 15:16:11 -0400379}
380
381// writeUint32 writes a uint32 to the underlying writer.
382func (z *Writer) writeUint32(x uint32) error {
383 buf := z.buf[:4]
384 binary.LittleEndian.PutUint32(buf, x)
385 _, err := z.dst.Write(buf)
386 return err
387}
khenaidoo106c61a2021-08-11 18:05:46 -0400388
389// writerCompressBlock compresses data into a pooled buffer and writes its result
390// out to the input channel.
391func writerCompressBlock(c chan zResult, header Header, data []byte) {
392 zdata := getBuffer(header.BlockMaxSize)
393 // The compressed block size cannot exceed the input's.
394 var zn int
395 if level := header.CompressionLevel; level != 0 {
396 zn, _ = CompressBlockHC(data, zdata, level)
397 } else {
398 var hashTable [winSize]int
399 zn, _ = CompressBlock(data, zdata, hashTable[:])
400 }
401 var res zResult
402 if zn > 0 && zn < len(data) {
403 res.size = uint32(zn)
404 res.data = zdata[:zn]
405 } else {
406 res.size = uint32(len(data)) | compressedBlockFlag
407 res.data = data
408 }
409 if header.BlockChecksum {
410 res.checksum = xxh32.ChecksumZero(res.data)
411 }
412 c <- res
413}