blob: 324f1386b8ad83a4970296a803bf3a846b64c184 [file] [log] [blame]
Pragya Arya324337e2020-02-20 14:35:08 +05301package lz4
2
3import (
4 "encoding/binary"
5 "fmt"
6 "github.com/pierrec/lz4/internal/xxh32"
7 "io"
8 "runtime"
9)
10
// zResult carries the outcome of compressing a single block.
type zResult struct {
	// size is the block header: the payload length, with
	// compressedBlockFlag set when the payload is stored uncompressed.
	size uint32
	// data is the block payload (compressed, or the raw input when
	// compression did not shrink it).
	data []byte
	// checksum is the checksum of data.
	checksum uint32
}
17
18// Writer implements the LZ4 frame encoder.
19type Writer struct {
20 Header
21 // Handler called when a block has been successfully written out.
22 // It provides the number of bytes written.
23 OnBlockDone func(size int)
24
25 buf [19]byte // magic number(4) + header(flags(2)+[Size(8)+DictID(4)]+checksum(1)) does not exceed 19 bytes
26 dst io.Writer // Destination.
27 checksum xxh32.XXHZero // Frame checksum.
28 data []byte // Data to be compressed + buffer for compressed data.
29 idx int // Index into data.
30 hashtable [winSize]int // Hash table used in CompressBlock().
31
32 // For concurrency.
33 c chan chan zResult // Channel for block compression goroutines and writer goroutine.
34 err error // Any error encountered while writing to the underlying destination.
35}
36
37// NewWriter returns a new LZ4 frame encoder.
38// No access to the underlying io.Writer is performed.
39// The supplied Header is checked at the first Write.
40// It is ok to change it before the first Write but then not until a Reset() is performed.
41func NewWriter(dst io.Writer) *Writer {
42 z := new(Writer)
43 z.Reset(dst)
44 return z
45}
46
47// WithConcurrency sets the number of concurrent go routines used for compression.
48// A negative value sets the concurrency to GOMAXPROCS.
49func (z *Writer) WithConcurrency(n int) *Writer {
50 switch {
51 case n == 0 || n == 1:
52 z.c = nil
53 return z
54 case n < 0:
55 n = runtime.GOMAXPROCS(0)
56 }
57 z.c = make(chan chan zResult, n)
58 // Writer goroutine managing concurrent block compression goroutines.
59 go func() {
60 // Process next block compression item.
61 for c := range z.c {
62 // Read the next compressed block result.
63 // Waiting here ensures that the blocks are output in the order they were sent.
64 // The incoming channel is always closed as it indicates to the caller that
65 // the block has been processed.
66 res := <-c
67 n := len(res.data)
68 if n == 0 {
69 // Notify the block compression routine that we are done with its result.
70 // This is used when a sentinel block is sent to terminate the compression.
71 close(c)
72 return
73 }
74 // Write the block.
75 if err := z.writeUint32(res.size); err != nil && z.err == nil {
76 z.err = err
77 }
78 if _, err := z.dst.Write(res.data); err != nil && z.err == nil {
79 z.err = err
80 }
81 if z.BlockChecksum {
82 if err := z.writeUint32(res.checksum); err != nil && z.err == nil {
83 z.err = err
84 }
85 }
86 if isCompressed := res.size&compressedBlockFlag == 0; isCompressed {
87 // It is now safe to release the buffer as no longer in use by any goroutine.
88 putBuffer(cap(res.data), res.data)
89 }
90 if h := z.OnBlockDone; h != nil {
91 h(n)
92 }
93 close(c)
94 }
95 }()
96 return z
97}
98
99// newBuffers instantiates new buffers which size matches the one in Header.
100// The returned buffers are for decompression and compression respectively.
101func (z *Writer) newBuffers() {
102 bSize := z.Header.BlockMaxSize
103 buf := getBuffer(bSize)
104 z.data = buf[:bSize] // Uncompressed buffer is the first half.
105}
106
107// freeBuffers puts the writer's buffers back to the pool.
108func (z *Writer) freeBuffers() {
109 // Put the buffer back into the pool, if any.
110 putBuffer(z.Header.BlockMaxSize, z.data)
111 z.data = nil
112}
113
114// writeHeader builds and writes the header (magic+header) to the underlying io.Writer.
115func (z *Writer) writeHeader() error {
116 // Default to 4Mb if BlockMaxSize is not set.
117 if z.Header.BlockMaxSize == 0 {
118 z.Header.BlockMaxSize = blockSize4M
119 }
120 // The only option that needs to be validated.
121 bSize := z.Header.BlockMaxSize
122 if !isValidBlockSize(z.Header.BlockMaxSize) {
123 return fmt.Errorf("lz4: invalid block max size: %d", bSize)
124 }
125 // Allocate the compressed/uncompressed buffers.
126 // The compressed buffer cannot exceed the uncompressed one.
127 z.newBuffers()
128 z.idx = 0
129
130 // Size is optional.
131 buf := z.buf[:]
132
133 // Set the fixed size data: magic number, block max size and flags.
134 binary.LittleEndian.PutUint32(buf[0:], frameMagic)
135 flg := byte(Version << 6)
136 flg |= 1 << 5 // No block dependency.
137 if z.Header.BlockChecksum {
138 flg |= 1 << 4
139 }
140 if z.Header.Size > 0 {
141 flg |= 1 << 3
142 }
143 if !z.Header.NoChecksum {
144 flg |= 1 << 2
145 }
146 buf[4] = flg
147 buf[5] = blockSizeValueToIndex(z.Header.BlockMaxSize) << 4
148
149 // Current buffer size: magic(4) + flags(1) + block max size (1).
150 n := 6
151 // Optional items.
152 if z.Header.Size > 0 {
153 binary.LittleEndian.PutUint64(buf[n:], z.Header.Size)
154 n += 8
155 }
156
157 // The header checksum includes the flags, block max size and optional Size.
158 buf[n] = byte(xxh32.ChecksumZero(buf[4:n]) >> 8 & 0xFF)
159 z.checksum.Reset()
160
161 // Header ready, write it out.
162 if _, err := z.dst.Write(buf[0 : n+1]); err != nil {
163 return err
164 }
165 z.Header.done = true
166 if debugFlag {
167 debug("wrote header %v", z.Header)
168 }
169
170 return nil
171}
172
173// Write compresses data from the supplied buffer into the underlying io.Writer.
174// Write does not return until the data has been written.
175func (z *Writer) Write(buf []byte) (int, error) {
176 if !z.Header.done {
177 if err := z.writeHeader(); err != nil {
178 return 0, err
179 }
180 }
181 if debugFlag {
182 debug("input buffer len=%d index=%d", len(buf), z.idx)
183 }
184
185 zn := len(z.data)
186 var n int
187 for len(buf) > 0 {
188 if z.idx == 0 && len(buf) >= zn {
189 // Avoid a copy as there is enough data for a block.
190 if err := z.compressBlock(buf[:zn]); err != nil {
191 return n, err
192 }
193 n += zn
194 buf = buf[zn:]
195 continue
196 }
197 // Accumulate the data to be compressed.
198 m := copy(z.data[z.idx:], buf)
199 n += m
200 z.idx += m
201 buf = buf[m:]
202 if debugFlag {
203 debug("%d bytes copied to buf, current index %d", n, z.idx)
204 }
205
206 if z.idx < len(z.data) {
207 // Buffer not filled.
208 if debugFlag {
209 debug("need more data for compression")
210 }
211 return n, nil
212 }
213
214 // Buffer full.
215 if err := z.compressBlock(z.data); err != nil {
216 return n, err
217 }
218 z.idx = 0
219 }
220
221 return n, nil
222}
223
224// compressBlock compresses a block.
225func (z *Writer) compressBlock(data []byte) error {
226 if !z.NoChecksum {
227 _, _ = z.checksum.Write(data)
228 }
229
230 if z.c != nil {
231 c := make(chan zResult)
232 z.c <- c // Send now to guarantee order
233 go writerCompressBlock(c, z.Header, data)
234 return nil
235 }
236
237 zdata := z.data[z.Header.BlockMaxSize:cap(z.data)]
238 // The compressed block size cannot exceed the input's.
239 var zn int
240
241 if level := z.Header.CompressionLevel; level != 0 {
242 zn, _ = CompressBlockHC(data, zdata, level)
243 } else {
244 zn, _ = CompressBlock(data, zdata, z.hashtable[:])
245 }
246
247 var bLen uint32
248 if debugFlag {
249 debug("block compression %d => %d", len(data), zn)
250 }
251 if zn > 0 && zn < len(data) {
252 // Compressible and compressed size smaller than uncompressed: ok!
253 bLen = uint32(zn)
254 zdata = zdata[:zn]
255 } else {
256 // Uncompressed block.
257 bLen = uint32(len(data)) | compressedBlockFlag
258 zdata = data
259 }
260 if debugFlag {
261 debug("block compression to be written len=%d data len=%d", bLen, len(zdata))
262 }
263
264 // Write the block.
265 if err := z.writeUint32(bLen); err != nil {
266 return err
267 }
268 written, err := z.dst.Write(zdata)
269 if err != nil {
270 return err
271 }
272 if h := z.OnBlockDone; h != nil {
273 h(written)
274 }
275
276 if !z.BlockChecksum {
277 if debugFlag {
278 debug("current frame checksum %x", z.checksum.Sum32())
279 }
280 return nil
281 }
282 checksum := xxh32.ChecksumZero(zdata)
283 if debugFlag {
284 debug("block checksum %x", checksum)
285 defer func() { debug("current frame checksum %x", z.checksum.Sum32()) }()
286 }
287 return z.writeUint32(checksum)
288}
289
290// Flush flushes any pending compressed data to the underlying writer.
291// Flush does not return until the data has been written.
292// If the underlying writer returns an error, Flush returns that error.
293func (z *Writer) Flush() error {
294 if debugFlag {
295 debug("flush with index %d", z.idx)
296 }
297 if z.idx == 0 {
298 return nil
299 }
300
301 data := z.data[:z.idx]
302 z.idx = 0
303 if z.c == nil {
304 return z.compressBlock(data)
305 }
306 if !z.NoChecksum {
307 _, _ = z.checksum.Write(data)
308 }
309 c := make(chan zResult)
310 z.c <- c
311 writerCompressBlock(c, z.Header, data)
312 return nil
313}
314
315func (z *Writer) close() error {
316 if z.c == nil {
317 return nil
318 }
319 // Send a sentinel block (no data to compress) to terminate the writer main goroutine.
320 c := make(chan zResult)
321 z.c <- c
322 c <- zResult{}
323 // Wait for the main goroutine to complete.
324 <-c
325 // At this point the main goroutine has shut down or is about to return.
326 z.c = nil
327 return z.err
328}
329
330// Close closes the Writer, flushing any unwritten data to the underlying io.Writer, but does not close the underlying io.Writer.
331func (z *Writer) Close() error {
332 if !z.Header.done {
333 if err := z.writeHeader(); err != nil {
334 return err
335 }
336 }
337 if err := z.Flush(); err != nil {
338 return err
339 }
340 if err := z.close(); err != nil {
341 return err
342 }
343 z.freeBuffers()
344
345 if debugFlag {
346 debug("writing last empty block")
347 }
348 if err := z.writeUint32(0); err != nil {
349 return err
350 }
351 if z.NoChecksum {
352 return nil
353 }
354 checksum := z.checksum.Sum32()
355 if debugFlag {
356 debug("stream checksum %x", checksum)
357 }
358 return z.writeUint32(checksum)
359}
360
361// Reset clears the state of the Writer z such that it is equivalent to its
362// initial state from NewWriter, but instead writing to w.
363// No access to the underlying io.Writer is performed.
364func (z *Writer) Reset(w io.Writer) {
365 n := cap(z.c)
366 _ = z.close()
367 z.freeBuffers()
368 z.Header.Reset()
369 z.dst = w
370 z.checksum.Reset()
371 z.idx = 0
372 z.err = nil
373 z.WithConcurrency(n)
374}
375
376// writeUint32 writes a uint32 to the underlying writer.
377func (z *Writer) writeUint32(x uint32) error {
378 buf := z.buf[:4]
379 binary.LittleEndian.PutUint32(buf, x)
380 _, err := z.dst.Write(buf)
381 return err
382}
383
384// writerCompressBlock compresses data into a pooled buffer and writes its result
385// out to the input channel.
386func writerCompressBlock(c chan zResult, header Header, data []byte) {
387 zdata := getBuffer(header.BlockMaxSize)
388 // The compressed block size cannot exceed the input's.
389 var zn int
390 if level := header.CompressionLevel; level != 0 {
391 zn, _ = CompressBlockHC(data, zdata, level)
392 } else {
393 var hashTable [winSize]int
394 zn, _ = CompressBlock(data, zdata, hashTable[:])
395 }
396 var res zResult
397 if zn > 0 && zn < len(data) {
398 res.size = uint32(zn)
399 res.data = zdata[:zn]
400 } else {
401 res.size = uint32(len(data)) | compressedBlockFlag
402 res.data = data
403 }
404 if header.BlockChecksum {
405 res.checksum = xxh32.ChecksumZero(res.data)
406 }
407 c <- res
408}