blob: f593e464b66c0bba25cad8143ff6c37ae1d0a675 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2019+ Klaus Post. All rights reserved.
2// License information can be found in the LICENSE file.
3// Based on work by Yann Collet, released under BSD License.
4
5package zstd
6
7import (
8 "errors"
9 "io"
10 "sync"
11)
12
13// Decoder provides decoding of zstandard streams.
14// The decoder has been designed to operate without allocations after a warmup.
15// This means that you should store the decoder for best performance.
16// To re-use a stream decoder, use the Reset(r io.Reader) error to switch to another stream.
17// A decoder can safely be re-used even if the previous stream failed.
18// To release the resources, you must call the Close() function on a decoder.
19type Decoder struct {
20 o decoderOptions
21
22 // Unreferenced decoders, ready for use.
23 decoders chan *blockDec
24
25 // Streams ready to be decoded.
26 stream chan decodeStream
27
28 // Current read position used for Reader functionality.
29 current decoderState
30
31 // Custom dictionaries.
32 // Always uses copies.
33 dicts map[uint32]dict
34
35 // streamWg is the waitgroup for all streams
36 streamWg sync.WaitGroup
37}
38
39// decoderState is used for maintaining state when the decoder
40// is used for streaming.
41type decoderState struct {
42 // current block being written to stream.
43 decodeOutput
44
45 // output in order to be written to stream.
46 output chan decodeOutput
47
48 // cancel remaining output.
49 cancel chan struct{}
50
51 flushed bool
52}
53
54var (
55 // Check the interfaces we want to support.
56 _ = io.WriterTo(&Decoder{})
57 _ = io.Reader(&Decoder{})
58)
59
60// NewReader creates a new decoder.
61// A nil Reader can be provided in which case Reset can be used to start a decode.
62//
63// A Decoder can be used in two modes:
64//
65// 1) As a stream, or
66// 2) For stateless decoding using DecodeAll.
67//
68// Only a single stream can be decoded concurrently, but the same decoder
69// can run multiple concurrent stateless decodes. It is even possible to
70// use stateless decodes while a stream is being decoded.
71//
72// The Reset function can be used to initiate a new stream, which is will considerably
73// reduce the allocations normally caused by NewReader.
74func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
75 initPredefined()
76 var d Decoder
77 d.o.setDefault()
78 for _, o := range opts {
79 err := o(&d.o)
80 if err != nil {
81 return nil, err
82 }
83 }
84 d.current.output = make(chan decodeOutput, d.o.concurrent)
85 d.current.flushed = true
86
87 if r == nil {
88 d.current.err = ErrDecoderNilInput
89 }
90
91 // Transfer option dicts.
92 d.dicts = make(map[uint32]dict, len(d.o.dicts))
93 for _, dc := range d.o.dicts {
94 d.dicts[dc.id] = dc
95 }
96 d.o.dicts = nil
97
98 // Create decoders
99 d.decoders = make(chan *blockDec, d.o.concurrent)
100 for i := 0; i < d.o.concurrent; i++ {
101 dec := newBlockDec(d.o.lowMem)
102 dec.localFrame = newFrameDec(d.o)
103 d.decoders <- dec
104 }
105
106 if r == nil {
107 return &d, nil
108 }
109 return &d, d.Reset(r)
110}
111
112// Read bytes from the decompressed stream into p.
113// Returns the number of bytes written and any error that occurred.
114// When the stream is done, io.EOF will be returned.
115func (d *Decoder) Read(p []byte) (int, error) {
116 if d.stream == nil {
117 return 0, ErrDecoderNilInput
118 }
119 var n int
120 for {
121 if len(d.current.b) > 0 {
122 filled := copy(p, d.current.b)
123 p = p[filled:]
124 d.current.b = d.current.b[filled:]
125 n += filled
126 }
127 if len(p) == 0 {
128 break
129 }
130 if len(d.current.b) == 0 {
131 // We have an error and no more data
132 if d.current.err != nil {
133 break
134 }
135 if !d.nextBlock(n == 0) {
136 return n, nil
137 }
138 }
139 }
140 if len(d.current.b) > 0 {
141 if debug {
142 println("returning", n, "still bytes left:", len(d.current.b))
143 }
144 // Only return error at end of block
145 return n, nil
146 }
147 if d.current.err != nil {
148 d.drainOutput()
149 }
150 if debug {
151 println("returning", n, d.current.err, len(d.decoders))
152 }
153 return n, d.current.err
154}
155
156// Reset will reset the decoder the supplied stream after the current has finished processing.
157// Note that this functionality cannot be used after Close has been called.
158// Reset can be called with a nil reader to release references to the previous reader.
159// After being called with a nil reader, no other operations than Reset or DecodeAll or Close
160// should be used.
161func (d *Decoder) Reset(r io.Reader) error {
162 if d.current.err == ErrDecoderClosed {
163 return d.current.err
164 }
165
166 d.drainOutput()
167
168 if r == nil {
169 d.current.err = ErrDecoderNilInput
170 d.current.flushed = true
171 return nil
172 }
173
174 if d.stream == nil {
175 d.stream = make(chan decodeStream, 1)
176 d.streamWg.Add(1)
177 go d.startStreamDecoder(d.stream)
178 }
179
180 // If bytes buffer and < 1MB, do sync decoding anyway.
181 if bb, ok := r.(byter); ok && bb.Len() < 1<<20 {
182 bb2 := bb
183 if debug {
184 println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
185 }
186 b := bb2.Bytes()
187 var dst []byte
188 if cap(d.current.b) > 0 {
189 dst = d.current.b
190 }
191
192 dst, err := d.DecodeAll(b, dst[:0])
193 if err == nil {
194 err = io.EOF
195 }
196 d.current.b = dst
197 d.current.err = err
198 d.current.flushed = true
199 if debug {
200 println("sync decode to", len(dst), "bytes, err:", err)
201 }
202 return nil
203 }
204
205 // Remove current block.
206 d.current.decodeOutput = decodeOutput{}
207 d.current.err = nil
208 d.current.cancel = make(chan struct{})
209 d.current.flushed = false
210 d.current.d = nil
211
212 d.stream <- decodeStream{
213 r: r,
214 output: d.current.output,
215 cancel: d.current.cancel,
216 }
217 return nil
218}
219
220// drainOutput will drain the output until errEndOfStream is sent.
221func (d *Decoder) drainOutput() {
222 if d.current.cancel != nil {
223 println("cancelling current")
224 close(d.current.cancel)
225 d.current.cancel = nil
226 }
227 if d.current.d != nil {
228 if debug {
229 printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
230 }
231 d.decoders <- d.current.d
232 d.current.d = nil
233 d.current.b = nil
234 }
235 if d.current.output == nil || d.current.flushed {
236 println("current already flushed")
237 return
238 }
239 for v := range d.current.output {
240 if v.d != nil {
241 if debug {
242 printf("re-adding decoder %p", v.d)
243 }
244 d.decoders <- v.d
245 }
246 if v.err == errEndOfStream {
247 println("current flushed")
248 d.current.flushed = true
249 return
250 }
251 }
252}
253
254// WriteTo writes data to w until there's no more data to write or when an error occurs.
255// The return value n is the number of bytes written.
256// Any error encountered during the write is also returned.
257func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
258 if d.stream == nil {
259 return 0, ErrDecoderNilInput
260 }
261 var n int64
262 for {
263 if len(d.current.b) > 0 {
264 n2, err2 := w.Write(d.current.b)
265 n += int64(n2)
266 if err2 != nil && d.current.err == nil {
267 d.current.err = err2
268 break
269 }
270 }
271 if d.current.err != nil {
272 break
273 }
274 d.nextBlock(true)
275 }
276 err := d.current.err
277 if err != nil {
278 d.drainOutput()
279 }
280 if err == io.EOF {
281 err = nil
282 }
283 return n, err
284}
285
286// DecodeAll allows stateless decoding of a blob of bytes.
287// Output will be appended to dst, so if the destination size is known
288// you can pre-allocate the destination slice to avoid allocations.
289// DecodeAll can be used concurrently.
290// The Decoder concurrency limits will be respected.
291func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
292 if d.current.err == ErrDecoderClosed {
293 return dst, ErrDecoderClosed
294 }
295
296 // Grab a block decoder and frame decoder.
297 block := <-d.decoders
298 frame := block.localFrame
299 defer func() {
300 if debug {
301 printf("re-adding decoder: %p", block)
302 }
303 frame.rawInput = nil
304 frame.bBuf = nil
305 d.decoders <- block
306 }()
307 frame.bBuf = input
308
309 for {
310 frame.history.reset()
311 err := frame.reset(&frame.bBuf)
312 if err == io.EOF {
313 if debug {
314 println("frame reset return EOF")
315 }
316 return dst, nil
317 }
318 if frame.DictionaryID != nil {
319 dict, ok := d.dicts[*frame.DictionaryID]
320 if !ok {
321 return nil, ErrUnknownDictionary
322 }
323 frame.history.setDict(&dict)
324 }
325 if err != nil {
326 return dst, err
327 }
328 if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)) {
329 return dst, ErrDecoderSizeExceeded
330 }
331 if frame.FrameContentSize > 0 && frame.FrameContentSize < 1<<30 {
332 // Never preallocate moe than 1 GB up front.
333 if cap(dst)-len(dst) < int(frame.FrameContentSize) {
334 dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize))
335 copy(dst2, dst)
336 dst = dst2
337 }
338 }
339 if cap(dst) == 0 {
340 // Allocate len(input) * 2 by default if nothing is provided
341 // and we didn't get frame content size.
342 size := len(input) * 2
343 // Cap to 1 MB.
344 if size > 1<<20 {
345 size = 1 << 20
346 }
347 if uint64(size) > d.o.maxDecodedSize {
348 size = int(d.o.maxDecodedSize)
349 }
350 dst = make([]byte, 0, size)
351 }
352
353 dst, err = frame.runDecoder(dst, block)
354 if err != nil {
355 return dst, err
356 }
357 if len(frame.bBuf) == 0 {
358 if debug {
359 println("frame dbuf empty")
360 }
361 break
362 }
363 }
364 return dst, nil
365}
366
367// nextBlock returns the next block.
368// If an error occurs d.err will be set.
369// Optionally the function can block for new output.
370// If non-blocking mode is used the returned boolean will be false
371// if no data was available without blocking.
372func (d *Decoder) nextBlock(blocking bool) (ok bool) {
373 if d.current.d != nil {
374 if debug {
375 printf("re-adding current decoder %p", d.current.d)
376 }
377 d.decoders <- d.current.d
378 d.current.d = nil
379 }
380 if d.current.err != nil {
381 // Keep error state.
382 return blocking
383 }
384
385 if blocking {
386 d.current.decodeOutput = <-d.current.output
387 } else {
388 select {
389 case d.current.decodeOutput = <-d.current.output:
390 default:
391 return false
392 }
393 }
394 if debug {
395 println("got", len(d.current.b), "bytes, error:", d.current.err)
396 }
397 return true
398}
399
400// Close will release all resources.
401// It is NOT possible to reuse the decoder after this.
402func (d *Decoder) Close() {
403 if d.current.err == ErrDecoderClosed {
404 return
405 }
406 d.drainOutput()
407 if d.stream != nil {
408 close(d.stream)
409 d.streamWg.Wait()
410 d.stream = nil
411 }
412 if d.decoders != nil {
413 close(d.decoders)
414 for dec := range d.decoders {
415 dec.Close()
416 }
417 d.decoders = nil
418 }
419 if d.current.d != nil {
420 d.current.d.Close()
421 d.current.d = nil
422 }
423 d.current.err = ErrDecoderClosed
424}
425
426// IOReadCloser returns the decoder as an io.ReadCloser for convenience.
427// Any changes to the decoder will be reflected, so the returned ReadCloser
428// can be reused along with the decoder.
429// io.WriterTo is also supported by the returned ReadCloser.
430func (d *Decoder) IOReadCloser() io.ReadCloser {
431 return closeWrapper{d: d}
432}
433
434// closeWrapper wraps a function call as a closer.
435type closeWrapper struct {
436 d *Decoder
437}
438
439// WriteTo forwards WriteTo calls to the decoder.
440func (c closeWrapper) WriteTo(w io.Writer) (n int64, err error) {
441 return c.d.WriteTo(w)
442}
443
444// Read forwards read calls to the decoder.
445func (c closeWrapper) Read(p []byte) (n int, err error) {
446 return c.d.Read(p)
447}
448
449// Close closes the decoder.
450func (c closeWrapper) Close() error {
451 c.d.Close()
452 return nil
453}
454
455type decodeOutput struct {
456 d *blockDec
457 b []byte
458 err error
459}
460
461type decodeStream struct {
462 r io.Reader
463
464 // Blocks ready to be written to output.
465 output chan decodeOutput
466
467 // cancel reading from the input
468 cancel chan struct{}
469}
470
471// errEndOfStream indicates that everything from the stream was read.
472var errEndOfStream = errors.New("end-of-stream")
473
474// Create Decoder:
475// Spawn n block decoders. These accept tasks to decode a block.
476// Create goroutine that handles stream processing, this will send history to decoders as they are available.
477// Decoders update the history as they decode.
478// When a block is returned:
479// a) history is sent to the next decoder,
480// b) content written to CRC.
481// c) return data to WRITER.
482// d) wait for next block to return data.
483// Once WRITTEN, the decoders reused by the writer frame decoder for re-use.
484func (d *Decoder) startStreamDecoder(inStream chan decodeStream) {
485 defer d.streamWg.Done()
486 frame := newFrameDec(d.o)
487 for stream := range inStream {
488 if debug {
489 println("got new stream")
490 }
491 br := readerWrapper{r: stream.r}
492 decodeStream:
493 for {
494 frame.history.reset()
495 err := frame.reset(&br)
496 if debug && err != nil {
497 println("Frame decoder returned", err)
498 }
499 if err == nil && frame.DictionaryID != nil {
500 dict, ok := d.dicts[*frame.DictionaryID]
501 if !ok {
502 err = ErrUnknownDictionary
503 } else {
504 frame.history.setDict(&dict)
505 }
506 }
507 if err != nil {
508 stream.output <- decodeOutput{
509 err: err,
510 }
511 break
512 }
513 if debug {
514 println("starting frame decoder")
515 }
516
517 // This goroutine will forward history between frames.
518 frame.frameDone.Add(1)
519 frame.initAsync()
520
521 go frame.startDecoder(stream.output)
522 decodeFrame:
523 // Go through all blocks of the frame.
524 for {
525 dec := <-d.decoders
526 select {
527 case <-stream.cancel:
528 if !frame.sendErr(dec, io.EOF) {
529 // To not let the decoder dangle, send it back.
530 stream.output <- decodeOutput{d: dec}
531 }
532 break decodeStream
533 default:
534 }
535 err := frame.next(dec)
536 switch err {
537 case io.EOF:
538 // End of current frame, no error
539 println("EOF on next block")
540 break decodeFrame
541 case nil:
542 continue
543 default:
544 println("block decoder returned", err)
545 break decodeStream
546 }
547 }
548 // All blocks have started decoding, check if there are more frames.
549 println("waiting for done")
550 frame.frameDone.Wait()
551 println("done waiting...")
552 }
553 frame.frameDone.Wait()
554 println("Sending EOS")
555 stream.output <- decodeOutput{err: errEndOfStream}
556 }
557}